Source code for cbs_utils.misc

"""
Some miscellaneous functions used throughout many cbs modules
"""

import argparse
import errno
import logging
import os
import pathlib
import re
import subprocess
import sys
import time
from pathlib import Path

import numpy as np
import pandas as pd

# Module-level logger named after this module; no handlers or levels are configured here.
logger = logging.getLogger(__name__)

# The imports below are optional. When one is missing, a warning is printed to stdout and the
# name is bound to None, so importing this module still succeeds and only code that actually
# uses the missing package fails.
try:
    import dateutil.parser as dparser
except ImportError:
    print("Warning: dateutil could not be imported. Some functions may fail")
    dparser = None

try:
    import yaml
except ImportError:
    print("Warning: yaml could not be imported. Some functions may fail")
    yaml = None

try:
    import yamlloader
except ImportError:
    print("Warning: yamlloader could not be imported. Some functions may fail")
    yamlloader = None

# Q_ is bound to None (without a warning) when it cannot be imported from the package.
try:
    from cbs_utils import Q_
except ImportError:
    Q_ = None

# Layout for "label : value" debug lines; the label is left-aligned in a 30 character field.
MSG_FORMAT = "{:30s} : {}"


class Chdir(object):
    r"""Context manager that temporarily switches the current working directory.

    On entering the ``with`` block the process changes to `new_path`; on leaving the block
    (normally or because of an exception) it changes back to the directory that was current
    when the block was entered.

    Parameters
    ----------
    new_path: str
        Directory to work in for the duration of the ``with`` block

    Notes
    -----
    Used on the Gompute cluster in the batch processing script to submit a job inside a
    directory and then move back to the higher directory in order to move to the next case.

    Exceptions raised inside the block are not suppressed.

    Examples
    --------
    Start from a known directory (C:/)

    >>> os.chdir("C:/")
    >>> os.getcwd()
    'C:\\'

    Inside the block the working directory is C:/Windows

    >>> with Chdir("C:/Windows") as d:
    ...     os.getcwd()
    'C:\\Windows'

    After the block we are back where we started

    >>> os.getcwd()
    'C:\\'
    """

    def __init__(self, new_path):
        # only the target is stored here; the directory change happens in __enter__
        self.newPath = new_path

    def __enter__(self):
        # remember where we came from before moving away
        origin = os.getcwd()
        self.savedPath = origin
        os.chdir(self.newPath)
        return self

    def __exit__(self, *exc_details):
        # always return to the starting directory; returning None lets exceptions propagate
        os.chdir(self.savedPath)
class Timer(object):
    """Context manager that measures the wall-clock time spent inside a ``with`` block.

    Parameters
    ----------
    message : str
        Text put in the first column of the output line
    name : str, optional
        The name of the routine timed, put in the second column.
    verbose : bool, optional
        If True, log the elapsed time at INFO level when the block is left
    units : str, optional
        Time unit used for the reported duration; passed to `numpy.timedelta64`. Default 'ms'
    n_digits : int, optional
        Number of decimals used when printing the duration
    field_width : int, optional
        Width of the `message` and `name` columns. Default 20

    Attributes
    ----------
    secs : float or None
        Elapsed time in seconds; available after the block has been left
    duration : float or None
        Elapsed time expressed in `units`; only set when `verbose` is True
    delta_time : numpy.timedelta64
        Elapsed time with nanosecond resolution; available after the block has been left

    Example
    -------
    Enclose the section of code to be timed in a `with` / `as` construction. The result is
    written with the logger of this module, so merge that logger into your own to see it::

        import logging
        from cbs_utils.misc import (Timer, merge_loggers)

        logger = logging.getLogger(__name__)
        merge_loggers(logger, "cbs_utils")
        with Timer(units="s", n_digits=0) as timer:
            time.sleep(1.0)

        # timer.secs now holds approximately 1.0
    """

    def __init__(self, message="Elapsed time", name="routine", verbose=True, units='ms',
                 n_digits=0, field_width=20):
        self.message = message
        self.name = name
        self.units = units
        self.verbose = verbose

        # results; filled in by __exit__
        self.secs = None
        self.duration = None

        # Template used for the log line. For field_width=20, n_digits=1 and units='ms' this is
        # "{:<20s} {:<20s} : {:>10.1f} ms"; the units are baked into the template here.
        self.format_string = "{{:<{width}s}} {{:<{width}s}} : {{:>{total}.{digits}f}} {units}".format(
            width=field_width, total=10, digits=n_digits, units=self.units)

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *exc_details):
        self.end = time.time()

        # start and end are in seconds; keep the difference as an integer number of nanoseconds
        elapsed_ns = int(1e9 * (self.end - self.start))
        self.delta_time = np.timedelta64(elapsed_ns, 'ns')
        one_second = np.timedelta64(1, "s")
        self.secs = float(self.delta_time / one_second)

        logger.debug("Found delta time in ns: {}".format(self.delta_time))

        if not self.verbose:
            return

        # express the elapsed time in the requested units and report it
        self.duration = self.delta_time / np.timedelta64(1, self.units)
        # the template has three fields; the trailing self.units argument is ignored by format
        logger.info(self.format_string.format(self.message, self.name, self.duration,
                                              self.units))
class ConditionalDecorator(object):
    """Apply a decorator to a function only when a condition holds.

    Parameters
    ----------
    dec: decorator
        The decorator to apply when `condition` is true
    condition: bool
        When false, the decorated function is returned untouched

    Examples
    --------
    >>> def shout(func):
    ...     return lambda: func().upper()
    >>> @ConditionalDecorator(shout, True)
    ... def hello():
    ...     return "hello"
    >>> hello()
    'HELLO'
    """

    def __init__(self, dec, condition):
        self.decorator = dec
        self.condition = condition

    def __call__(self, func):
        # either wrap the function or hand back the very same object
        return self.decorator(func) if self.condition else func
class PackageInfo(object):
    """Collect version properties of a package.

    Depending on whether the interpreter runs from a frozen bundle (``sys.frozen`` is set) or
    from a normal Python environment, the properties are taken from the ``_version_frozen``
    module or from the attributes of `module_object`.

    Parameters
    ----------
    module_object: :obj:`Module`
        Reference to the module for which the properties are stored

    Attributes
    ----------
    package_version, git_sha, python_version, build_date, bundle_dir
        All start as None and are filled by `get_bundle_version` or `get_source_version`
    """

    def __init__(self, module_object):
        self.module_object = module_object

        self.package_version = None
        self.git_sha = None
        self.python_version = None
        self.build_date = None
        self.bundle_dir = None

        running_frozen = getattr(sys, 'frozen', False)
        if running_frozen:
            # we are running in a bundle
            self.get_bundle_version()
        else:
            self.get_source_version()

    def get_bundle_version(self):
        """
        Get the version of the current package from the _version_frozen module which was
        written by the build_executable script.

        If that module cannot be imported, a message is printed and all properties stay None.
        """
        try:
            import _version_frozen
        except ImportError:
            print("Could not load _version_frozen. All stay None")
            return

        print("here with bundle {}".format(_version_frozen))
        self.bundle_dir = sys._MEIPASS
        self.package_version = _version_frozen.VERSIONTAG
        self.git_sha = _version_frozen.GIT_SHA
        self.python_version = _version_frozen.PYTHON_VERSION
        self.build_date = _version_frozen.BUILD_DATE

    def get_source_version(self):
        """
        Get the version of the current package via the versioneer approach

        Reads ``__file__``, ``__version__`` and ``__git_sha_key__`` from the module object; the
        build date is the current date formatted as YYYYMMDD.
        """
        # we are running in a normal Python environment
        module = self.module_object
        module_file = os.path.abspath(module.__file__)
        self.bundle_dir = os.path.dirname(module_file)
        self.package_version = module.__version__
        self.git_sha = module.__git_sha_key__
        self.python_version = get_python_version_number(sys.version_info)
        self.build_date = pd.to_datetime("now").strftime("%Y%m%d")
def valid_date(s):
    """
    Parse *s* as a date of the form Year-Month-Day

    Parameters
    ----------
    s : str
        A date in the form YYYY-MM-DD, so first the year, then the month, then the day

    Returns
    -------
    :class:`time.struct_time`
        The parsed date as returned by `time.strptime`

    Raises
    ------
    argparse.ArgumentTypeError:
        If *s* does not match the format or is not an existing date

    Notes
    -----
    This is a helper function for the argument parser module `argparse` which allows you to
    check if the argument passed on the command line is a valid date.

    Examples
    --------
    Direct usage of `valid_date`

    >>> try:
    ...     date = valid_date("1973-11-12")
    ... except argparse.ArgumentTypeError:
    ...     print("This date is invalid")
    ... else:
    ...     print("This date is valid")
    This date is valid

    In case an invalid date is supplied (there is no month 15)

    >>> try:
    ...     date = valid_date("1973-15-12")
    ... except argparse.ArgumentTypeError:
    ...     print("This date is invalid")
    ... else:
    ...     print("This date is valid")
    This date is invalid

    Adding a '--startdate' command line option to an argparse parser which checks that a
    valid date is supplied

    >>> parser = argparse.ArgumentParser()
    >>> p = parser.add_argument("--startdate",
    ...                         help="The Start Date - format YYYY-MM-DD ",
    ...                         required=True,
    ...                         type=valid_date)

    References
    ----------
    https://stackoverflow.com/questions/25470844/specify-format-for-input-arguments-argparse-python
    """
    date_format = "%Y-%m-%d"
    try:
        parsed = time.strptime(s, date_format)
    except ValueError:
        # translate the parse failure into the error type argparse reports to the user
        msg = "Not a valid date: '{0}'.\nSupply date as YYYY-MM-DD".format(s)
        raise argparse.ArgumentTypeError(msg)
    return parsed
def get_path_depth(path_name):
    r"""
    Get the depth of a path or file name

    The depth is the number of components that can be split off with `os.path.split` before
    the path is exhausted or the root is reached. If `path_name` is an existing file, the
    depth of its directory is returned.

    Parameters
    ----------
    path_name : str
        Path name to get the depth from

    Returns
    -------
    int
        depth of the path

    Notes
    -----
    Splitting uses the path separator of the running platform, so the Windows style examples
    below only give these results on Windows. A trailing separator counts as one extra level,
    because the first split only removes the empty last component.

    Examples
    --------
    >>> get_path_depth("C:\Anaconda")
    1
    >>> get_path_depth("C:\Anaconda\share")
    2
    >>> get_path_depth("C:\Anaconda\share\pywafo")
    3
    >>> get_path_depth(".\imaginary\path\subdir\share")
    4
    """
    # for an existing file only its directory counts (isfile already implies existence)
    if os.path.isfile(path_name):
        current_path = os.path.dirname(path_name)
    else:
        current_path = path_name

    depth = 0
    previous_path = current_path
    while current_path not in ("", "."):
        current_path = os.path.split(current_path)[0]
        if current_path == previous_path:
            # for a full path name we end at the root (e.g. 'C:\' or '/'), which splits into
            # itself. Detect that by comparing with the previous round
            break
        previous_path = current_path
        depth += 1

    return depth
def scan_base_directory(walk_dir=".",
                        supplied_file_list=None,
                        file_has_string_pattern="",
                        file_has_not_string_pattern="",
                        dir_has_string_pattern="",
                        dir_has_not_string_pattern="",
                        start_date_time=None,
                        end_date_time=None,
                        time_zone=None,
                        time_stamp_year_first=True,
                        time_stamp_day_first=False,
                        extension=None,
                        max_depth=None,
                        sort_file_base_names=False
                        ):
    """Recursively scan the directory `walk_dir` and get all files underneath obeying the search
    strings and/or date/time ranges

    Parameters
    ----------
    walk_dir : str, optional
        The base directory to start the import. Default = "."
    supplied_file_list: list, optional
        An explicit list of file names to filter instead of the files found on disk.
        Default = None
    dir_has_string_pattern : str, optional
        Requires the directory name to have this pattern (Default value = ""). This selection is
        only made on the first directory level below the walk_dir
    dir_has_not_string_pattern : str, optional
        Requires the directory name NOT to have this pattern (Default value = ""). This
        selection is only made on the first directory level below the walk_dir
    file_has_string_pattern : str, optional
        Requires the file name to have this pattern (Default value = "", i.e. matches all)
    file_has_not_string_pattern : str, optional
        Requires the file name NOT to have this pattern (Default value = "")
    extension : str or None, optional
        Extension of the file to match, including the leading dot (it is compared with the
        result of `os.path.splitext`). If None, all extensions match. Default = None
    max_depth : int, optional
        Sets a maximum depth to which the search is carried out. Default = None, which does not
        limit the search depth. For deep file structures setting a limit to the search depth
        speeds up the search.
    sort_file_base_names: bool, optional
        If True, sort the resulting file list alphabetically based on the file base name.
        Default = False
    start_date_time: DateTime or str or None, optional
        If given, get the date time from the current file name and only add the files with a
        date/time equal to or larger than the *start_date_time*. A string is parsed with the
        same settings as the file names. Default is None
    end_date_time: DateTime or str or None, optional
        If given, get the date time from the current file name and only add the files with a
        date/time smaller than the *end_date_time*. A string is parsed with the same settings
        as the file names. Default is None
    time_zone: str or None, optional
        If given add this time zone to the file stamp. The start and end time should also have a
        time zone
    time_stamp_year_first: bool, optional
        Passed to the datetime parser. If true, the year is first in the date/time string.
        Default = True
    time_stamp_day_first: bool, optional
        Passed to the datetime parser. If true, the day is first in the date/time string.
        Default = False

    Returns
    -------
    list
        All the file names found below the input directory `walk_dir` obeying all the search
        strings

    Examples
    --------
    Find all the python files under the share directory in the Anaconda installation folder

    >>> scan_dir = "C:\\Anaconda\\share"
    >>> file_list = scan_base_directory(scan_dir, extension='.py')

    Find all the python files under the share directory in the Anaconda installation folder
    belonging to the pywafo directory

    >>> file_list = scan_base_directory(scan_dir, extension='.py', dir_has_string_pattern="wafo")

    Note that wafo matches on the directory 'pywafo', which is the first directory level below
    the scan directory. However, if we would match on '^wafo' the returned list would be empty
    as the directory has to *start* with wafo.

    In order to get all the files with "test" in the name with a directory depth smaller than 3
    do

    >>> file_list = scan_base_directory(scan_dir, extension='.py', dir_has_string_pattern="wafo",
    ...                                 file_has_string_pattern="test", max_depth=3)

    Test the date/time boundaries. First create a file list from 28 sep 2017 00:00 to 03:00 with
    a 30 minute interval and convert it to a string list

    >>> file_names = ["AMS_{}.mdf".format(dt.strftime("%y%m%dT%H%M%S")) for dt in
    ...               pd.date_range("20170928T000000", "20170928T030000", freq="30min")]
    >>> for file_name in file_names:
    ...     print(file_name)
    AMS_170928T000000.mdf
    AMS_170928T003000.mdf
    AMS_170928T010000.mdf
    AMS_170928T013000.mdf
    AMS_170928T020000.mdf
    AMS_170928T023000.mdf
    AMS_170928T030000.mdf

    Use the scan_base_directory to get the files within a specific date/time range

    >>> file_selection = scan_base_directory(supplied_file_list=file_names,
    ...                  start_date_time="20170928T010000", end_date_time="20170928T023000")
    >>> for file_name in file_selection:
    ...     print(file_name)
    AMS_170928T010000.mdf
    AMS_170928T013000.mdf
    AMS_170928T020000.mdf

    Note that the selected range runs from 1 am up to and including 2 am; the end_date_time of
    2.30 am is not included
    """

    # get the regular expression for the has_pattern and has_not_pattern of the files and
    # directories
    file_has_string = get_regex_pattern(file_has_string_pattern)
    file_has_not_string = get_regex_pattern(file_has_not_string_pattern)
    dir_has_string = get_regex_pattern(dir_has_string_pattern)
    dir_has_not_string = get_regex_pattern(dir_has_not_string_pattern)
    logger.debug(MSG_FORMAT.format("file_has_string", file_has_string))
    logger.debug(MSG_FORMAT.format("file_has_not_string", file_has_not_string))
    logger.debug(MSG_FORMAT.format("dir_has_string", dir_has_string))
    logger.debug(MSG_FORMAT.format("dir_has_not_string", dir_has_not_string))

    # use os.walk to recursively walk over all the file and directories
    top_directory = True
    file_list = list()
    logger.debug("Scanning directory {}".format(walk_dir))
    for root, subdirs, files in os.walk(walk_dir, topdown=True):

        if supplied_file_list is not None:
            # An explicit file list replaces what os.walk found. Emptying subdirs in place stops
            # os.walk from descending, so the loop body runs for this one iteration only.
            # NOTE(review): the list is still processed inside the os.walk loop, so nothing is
            # returned when os.walk yields nothing for walk_dir - confirm this is intended
            root = "."
            subdirs[:] = list()
            files = supplied_file_list

        logger.debug("root={} sub={} files={}".format(root, subdirs, files))
        logger.debug(MSG_FORMAT.format("root", root))
        logger.debug(MSG_FORMAT.format("sub dirs", subdirs))
        logger.debug(MSG_FORMAT.format("files", files))

        # get the relative path towards the top directory (walk_dir)
        relative_path = os.path.relpath(root, walk_dir)

        depth = get_path_depth(relative_path)

        if root == walk_dir:
            top_directory = True
        else:
            top_directory = False

        # base on the first directory list we are going to make selection of directories to
        # process
        if top_directory:
            include_dirs = list()
            for subdir in subdirs:
                add_dir = False
                if dir_has_string is None or bool(dir_has_string.search(subdir)):
                    add_dir = True
                if add_dir and dir_has_not_string is not None:
                    if bool(dir_has_not_string.search(subdir)):
                        add_dir = False
                if add_dir:
                    include_dirs.append(subdir)
            # overrule the subdirectory list of os.walk:
            # http://stackoverflow.com/questions/19859840/excluding-directories-in-os-walk
            logger.debug("Overruling subdirs with {}".format(include_dirs))
            subdirs[:] = include_dirs

        for filename in files:
            (filebase, ext) = os.path.splitext(filename)
            # files with a non-matching extension are skipped entirely
            if extension is None or extension == ext:
                add_file = False

                if file_has_string is None or bool(file_has_string.search(filebase)):
                    # if has_string is none, the search pattern was either empty or invalid (which
                    # happens during typing the regex in the edit_box). In this case, always add the
                    # file. If not none, filter on the regex, so only add the file if the search
                    # pattern is in the filename
                    add_file = True

                # do not add the file in case the has_not string edit has been set (!="") and if the
                # file contains the pattern
                if add_file and file_has_not_string is not None:
                    if bool(file_has_not_string.search(filebase)):
                        # in case we want to exclude the file, the has_not search pattern must be
                        # valid so may not be None
                        add_file = False

                if add_file and (start_date_time is not None or end_date_time is not None):
                    # we have supplied a start time or a end time. See if we can get a date time
                    # from the file name
                    file_time_stamp = get_time_stamp_from_string(
                        string_with_date_time=filebase,
                        yearfirst=time_stamp_year_first,
                        dayfirst=time_stamp_day_first,
                        timezone=time_zone)

                    # a file for which the helper returns None is not filtered on date/time
                    if file_time_stamp is not None:
                        # we found a file time stamp. Compare it with the start time
                        if start_date_time is not None:
                            if isinstance(start_date_time, str):
                                # in case the start time was supplied as a string. The parsed
                                # value replaces the argument, so parsing happens only once
                                start_date_time = get_time_stamp_from_string(
                                    string_with_date_time=start_date_time,
                                    yearfirst=time_stamp_year_first,
                                    dayfirst=time_stamp_day_first,
                                    timezone=time_zone)
                            if file_time_stamp < start_date_time:
                                # the file time stamp is smaller, so don't add it
                                add_file = False
                        # if a end time is supplied. Also compare it with the end time
                        if end_date_time is not None:
                            if isinstance(end_date_time, str):
                                end_date_time = get_time_stamp_from_string(
                                    string_with_date_time=end_date_time,
                                    yearfirst=time_stamp_year_first,
                                    dayfirst=time_stamp_day_first,
                                    timezone=time_zone)
                            if file_time_stamp >= end_date_time:
                                # the file time stamp is larger, so don't add it
                                add_file = False

                if dir_has_string is not None and top_directory:
                    # in case we have specified a directory name with a string search, exclude the
                    # top directory
                    add_file = False

                if max_depth is not None and depth > max_depth:
                    add_file = False

                # create the full base name file
                file_name_to_add = os.path.join(walk_dir, relative_path, filebase)

                # get the path to the stl relative to the selected scan directory
                if add_file:
                    logger.debug("Adding file {}".format(filebase))
                    file_list.append(clear_path(file_name_to_add + ext))

    # sort on the file name. First split the file base from the path, because if the file are in
    # different directories, the first file is not necessarily the oldest
    if sort_file_base_names:
        df = pd.DataFrame(data=file_list, index=[os.path.split(f)[1] for f in file_list],
                          columns=["file_list"])
        df.sort_index(inplace=True)
        # NOTE(review): .values is a numpy array, so with sorting enabled the return value is an
        # array and not the list that is returned otherwise
        file_list = df.file_list.values

    return file_list
def make_directory(directory):
    """Create a directory in case it does not yet exist.

    Parameters
    ----------
    directory : Path or str or os.PathLike
        Name of the directory to create. Only the last level is created; the parent directory
        must already exist.

    Notes
    -----
    This function is used to create directories without first checking whether they already
    exist. If the directory already exists, we silently continue.

    Example
    -------
    If you want to create a directory 'outdir', just do::

        make_directory("outdir")

    The directory is created if it doesn't exist, or we just continue silently if it already
    exists

    Raises
    ------
    OSError
        The OSError is only raised if it is not an `EEXIST` error for an existing directory.
        This implies that the creation of the directory failed for another reason than the
        directory already being present, e.g. a regular file with that name exists, the parent
        directory is missing, the file system is full or we have no write permission
    """

    # make sure we are working with a concrete pathlib Path, whatever path-like object was passed
    directory = Path(directory)

    try:
        directory.mkdir()
    except OSError as exc:
        # an OSError was raised, see what is the cause
        if exc.errno == errno.EEXIST and directory.is_dir():
            # the output directory already exists, that is ok so just continue
            return
        # something else was wrong. Report it and pass the error on to the caller
        logger.warning(
            "Failed to create the directory {} because raised:\n{}".format(directory, exc))
        raise
    else:
        logger.debug("Created directory : {}".format(directory))
def get_logger(name) -> logging.Logger:
    """Return the logger called `name` with its level copied from the main routine.

    Parameters
    ----------
    name : str
        The name of the logger to get.

    Returns
    -------
    logging.Logger
        The logger `name`, whose level has been set to the effective level of the logger
        called "__main__"

    Notes
    -----
    This routine is used on top of a function to get the handle to the current logger and
    automatically set its verbosity based on the logger of the main program.

    Examples
    --------
    Assume a function needs to produce logging output that follows the verbosity chosen in the
    main program::

        def small_function():
            logger = get_logger(__name__)
            logger.info("Inside 'small_function' This is information to the user")
            logger.debug("Inside 'small_function' This is some debugging stuff")
            logger.warning("Inside 'small_function' This is a warning")
            logger.critical("Inside 'small_function' The world is collapsing!")

    The logger of the main program can be created with the create_logger routine::

        def main(logging_level):
            main_logger = create_logger(console_log_level=logging_level)
            main_logger.info("Some information in the main")
            main_logger.debug("Now we are calling the function")
            small_function()
            main_logger.debug("We are back in the main function")

    Calling ``main(logging.DEBUG)`` shows all messages of both functions,
    ``main(logging.INFO)`` drops the debug messages, ``main(logging.WARNING)`` only shows the
    warning and the critical message and ``main(logging.CRITICAL)`` only shows the critical
    message: the level inside `small_function` follows the level of the main program.
    """
    # the verbosity is dictated by the logger of the main routine
    main_level = logging.getLogger("__main__").getEffectiveLevel()

    log = logging.getLogger(name)
    log.setLevel(main_level)
    return log
def merge_loggers(main_logger, logger_name_to_merge, logger_level_to_merge=logging.INFO):
    """
    Let the logger of an external module write to the handlers of the local logger

    Parameters
    ----------
    main_logger: Logger
        The main logger whose handlers are shared
    logger_name_to_merge: str
        Name of the logger we want to merge
    logger_level_to_merge: int
        Level to set on the merged logger. Default = logging.INFO

    Returns
    -------
    Logger:
        The logger called `logger_name_to_merge`, now carrying all handlers of `main_logger`

    Examples
    --------
    In case you have created a logger in your script with the create_logger function

    >>> logger = create_logger()

    and you also have a module file your_module.py with its own logger

    >>> module_logger = logging.getLogger(__name__)

    (inside 'your_module' the __name__ variable makes that this logger is called
    'your_module'), then the output of that module logger is added to your local logger with

    >>> merge_loggers(logger, 'your_module')

    Now all the log statements in 'your_module' end up in the output of logger as well
    """
    merged = logging.getLogger(logger_name_to_merge)
    merged.setLevel(logger_level_to_merge)

    for handler in main_logger.handlers:
        if handler in merged.handlers:
            # already attached, do not add it a second time
            continue
        merged.addHandler(handler)

    return merged
def is_exe(fpath):
    """Test if a file is an executable

    Parameters
    ----------
    fpath : str
        Name of a program to look up on the search path, or the path of a file

    Returns
    -------
    bool
        True in case `fpath` resolves to a file that can be executed, else False

    Notes
    -----
    The program is located with `shutil.which`, so no external `which` command and no shell
    are involved: the argument is never interpreted as shell syntax and the function works on
    every platform. If the program is not found on the search path, `fpath` itself is tested.
    """
    # local import: shutil is not among the imports at the top of this module
    import shutil

    # fall back to the name as given when it is not found on the search path
    resolved = shutil.which(fpath) or fpath

    return os.path.isfile(resolved) and os.access(resolved, os.X_OK)
def clear_path(path_name):
    """Remove spurious dots and slashes from a path name

    The path is normalised by `pathlib.PurePath`, so for example bla/././oke becomes bla/oke.

    Parameters
    ----------
    path_name : str
        The path name to clean up

    Returns
    -------
    str
        The cleaned path, using the separator of the running platform

    Examples
    --------
    On a platform with forward slashes as separator:

    >>> long_path = os.path.join(".", "..", "ok", "yoo", ".", ".", "") + "/"
    >>> print(long_path)
    ./../ok/yoo/././/
    >>> print(clear_path(long_path))
    ../ok/yoo
    """
    normalised = pathlib.PurePath(path_name)
    return str(normalised)
def create_logger(name="root",
                  log_file=None,
                  console_log_level=logging.INFO,
                  console_log_format_long=False,
                  console_log_format_clean=False,
                  file_log_level=logging.INFO,
                  file_log_format_long=True,
                  redirect_stderr=True,
                  formatter=None,
                  formatter_file=None,
                  ) -> logging.Logger:
    """Create a console logger, optionally with a log file

    Parameters
    ----------
    name : str, optional
        Name of the logger. Default = "root"
    log_file : str, optional
        The base name (without extension) of the log file in case we want to write it to file.
        If it is not specified, no file is created
    console_log_level: int, optional
        The level of the console output. Defaults to logging.INFO
    console_log_format_long : bool
        Use a long informative format for the logging output to the console
    console_log_format_clean : bool
        Use a very clean format (message only) for the logging output. If given together with
        console_log_format_long an AssertionError is raised
    file_log_level: int, optional
        In case the log file is used, specify the log level. Can be different from the console
        log level. Defaults to logging.INFO
    file_log_format_long: bool, optional
        Use a longer format for the file output. Default to True
    redirect_stderr: bool, optional
        If True and a log file is given, `sys.stderr` is redirected to a file with the extension
        .err instead of .out. Default = True
    formatter: Formatter
        A formatter for the console output can also be explicitly passed; it overrules the
        console format flags
    formatter_file: Formatter
        A formatter for the file output can also be explicitly passed; it overrules
        `file_log_format_long`

    Returns
    -------
    logging.Logger
        The handle to the logger which we can use to create output to the screen using the
        logging module

    Raises
    ------
    AssertionError
        If both `console_log_format_long` and `console_log_format_clean` are set. This is
        checked first, so in that case no file is created, `sys.stderr` is not redirected and
        the handlers of an existing logger are left alone

    Notes
    -----
    The logger itself is always set to the DEBUG level; the console and file handlers do the
    filtering with their own level. Handlers already attached to the logger `name` are
    replaced. The console handler writes to `sys.stdout`. The file opened for the stderr
    redirection stays open for the remainder of the process.

    Examples
    --------
    Create a logger at the default level, so no debug information is generated::

        logger = create_logger()
        logger.debug("This is a debug message")          # not shown
        logger.info("This is a information message")     # shown
        logger.warning("This is a warning message")      # shown

    Create a logger at the debug level, which shows all three messages::

        logger = create_logger(console_log_level=logging.DEBUG)

    Create a logger at the warning level. All output is suppressed, except for the warnings::

        logger = create_logger(console_log_level=logging.WARNING)

    It is also possible to redirect the output to a file. The file name is given without an
    extension, as two files are created: one with the extension .out and one with the extension
    .err, for the logging output and the stderr output respectively::

        logger = create_logger(log_file="log_file", console_log_level=logging.INFO,
                               file_log_level=logging.DEBUG, file_log_format_long=False)
        logger.debug("This is a debug message")      # only in log_file.out
        logger.info("This is a information message") # on the console and in log_file.out
        print("system normal message")               # on the console only
        print("system error message", file=sys.stderr)  # in log_file.err only
        sys.stderr.flush()  # forces to flush the stderr output buffer to file

    References
    ----------
    https://docs.python.org/3/library/logging.html#levels
    """

    # reject conflicting options before anything is touched: no file, no stderr redirection
    # and no reset of existing handlers for a call that is going to fail anyway
    if console_log_format_clean and console_log_format_long:
        raise AssertionError("Can only specify either a long or a short logging format. Not both "
                             "at the same time")

    # start with creating the logger with a DEBUG level
    _logger = logging.getLogger(name)
    _logger.setLevel(logging.DEBUG)
    _logger.handlers = []

    # create a console handle with a console log level which may be higher than the current level
    ch = logging.StreamHandler(stream=sys.stdout)
    ch.setLevel(console_log_level)

    fh = None

    # create file handler if a file name is given with more info
    if log_file is not None:
        log_file_out = log_file + ".out"
        fh = logging.FileHandler(log_file_out, mode='w')
        fh.setLevel(file_log_level)

        if redirect_stderr:
            error_file = log_file + ".err"
            # deliberately not closed: stderr has to stay usable for the rest of the run
            sys.stderr = open(error_file, 'w')

    formatter_long = logging.Formatter('[%(asctime)s] %(levelname)8s --- %(message)s ' +
                                       '(%(filename)s:%(lineno)s)', datefmt='%Y-%m-%d %H:%M:%S')
    formatter_normal = logging.Formatter('%(levelname)6s : %(message)s')

    formatter_short = logging.Formatter('%(message)s')

    # create formatter and add it to the handlers for the console output
    if formatter is not None:
        # if the formatter is given it overrides all other
        formatter_cons = formatter
    elif console_log_format_long:
        formatter_cons = formatter_long
    elif console_log_format_clean:
        formatter_cons = formatter_short
    else:
        formatter_cons = formatter_normal

    ch.setFormatter(formatter_cons)

    if log_file is not None:
        if formatter_file is None:
            # no explicit file formatter was passed, so pick one based on the flag
            formatter_file = formatter_long if file_log_format_long else formatter_normal

        fh.setFormatter(formatter_file)

    _logger.addHandler(ch)

    if log_file:
        _logger.addHandler(fh)

    return _logger
def delete_module(modname, paranoid=None):
    """Delete a module from memory which was loaded before

    The module is removed from `sys.modules` and the attribute called `modname` is removed
    from every other loaded module that has it.

    Parameters
    ----------
    modname : str
        The name of the module to remove
    paranoid : list or None
        Optional list of symbol names. When given and not empty, these names are also deleted
        from every loaded module that has them; names starting with a double underscore are
        skipped. (Default value = None)

    Raises
    ------
    ValueError
        If `modname` is not a loaded module, or if `paranoid` is not a sequence that can be
        copied with a slice. In both cases nothing has been deleted.
    """
    from sys import modules

    if modname not in modules:
        raise ValueError(modname)

    these_symbols = None
    if paranoid:
        try:
            # work on a copy; slicing a non-sequence (set, generator, ...) raises TypeError
            these_symbols = paranoid[:]
        except (TypeError, IndexError):
            raise ValueError('must supply a finite list for paranoid')

    del modules[modname]

    # iterate over a snapshot, the modules dictionary may change while we are cleaning up
    for mod in list(modules.values()):
        try:
            delattr(mod, modname)
        except AttributeError:
            pass
        if paranoid:
            for symbol in these_symbols:
                if symbol[:2] == '__':  # ignore special symbols
                    continue
                try:
                    delattr(mod, symbol)
                except AttributeError:
                    pass
def get_clean_version(version) -> str:
    r"""Turn the full version string into a clean one without the build

    Parameters
    ----------
    version : str
        The version string as returned from versioneer.

    Returns
    -------
    str
        The clean version string. If the string contains no digits or dots, it is returned
        unchanged

    Notes
    -----
    The version string is searched with the regular expression "([.|\d]+)([+]*)(.*)".
    This function returns the first group "([.|\d]+)", i.e. the first run of digits and dots
    found in the string

    Examples
    --------

    >>> get_clean_version("1.3")
    '1.3'
    >>> get_clean_version("2.5+dev.g43429")
    '2.5'
    >>> get_clean_version("4.3.1+dev.g43429-dirty")
    '4.3.1'

    """
    # raw string: "\d" in a normal string literal is an invalid escape sequence
    match = re.search(r"([.|\d]+)([+]*)(.*)", version)
    if match is None:
        return version
    return match.group(1)
def read_settings_file(file_name) -> dict:
    """Read the yaml file to get the setup information.

    Parameters
    ----------
    file_name : str
        Name of the configuration file. Can be a full path name as well

    Returns
    -------
    dict
        All the settings as obtained from the yaml configuration file

    Notes
    -----
    The yaml file is searched for in the following order

    1. `file_name` as given, so relative to the current directory where the script is
       executed, or as a full path.
    2. The directory where this module is located, using only the base name of `file_name`.
       In this way, a default settings file can be put in that directory and the user does
       not need to copy it unless a setting value needs to be changed

    Raises
    ------
    AssertionError:
        In case the file can not be opened at either location
    """
    found_as_given = os.path.exists(file_name)
    if found_as_given:
        logger.info("Loading configuration file {}".format(file_name))
        configuration_file = file_name
    else:
        logger.info("Loading configuration file from script dir {}".format(__name__))
        module_dir = os.path.dirname(__file__)
        configuration_file = os.path.join(module_dir, os.path.basename(file_name))

    try:
        logger.debug("Trying to read configuration file {}".format(configuration_file))
        with open(configuration_file, "r") as stream:
            # NOTE(review): presumably this loader does not restrict the yaml tags it accepts,
            # so only trusted configuration files should be read - confirm
            settings = yaml.load(stream=stream, Loader=yamlloader.ordereddict.CLoader)
    except IOError as err:
        raise AssertionError("Configuration file can not be found in either current directory of "
                             "script directory. Goodbye. {}".format(err))

    return settings
def get_python_version_number(version_info) -> str:
    """Turn version info as obtained with sys.version_info into a dotted version string.

    Parameters
    ----------
    version_info :
        Object with the integer attributes *major*, *minor* and *micro*, such as
        ``sys.version_info``.

    Returns
    -------
    str
        The version as a dotted string, e.g. 3.5.3. A *minor* or *micro* component that equals
        the empty string is left out.

    Examples
    --------
    >>> version_string = get_python_version_number(sys.version_info)
    """
    python_version = "{:d}".format(version_info.major)
    # append the remaining components in order, skipping the ones that are empty strings
    for component_name in ("minor", "micro"):
        component = getattr(version_info, component_name)
        if component != "":
            python_version += ".{:d}".format(component)
    return python_version
def get_regex_pattern(search_pattern):
    """Compile a string into a regular expression, returning None when that is not possible.

    Parameters
    ----------
    search_pattern : str
        A regular expression in the form of a string

    Returns
    -------
    None or compiled regular expression
        The result of ``re.compile``, or None when *search_pattern* is the empty string or is
        not a valid regular expression.
    """
    if search_pattern == "":
        return None

    try:
        return re.compile("{}".format(search_pattern))
    except re.error:
        # invalid expression: signal this with None rather than an exception
        return None
def clear_argument_list(argv):
    """Remove carriage-return characters from an argument list.

    Under cygwin the last argument of ``sys.argv`` may carry a trailing carriage return. This
    utility strips that character from every argument and drops arguments which become empty.

    Parameters
    ----------
    argv : list
        The argument list stored in `sys.argv`

    Returns
    -------
    list
        New list with the cleaned, non-empty arguments
    """
    stripped_args = (re.sub("\r", "", arg) for arg in argv)
    # arguments that consisted of carriage returns only are discarded
    return [arg for arg in stripped_args if arg != ""]
def query_yes_no(question, default_answer="no"):
    """Ask a yes/no question via input() and return the answer.

    Parameters
    ----------
    question : str
        A question to ask the user
    default_answer : str or None, optional
        The answer that is returned when only return is hit: "yes", "no", or None/empty when the
        user must answer explicitly. Default to 'no'

    Returns
    -------
    str:
        "yes" or "no", depending on the input of the user

    Raises
    ------
    ValueError
        If *default_answer* is not "yes", "no" or empty/None
    """
    # NOTE(review): get_logger is not defined in the visible part of this module; it is assumed
    # to be provided elsewhere in the file.
    log = get_logger(__name__)
    valid = {"yes": "yes", "y": "yes", "ye": "yes",
             "no": "no", "n": "no"}
    if not default_answer:
        prompt = " [y/n] "
    elif default_answer == "yes":
        prompt = " [Y/n] "
    elif default_answer == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default_answer)

    while True:
        log.warning(question + prompt)
        choice = input().strip().lower()
        # only fall back to the default when there is one; an empty default ("" or None) means
        # that the question is repeated until an explicit answer is given
        if default_answer and choice == "":
            return default_answer
        if choice in valid:
            return valid[choice]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
def move_script_path_to_back_of_search_path(script_file, append_at_the_end=True) -> list:
    """Build a copy of ``sys.path`` with the directory of a script moved to the back or front.

    Parameters
    ----------
    script_file : str
        Name of the script whose directory must be moved
    append_at_the_end: bool, optional, default=True
        Put the script directory at the end of the path. In case this flag is false, the script
        directory is put in front of the path

    Returns
    -------
    list:
        The new search path. ``sys.path`` itself is not modified.

    Notes
    -----
    This is sometimes required if the __version string of the script clashes with another
    __version string. Paths are compared with all backslashes turned into forward slashes.

    Examples
    --------
    sys.path = move_script_path_to_back_of_search_path(__file__)
    """
    script_dir = os.path.split(os.path.realpath(script_file))[0]
    script_dir_normalised = re.sub("\\\\", "/", script_dir)

    # keep all entries except the script directory, whatever slash style it is stored in
    remaining = [entry for entry in sys.path
                 if re.sub("\\\\", "/", entry) != script_dir_normalised]

    if append_at_the_end:
        return remaining + [script_dir]
    return [script_dir] + remaining
def read_value_from_dict_if_valid(dictionary, key, default_value=None):
    """Look up *key* in *dictionary*, falling back to a default when the key is missing.

    Parameters
    ----------
    dictionary :
        dictionary which is supposed to carry this key field
    key :
        the name of the field to read the value from
    default_value :
        value returned when the lookup raises a KeyError

    Returns
    -------
    type
        value of the key field or the default value
    """
    try:
        return dictionary[key]
    except KeyError:
        return default_value
def set_value_if_valid(value, new_value):
    """Return *new_value* unless it is None, in which case the original *value* is kept.

    Parameters
    ----------
    value :
        the original value, which can be pre-defined with a default
    new_value :
        the new value. Only used if it is not None

    Returns
    -------
    type
        new value, or the original if new_value was None
    """
    return value if new_value is None else new_value
def compare_objects(obj1, obj2, counter=0, max_recursion_depth=4):
    """Compare if the public attributes of two objects are equal.

    Parameters
    ----------
    obj1: class
        first object
    obj2: class
        second object
    counter: int
        Current recursion depth. Keeps track of how many times this function has recursively
        called itself
    max_recursion_depth: int
        Maximum depth to which the objects are compared.

    Notes
    -----
    * All attributes of *obj1* whose name does not start with an underscore are compared with
      the attribute of the same name of *obj2* using ``numpy.testing.assert_equal``.
    * If two attribute values are not equal and are plain data (strings, numbers, booleans,
      None, lists, tuples, dicts, sets or numpy values) the AssertionError is passed on.
    * Any other unequal value is treated as a nested object and compared attribute by attribute
      by calling this function recursively.
    * To prevent an infinite loop on self-referencing objects the recursion stops at
      *max_recursion_depth*; objects nested deeper than that are not compared.

    Raises
    ------
    AssertionError:
        In case one of the object fields is not equal
    AttributeError:
        In case *obj2* lacks a public attribute of *obj1*
    """
    # values of these types have no meaningful public attributes to descend into: a mismatch
    # is final and must be reported
    leaf_types = (str, bytes, bool, int, float, complex, type(None),
                  list, tuple, dict, set, frozenset, np.ndarray, np.generic)

    for att in dir(obj1):
        if att.startswith("_"):
            continue

        val1 = getattr(obj1, att)
        val2 = getattr(obj2, att)

        try:
            np.testing.assert_equal(val1, val2)
        except AssertionError:
            if isinstance(val1, leaf_types):
                raise
            # objects without their own __eq__ compare by identity, so look at their
            # attributes; hand the depth down so that the limit really takes effect
            if counter + 1 < max_recursion_depth:
                compare_objects(val1, val2, counter=counter + 1,
                                max_recursion_depth=max_recursion_depth)
def set_default_dimension(parse_value, default_dimension=None, force_default_units=False):
    """
    Add a *pint* dimension to a value

    Parameters
    ----------
    parse_value: ndarray or str or float
        Value with optionally a dimension written in the form of a str. Can be an array or list
        of strings as well
    default_dimension: str
        Required default dimension
    force_default_units: bool
        If true the only allowed dimension is the default dimension. Raise an error in case this
        is not the case. Default = False

    Returns
    -------
    :obj:`Quantity` or None
        Value with the quantity as given by the default, or None if *parse_value* is None

    Raises
    ------
    AssertionError
        In case the *parse_value* argument carries a dimension, but:

        1. Its dimensionality is not the same as the dimensionality of the *default_dimension*
        2. Its units are not the same as the units of the *default_dimension* and the
           *force_default_units* flag is set to *True*

    Notes
    -----
    * This function is an add-on to the *pint* module, a package to define, operate and
      manipulate physical quantities: https://pypi.python.org/pypi/Pint.
    * This function is used to add a dimension to a value which is parsed from a text file.
    * It is checked if the value given in the text file has a dimension already, for example
      that it was given as "1.0 m/s".
    * If a dimension was given already: check if the dimensionality (in this case: Length/Time)
      is the same as the dimensionality of the *default_dimension* input argument.
    * In case the input value does not have an explicit dimension, the dimension given by
      *default_dimension* is added to the value.
    * This function works on both scalar and list values
    * Hz are not converted to rad/s as expected. Therefore do not try to use this to convert
      Hz -> rad/s
    * If the input argument *parse_value* is None, a None is returned as output as well

    Examples
    --------

    Assume we want to read input values from a text file as plain numbers and we want to add a
    default dimension of *meter* to it in case the value does not have an explicit dimension
    yet. Just do

    >>> logger = create_logger(console_log_level=logging.CRITICAL)
    >>> value_without_dimension = 1.0  # this is the values as we read from the text file
    >>> value_with_dimension = set_default_dimension(value_without_dimension, "meter")
    >>> print(value_with_dimension)
    1.0 meter

    The variable *value_with_dimension* is now a pint quantity which carries the dimension
    meter. In case the input variable already has a dimension, we should also be able to use
    this *function*. The only requirement is that the dimensionality is the same. So this should
    work

    >>> value_with_dimension = set_default_dimension("2.5 meter", "meter")
    >>> print(value_with_dimension)
    2.5 meter

    This should work as well

    >>> value_with_dimension = set_default_dimension("5.0 mm", "meter")
    >>> print(value_with_dimension)
    5.0 millimeter

    But this fails as the dimensionality of the input argument is not corresponding with the
    dimensionality of the default dimension

    >>> try:
    ...     value_with_dimension = set_default_dimension("5.0 mm", "second")
    ... except AssertionError:
    ...     print("This fails because the dimensionality is not the same")
    This fails because the dimensionality is not the same

    This function should also work for arrays and list

    >>> values_without_dimension = np.linspace(0, 1, num=5, endpoint=True)
    >>> values_with_dimension = set_default_dimension(values_without_dimension, "meter/second^2")
    >>> print(values_with_dimension)
    [0.   0.25 0.5  0.75 1.  ] meter / second ** 2
    """
    # unit quantity of the requested default dimension; reference for the checks below
    if default_dimension is not None:
        def_unit_val = Q_(1, default_dimension)
    else:
        def_unit_val = None

    # reference quantity used to recognise values which do not carry any dimension
    dimensionless_unit_val = Q_("1")
    dimensionless = dimensionless_unit_val.dimensionality

    if parse_value is not None:
        # in case no dimensions are given with the parse_value argument, impose them
        if isinstance(parse_value, (list, tuple, np.ndarray)):
            # to properly deal with arrays and list first check if we have one
            v = Q_(parse_value[0])
            # if this is allowed we have an array. Check the value and dimension of the first
            # element
            if v.dimensionality == dimensionless and v.units == dimensionless_unit_val.units:
                if not isinstance(parse_value[0], type(dimensionless_unit_val)):
                    # there are no dimensions. Just convert the array, add the dimensions later
                    ret_val = Q_(parse_value)
                else:
                    # we have added the quantity to the parse_value already. Just copy it
                    ret_val = parse_value
            else:
                # The elements have a dimension, so convert the array into a bare array without
                # dimensions and copy the dimension type to the default. Then we can just do the
                # conversion below
                parse_value = np.array([Q_(x).magnitude for x in parse_value])
                ret_val = Q_(parse_value)
                if def_unit_val is not None and v.dimensionality != def_unit_val.dimensionality:
                    raise AssertionError(
                        "The first value of the array given has a dimension with a different "
                        "dimensionality as the default dimension. Found {}. Expected {}"
                        "".format(v.dimensionality, def_unit_val.dimensionality))
                # NOTE(review): the units of the first element are stored here, yet the branch
                # below that imposes a dimension on a dimensionless ret_val uses
                # default_dimension -- confirm this is intended.
                def_unit_val = v.units

        elif not isinstance(parse_value, type(dimensionless_unit_val)):
            # the parse_value is not yet a quantity object
            ret_val = Q_(parse_value)
        else:
            # the parse value is a quantity already. Just copy it
            ret_val = parse_value

        if ret_val.dimensionality == dimensionless and ret_val.units == dimensionless_unit_val.units:
            # if no dimension is given, add the default dimension
            ret_val = Q_(np.asarray(parse_value), default_dimension)
            if ret_val.dimensionality != dimensionless:
                logger.debug("A dimensionless value was and a default dimension was imposed "
                             "{} -> {}.".format(parse_value, ret_val))
        elif def_unit_val is not None:
            # check if the dimensionality is the same as the def_units
            if ret_val.dimensionality != def_unit_val.dimensionality:
                raise AssertionError(
                    "Value given has a dimension with a different dimensionality as the default "
                    "dimension\nFound {}. Expected {}".format(ret_val.dimensionality,
                                                              def_unit_val.dimensionality))

            # we want to force the units. Check it
            if force_default_units:
                if ret_val.units != def_unit_val.units:
                    raise AssertionError(
                        "The dimensions given to the value do not match the default units. \n"
                        "Found {}. Expected {}\nPlease fix or set *only_default_units_allowed* "
                        "to False".format(ret_val.units, def_unit_val.units))
    else:
        # in case a none value is given as input just return none as output
        ret_val = None

    return ret_val
def get_value_magnitude(value, convert_to_base_units=True):
    """
    Return the bare magnitude of a value which may or may not be a *Pint* quantity

    Parameters
    ----------
    value: Quantity or float or None
        A value with a Pint dimension or a normal float. In both cases, the value without
        dimension is returned
    convert_to_base_units: bool, optional
        Call ``to_base_units()`` on the value before taking its magnitude. Default = True

    Returns
    -------
    float or None
        The ``magnitude`` attribute of the (optionally converted) value. If the value does not
        offer the required attributes it is returned as it is.

    Examples
    --------

    Assume we have a value with a pint dimension

    >>> velocity = Q_("2.5 m/s")
    >>> velocity_mag = get_value_magnitude(velocity)
    >>> print("Velocity without dimension is: {}".format(velocity_mag))
    Velocity without dimension is: 2.5

    A plain float does not have a dimension, so the value itself is returned

    >>> velocity_mag2 = get_value_magnitude(velocity_mag)
    >>> print("Velocity without dimension is: {}".format(velocity_mag2))
    Velocity without dimension is: 2.5

    By default the value is first converted to its base units

    >>> velocity_knots = Q_("1 knot")
    >>> velocity_mag = get_value_magnitude(velocity_knots)
    >>> print("Velocity {} is converted to its magnitude in m/s: {:.2f}"
    ...       "".format(velocity_knots, velocity_mag))
    Velocity 1 knot is converted to its magnitude in m/s: 0.51

    With *convert_to_base_units* set to False the magnitude is given in the units of the input

    >>> velocity_knots = Q_("2.5 knot")
    >>> velocity_mag = get_value_magnitude(velocity_knots, convert_to_base_units=False)
    >>> print("Velocity {} is converted to its magnitude in knots: {:.2f}"
    ...       "".format(velocity_knots, velocity_mag))
    Velocity 2.5 knot is converted to its magnitude in knots: 2.50

    Notes
    -----
    * Use this inside functions for which it is not known beforehand if an argument is passed
      with or without a Pint dimension and only the magnitude is of interest.
    """
    quantity = value
    try:
        if convert_to_base_units:
            quantity = quantity.to_base_units()
        return quantity.magnitude
    except AttributeError:
        # no quantity interface available: hand back what we have so far
        return quantity
def get_time_stamp_from_string(string_with_date_time, yearfirst=True, dayfirst=False,
                               timezone=None):
    """
    Try to get a date/time stamp from a string

    Parameters
    ----------
    string_with_date_time: str
        The string to analyse
    yearfirst: bool, optional
        if true put the year first. See *dateutil.parser*. Default = True
    dayfirst: bool, optional
        if true put the day first. See *dateutil.parser*. Default = False
    timezone: str or None, optional
        if given, attach this time zone to a stamp without time zone, or convert a stamp which
        already has a time zone to it

    Returns
    -------
    :obj:`Timestamp` or None
        Pandas time stamp, or None if no date/time could be obtained from the string

    Examples
    --------

    The date time in the file 'AMSBALDER_160929T000000' is 29 sep 2016 and does not have a time
    zone specification. The returned time stamp does not have a time zone either

    >>> file_name="AMSBALDER_160929T000000"
    >>> time_stamp =get_time_stamp_from_string(string_with_date_time=file_name)
    >>> print("File name {} has time stamp {}".format(file_name, time_stamp))
    File name AMSBALDER_160929T000000 has time stamp 2016-09-29 00:00:00

    We can also force to add a time zone. The Etc/GMT-2 time zone is the UTC + 2 time zone,
    which is the central europe summer time (CEST) or the Europe/Amsterdam Summer time.

    >>> time_stamp =get_time_stamp_from_string(string_with_date_time=file_name,
    ...                                        timezone="Etc/GMT-2")
    >>> print("File name {} has time stamp {}".format(file_name, time_stamp))
    File name AMSBALDER_160929T000000 has time stamp 2016-09-29 00:00:00+02:00

    This time we assume the file name already contains a time zone, 2 hours + UTC. Since we
    already have a time zone, the *timezone* option can only convert the date time to the
    specified time zone.

    >>> file_name="AMSBALDER_160929T000000+02"
    >>> time_stamp =get_time_stamp_from_string(string_with_date_time=file_name,
    ...                                        timezone="Etc/GMT-2")
    >>> print("File name {} has time stamp {}".format(file_name, time_stamp))
    File name AMSBALDER_160929T000000+02 has time stamp 2016-09-29 00:00:00+02:00

    In case the time zone given by the *timezone* option differs from the time zone in the file
    name, the time zone is converted

    >>> file_name="AMSBALDER_160929T000000+00"
    >>> time_stamp =get_time_stamp_from_string(string_with_date_time=file_name,
    ...                                        timezone="Etc/GMT-2")
    >>> print("File name {} has time stamp {}".format(file_name, time_stamp))
    File name AMSBALDER_160929T000000+00 has time stamp 2016-09-29 02:00:00+02:00
    """
    try:
        parsed = dparser.parse(string_with_date_time, fuzzy=True, yearfirst=yearfirst,
                               dayfirst=dayfirst)
        file_time_stamp = pd.Timestamp(parsed)
    except (ValueError, OverflowError):
        # dateutil raises OverflowError (not a ValueError) when a number in the string is too
        # large for a date field, e.g. a long digit run in a file name. Treat it as "no date".
        return None

    if timezone is not None:
        if file_time_stamp.tzinfo is None:
            # no time zone yet: attach the requested one without shifting the clock time
            file_time_stamp = file_time_stamp.tz_localize(timezone)
        else:
            # a time zone was present already: convert to the requested one
            file_time_stamp = file_time_stamp.tz_convert(timezone)

    return file_time_stamp
def range1(start=None, stop=None):
    """
    Return a range including the end value

    Parameters
    ----------
    start: int or None
        Start value in case both start and stop are defined. If only *start* is given it is
        used as the end value and the range starts at 0
    stop: int or None
        End value, which is included in the range. If only *stop* is given the range starts
        at 0

    Returns
    -------
    list
        Integer values in between start and stop, including the stop value

    Raises
    ------
    AssertionError
        If neither *start* nor *stop* is given

    Examples
    --------
    >>> range1(3)
    [0, 1, 2, 3]
    >>> range1(2, 5)
    [2, 3, 4, 5]
    >>> range1(stop=2)
    [0, 1, 2]
    """
    if start is None and stop is None:
        # raised explicitly (not via assert) so that the check survives "python -O"
        raise AssertionError("At least one parameter must be given")

    if stop is None:
        # single argument: behaves like range(n), but with n itself included
        start, stop = 0, start
    elif start is None:
        # only the end value was given (used to fail with a TypeError)
        start = 0

    return list(range(start, stop + 1))
def is_postcode(postcode):
    """
    Check whether a string is a Dutch postal code

    Parameters
    ----------
    postcode: str
        The string to check

    Returns
    -------
    bool:
        True if the string consists of four digits, at most one whitespace character and two
        letters, such as "2613AB" or "2613 ab". Leading and trailing whitespace is ignored.
    """
    # fullmatch: a plain match only anchors the start and would accept e.g. "2613ABCD"
    return bool(re.fullmatch(r"\d{4}\s?[a-zA-Z]{2}", postcode.strip()))
def standard_postcode(postcode):
    """
    Bring a postal code into its standard form

    Parameters
    ----------
    postcode: str
        Postal code in a non-standard form, such as 2613 AB, 2613ab, etc

    Returns
    -------
    str:
        Postal code without any whitespace and in upper case: 2613AB
    """
    without_whitespace = re.sub(r"\s+", "", postcode)
    return without_whitespace.upper()
def get_dir_size(directory_name, recursive=False):
    """
    Return the total size in bytes of the files in a directory

    Parameters
    ----------
    directory_name: str
        Name of the directory
    recursive: bool, optional
        If True also add the files in all sub directories. Default = False, in which case only
        the files located directly in *directory_name* are counted

    Returns
    -------
    int:
        Size of the directory in bytes

    Notes
    -----
    * Built on pathlib; entries which are not regular files are not counted
    """
    directory = Path(directory_name)
    entries = directory.rglob("*") if recursive else directory.iterdir()
    return sum(entry.stat().st_size for entry in entries if entry.is_file())
def dataframe_clip_strings(df, max_width, include=None, exclude=None):
    """
    Clip all strings in a dataframe

    Parameters
    ----------
    df: DataFrame
        Pandas data frame. The columns are modified in place
    max_width: int
        Clip strings to this width
    include: list, optional
        give a list of column names to clip. Exclude the rest
    exclude: list, optional
        give a list of column names not to clip. Include the rest

    Returns
    -------
    DataFrame
        The same data frame object with the string columns clipped

    Notes
    -----
    * Only str and bytes values are clipped. Any other value (None, NaN, numbers) is kept as it
      is, so missing values in a column do not prevent the clipping of the strings in it.
    * Columns without any str or bytes value are not touched.
    """
    for cn in df.columns:
        if include is not None and cn not in include:
            continue
        if exclude is not None and cn in exclude:
            continue

        values = list(df[cn])
        if not any(isinstance(vv, (str, bytes)) for vv in values):
            # nothing to clip; leave the column and its dtype alone
            continue

        # keep one entry per row: dropping values would break the assignment to the column
        df[cn] = [vv[:max_width] if isinstance(vv, (str, bytes)) else vv for vv in values]

    return df
def get_version(default_version=None):
    """
    Get the current git version of this questionnaire

    The version is the output of ``git describe --tags``, run in the current working directory.

    Parameters
    ----------
    default_version: str, optional
        Version to return when git does not yield a version (no repository, no tags, or git
        can not be started)

    Returns
    -------
    str:
        current git version, or *default_version*
    """
    try:
        process = subprocess.Popen(["git", "describe", "--tags"],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out, _ = process.communicate()
    except OSError as err:
        # git is not installed or can not be executed: fall back to the default as well
        logger.info("Could not run git: {}".format(err))
        std_out = b""

    git_version = std_out.decode().strip()

    if git_version == "":
        logger.info("No git version found in questionnaire folder. Is it under git control?")
        survey_version = default_version
        logger.info("Overruling with version in yaml file: {}".format(survey_version))
    else:
        survey_version = git_version
        logger.info("Survey version found: {}".format(survey_version))

    return survey_version
def get_branch(default_branch=None):
    """
    Get the current git branch of this questionnaire

    The branch is taken from the output of ``git branch --no-color``, run in the current working
    directory: it is the entry which git marks with a leading ``*``.

    Parameters
    ----------
    default_branch: str
        The name given to the branch when no active branch can be found

    Returns
    -------
    str:
        name of the active branch, or *default_branch*
    """
    try:
        process = subprocess.Popen(["git", "branch", "--no-color"],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out, _ = process.communicate()
    except OSError as err:
        # git is not installed or can not be executed: fall back to the default
        logger.info("Could not run git: {}".format(err))
        std_out = b""

    survey_branch = default_branch
    # splitlines also copes with the "\r\n" line endings of git on Windows
    for branch in std_out.decode().splitlines():
        # this regex matches the active branch (with the * in front of the branch name)
        match = re.match(r"\*\s(.*)", branch)
        if match:
            survey_branch = match.group(1)
            break
    else:
        # no active branch in the output (e.g. not a git repository)
        logger.info("No git version found in questionnaire folder. Is it under git control?")
        logger.info("Overruling with version in yaml file: {}".format(survey_branch))

    return survey_branch
class CacheInfo(object):
    """
    Class to hold the information of a cache file

    Parameters
    ----------
    file_name: str
        Name of the cache file
    directory: str, optional
        Cache directory. It is created if it does not exist yet. Default = "."
    file_type: str
        Type (extension) of the cache, with or without leading dot. Required when *file_name*
        does not have an extension; if both are given they must agree
    reset_cache: bool, optional
        If True the cache is not read, even if the cache file exists. Default = False

    Attributes
    ----------
    cache_file: Path
        Location of the cache file inside *directory*
    cache_meta_file: Path
        Location of the accompanying meta file: the cache file name without its extensions,
        followed by "_meta.yml"
    read_from_cache: bool
        True if the cache file exists and *reset_cache* is False
    """

    def __init__(self, file_name, directory=".", file_type=None, reset_cache=False):
        self.file_name = Path(file_name)
        self.directory = Path(directory)
        self.file_type = file_type
        # parents=True: a nested cache directory can be created in one go
        self.directory.mkdir(parents=True, exist_ok=True)
        self.read_from_cache = False

        logger.debug(f"Creating cache name with file={file_name} dir={directory} type={file_type}")
        self.cache_file = self.make_file_name()

        # create a meta file name where we can store some info
        file_name_base = str(self._strip_suffixes(self.cache_file))
        self.cache_meta_file = Path(file_name_base + "_meta.yml")

        self.set_read_from_cache_flag(reset_cache=reset_cache)

    @staticmethod
    def _strip_suffixes(path):
        """Return *path* with all extensions of its last component removed"""
        for _ in path.suffixes:
            path = path.with_suffix("")
        return path

    def set_read_from_cache_flag(self, reset_cache):
        """Set *read_from_cache*: True if the cache file exists and no reset is requested"""
        self.read_from_cache = self.cache_file.exists() and not reset_cache

    def make_file_name(self) -> Path:
        """Build the path of the cache file from directory, file name and file type

        Returns
        -------
        Path
            *directory* / <file name without extensions>.<file type>

        Raises
        ------
        ValueError
            If the file name has no extension and no *file_type* was given
        AssertionError
            If the extension of the file name differs from *file_type*
        """
        suffixes = self.file_name.suffixes
        if suffixes:
            # every suffix already starts with a dot, so concatenate them (".tar" + ".gz");
            # joining them with "." gave a double dot
            this_file_type = "".join(suffixes)
        elif self.file_type is None:
            raise ValueError("file name does not have a type and file_type not defied")
        else:
            this_file_type = self.file_type

        # from here on the type is handled without its leading dot
        this_file_type = re.sub(r"^\.", "", this_file_type)

        if self.file_type is not None:
            expected_file_type = re.sub(r"^\.", "", self.file_type)
            if this_file_type != expected_file_type:
                # raised explicitly (not via assert) so that the check survives "python -O"
                raise AssertionError(
                    "Extension '{}' of the file name differs from file type '{}'"
                    "".format(this_file_type, expected_file_type))

        file_name_base = str(self._strip_suffixes(self.file_name))
        cache_file = Path(self.directory) / Path(".".join([file_name_base, this_file_type]))

        logger.debug(f"Cache file: {cache_file}")

        return cache_file
def clean_up_name(name):
    """
    Replace all troublesome characters in a name by underscores

    Parameters
    ----------
    name: str
        String which needs to be cleaned

    Returns
    -------
    str:
        Clean name: the characters ``[ ] / . ( )`` and the space are turned into an underscore,
        runs of underscores are merged into one and an underscore at the end is removed
    """
    bad_chars = "[]/.() "
    # a single pass over the string which maps every bad character onto an underscore
    cleaned = name.translate(str.maketrans(dict.fromkeys(bad_chars, "_")))

    cleaned = re.sub("_{1,}", "_", cleaned)
    return re.sub("_$", "", cleaned)