--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Berlin
+@summary: The module for the application object.
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import sys
+import os
+import logging
+import re
+import traceback
+
+# Third party modules
+import argparse
+
+# Own modules
+from .errors import FunctionNotImplementedError, PpAppError
+
+from .common import terminal_can_colors
+from .common import caller_search_path
+
+from .colored import ColoredFormatter, colorstr
+
+from .obj import PpBaseObject
+
+__version__ = '0.1.1'
+LOG = logging.getLogger(__name__)
+
+
+# =============================================================================
+class CrTplApplication(PpBaseObject):
+ """
+ Class for the application objects.
+ """
+
+ re_prefix = re.compile(r'^[a-z0-9][a-z0-9_]*$', re.IGNORECASE)
+ re_anum = re.compile(r'[^A-Z0-9_]+', re.IGNORECASE)
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=False, usage=None, description=None,
+ argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None):
+
+ self.arg_parser = None
+ """
+ @ivar: argparser object to parse commandline parameters
+ @type: argparse.ArgumentParser
+ """
+
+ self.args = None
+ """
+ @ivar: an object containing all commandline parameters
+ after parsing them
+ @type: Namespace
+ """
+
+ self._exit_value = 0
+ """
+ @ivar: return value of the application for exiting with sys.exit().
+ @type: int
+ """
+
+ self._usage = usage
+ """
+ @ivar: usage text used on argparse
+ @type: str
+ """
+
+ self._description = description
+ """
+ @ivar: a short text describing the application
+ @type: str
+ """
+
+ self._argparse_epilog = argparse_epilog
+ """
+ @ivar: an epilog displayed at the end of the argparse help screen
+ @type: str
+ """
+
+ self._argparse_prefix_chars = argparse_prefix_chars
+ """
+ @ivar: The set of characters that prefix optional arguments.
+ @type: str
+ """
+
+ self._terminal_has_colors = False
+ """
+ @ivar: flag, that the current terminal understands color ANSI codes
+ @type: bool
+ """
+
+ self._quiet = False
+
+ self.env = {}
+ """
+ @ivar: a dictionary with all application specifiv environment variables,
+ they will detected by the env_prefix property of this object,
+ and their names will transformed before saving their values in
+ self.env by removing the env_prefix from the variable name.
+ @type: dict
+ """
+
+ self._env_prefix = None
+ """
+ @ivar: a prefix for environment variables to detect them and to assign
+ their transformed names and their values in self.env
+ @type: str
+ """
+
+ super(CrTplApplication, self).__init__(
+ appname=appname,
+ verbose=verbose,
+ version=version,
+ base_dir=base_dir,
+ initialized=False,
+ )
+
+ if env_prefix:
+ ep = str(env_prefix).strip()
+ if not ep:
+ msg = "Invalid env_prefix {!r} given - it may not be empty.".format(env_prefix)
+ raise PpAppError(msg)
+ match = self.re_prefix.search(ep)
+ if not match:
+ msg = (
+ "Invalid characters found in env_prefix {!r}, only "
+ "alphanumeric characters and digits and underscore "
+ "(this not as the first character) are allowed.").format(env_prefix)
+ raise PpAppError(msg)
+ self._env_prefix = ep
+ else:
+ ep = self.appname.upper() + '_'
+ self._env_prefix = self.re_anum.sub('_', ep)
+
+ self._init_arg_parser()
+ self._perform_arg_parser()
+
+ self._init_env()
+ self._perform_env()
+
+ # -----------------------------------------------------------
+ @property
+ def exit_value(self):
+ """The return value of the application for exiting with sys.exit()."""
+ return self._exit_value
+
+ @exit_value.setter
+ def exit_value(self, value):
+ v = int(value)
+ if v >= 0:
+ self._exit_value = v
+ else:
+ LOG.warn("Wrong exit_value {!r}, must be >= 0".format(value))
+
+ # -----------------------------------------------------------
+ @property
+ def exitvalue(self):
+ """The return value of the application for exiting with sys.exit()."""
+ return self._exit_value
+
+ @exitvalue.setter
+ def exitvalue(self, value):
+ self.exit_value = value
+
+ # -----------------------------------------------------------
+ @property
+ def usage(self):
+ """The usage text used on argparse."""
+ return self._usage
+
+ # -----------------------------------------------------------
+ @property
+ def description(self):
+ """A short text describing the application."""
+ return self._description
+
+ # -----------------------------------------------------------
+ @property
+ def argparse_epilog(self):
+ """An epilog displayed at the end of the argparse help screen."""
+ return self._argparse_epilog
+
+ # -----------------------------------------------------------
+ @property
+ def argparse_prefix_chars(self):
+ """The set of characters that prefix optional arguments."""
+ return self._argparse_prefix_chars
+
+ # -----------------------------------------------------------
+ @property
+ def terminal_has_colors(self):
+ """A flag, that the current terminal understands color ANSI codes."""
+ return self._terminal_has_colors
+
+ # -----------------------------------------------------------
+ @property
+ def env_prefix(self):
+ """A prefix for environment variables to detect them."""
+ return self._env_prefix
+
+ # -----------------------------------------------------------
+ @property
+ def usage_term(self):
+ """The localized version of 'usage: '"""
+ return 'Usage: '
+
+ # -----------------------------------------------------------
+ @property
+ def usage_term_len(self):
+ """The length of the localized version of 'usage: '"""
+ return len(self.usage_term)
+
+ # -----------------------------------------------------------
+ @property
+ def quiet(self):
+ """Quiet execution of the application,
+ only warnings and errors are emitted."""
+ return self._quiet
+
+ @quiet.setter
+ def quiet(self, value):
+ self._quiet = bool(value)
+
+ # -------------------------------------------------------------------------
+ def exit(self, retval=-1, msg=None, trace=False):
+ """
+ Universal method to call sys.exit(). If fake_exit is set, a
+ FakeExitError exception is raised instead (useful for unittests.)
+
+ @param retval: the return value to give back to theoperating system
+ @type retval: int
+ @param msg: a last message, which should be emitted before exit.
+ @type msg: str
+ @param trace: flag to output a stack trace before exiting
+ @type trace: bool
+
+ @return: None
+
+ """
+
+ retval = int(retval)
+ trace = bool(trace)
+
+ root_logger = logging.getLogger()
+ has_handlers = False
+ if root_logger.handlers:
+ has_handlers = True
+
+ if msg:
+ if has_handlers:
+ if retval:
+ LOG.error(msg)
+ else:
+ LOG.info(msg)
+ if not has_handlers:
+ if hasattr(sys.stderr, 'buffer'):
+ sys.stderr.buffer.write(str(msg) + "\n")
+ else:
+ sys.stderr.write(str(msg) + "\n")
+
+ if trace:
+ if has_handlers:
+ if retval:
+ LOG.error(traceback.format_exc())
+ else:
+ LOG.info(traceback.format_exc())
+ else:
+ traceback.print_exc()
+
+ sys.exit(retval)
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(CrTplApplication, self).as_dict(short=short)
+ res['exit_value'] = self.exit_value
+ res['usage'] = self.usage
+ res['quiet'] = self.quiet
+ res['description'] = self.description
+ res['argparse_epilog'] = self.argparse_epilog
+ res['argparse_prefix_chars'] = self.argparse_prefix_chars
+ res['terminal_has_colors'] = self.terminal_has_colors
+ res['env_prefix'] = self.env_prefix
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def init_logging(self):
+ """
+ Initialize the logger object.
+ It creates a colored loghandler with all output to STDERR.
+ Maybe overridden in descendant classes.
+
+ @return: None
+ """
+
+ log_level = logging.INFO
+ if self.verbose:
+ log_level = logging.DEBUG
+ elif self.quiet:
+ log_level = logging.WARNING
+
+ root_logger = logging.getLogger()
+ root_logger.setLevel(log_level)
+
+ # create formatter
+ format_str = ''
+ if self.verbose:
+ format_str = '[%(asctime)s]: '
+ format_str += self.appname + ': '
+ if self.verbose:
+ if self.verbose > 1:
+ format_str += '%(name)s(%(lineno)d) %(funcName)s() '
+ else:
+ format_str += '%(name)s '
+ format_str += '%(levelname)s - %(message)s'
+ formatter = None
+ if self.terminal_has_colors:
+ formatter = ColoredFormatter(format_str)
+ else:
+ formatter = logging.Formatter(format_str)
+
+ # create log handler for console output
+ lh_console = logging.StreamHandler(sys.stderr)
+ lh_console.setLevel(log_level)
+ lh_console.setFormatter(formatter)
+
+ root_logger.addHandler(lh_console)
+
+ return
+
+ # -------------------------------------------------------------------------
+ def terminal_can_color(self):
+ """
+ Method to detect, whether the current terminal (stdout and stderr)
+ is able to perform ANSI color sequences.
+
+ @return: both stdout and stderr can perform ANSI color sequences
+ @rtype: bool
+
+ """
+
+ term_debug = False
+ if self.verbose > 3:
+ term_debug = True
+ return terminal_can_colors(debug=term_debug)
+
+ # -------------------------------------------------------------------------
+ def post_init(self):
+ """
+ Method to execute before calling run(). Here could be done some
+ finishing actions after reading in commandline parameters,
+ configuration a.s.o.
+
+ This method could be overwritten by descendant classes, these
+ methhods should allways include a call to post_init() of the
+ parent class.
+
+ """
+
+ self.perform_arg_parser()
+ self.init_logging()
+
+ self.initialized = True
+
+ # -------------------------------------------------------------------------
+ def pre_run(self):
+ """
+ Dummy function to run before the main routine.
+ Could be overwritten by descendant classes.
+
+ """
+
+ if self.simulate:
+ LOG.warn("Simulation mode - nothing is really done.")
+
+ # -------------------------------------------------------------------------
+ def _run(self):
+ """
+ Dummy function as main routine.
+
+ MUST be overwritten by descendant classes.
+
+ """
+
+ raise FunctionNotImplementedError('_run', self.__class__.__name__)
+
+ # -------------------------------------------------------------------------
+ def __call__(self):
+ """
+ Helper method to make the resulting object callable, e.g.::
+
+ app = PBApplication(...)
+ app()
+
+ @return: None
+
+ """
+
+ self.run()
+
+ # -------------------------------------------------------------------------
+ def run(self):
+ """
+ The visible start point of this object.
+
+ @return: None
+
+ """
+
+ if not self.initialized:
+ self.handle_error(
+ "The application is not completely initialized.", '', True)
+ self.exit(9)
+
+ try:
+ self.pre_run()
+ except Exception as e:
+ self.handle_error(str(e), e.__class__.__name__, True)
+ self.exit(98)
+
+ if not self.initialized:
+ raise PpAppError(
+ "Object {!r} seems not to be completely initialized.".format(
+ self.__class__.__name__))
+
+ try:
+ self._run()
+ except Exception as e:
+ self.handle_error(str(e), e.__class__.__name__, True)
+ self.exit_value = 99
+
+ if self.verbose > 1:
+ LOG.info("Ending.")
+
+ try:
+ self.post_run()
+ except Exception as e:
+ self.handle_error(str(e), e.__class__.__name__, True)
+ self.exit_value = 97
+
+ self.exit(self.exit_value)
+
+ # -------------------------------------------------------------------------
+ def post_run(self):
+ """
+ Dummy function to run after the main routine.
+ Could be overwritten by descendant classes.
+
+ """
+
+ if self.verbose > 1:
+ LOG.info("executing post_run() ...")
+
+ # -------------------------------------------------------------------------
+ def _init_arg_parser(self):
+ """
+ Local called method to initiate the argument parser.
+
+ @raise PBApplicationError: on some errors
+
+ """
+
+ self.arg_parser = argparse.ArgumentParser(
+ prog=self.appname,
+ description=self.description,
+ usage=self.usage,
+ epilog=self.argparse_epilog,
+ prefix_chars=self.argparse_prefix_chars,
+ add_help=False,
+ )
+
+ self.init_arg_parser()
+
+ general_group = self.arg_parser.add_argument_group('General options')
+ general_group.add_argument(
+ '--color',
+ action="store",
+ dest='color',
+ const='yes',
+ default='auto',
+ nargs='?',
+ choices=['yes', 'no', 'auto'],
+ help="Use colored output for messages.",
+ )
+
+ verbose_group = general_group.add_mutually_exclusive_group()
+
+ verbose_group.add_argument(
+ "-v", "--verbose",
+ action="count",
+ dest='verbose',
+ help='Increase the verbosity level',
+ )
+
+ verbose_group.add_argument(
+ "-q", "--quiet",
+ action="store_true",
+ dest='quiet',
+ help='Silent execution, only warnings and errors are emitted.',
+ )
+
+ general_group.add_argument(
+ "-h", "--help",
+ action='help',
+ dest='help',
+ help='Show this help message and exit'
+ )
+ general_group.add_argument(
+ "--usage",
+ action='store_true',
+ dest='usage',
+ help="Display brief usage message and exit"
+ )
+ general_group.add_argument(
+ "-V", '--version',
+ action='version',
+ version='Version of %(prog)s: {}'.format(self.version),
+ help="Show program's version number and exit"
+ )
+
+ # -------------------------------------------------------------------------
+ def init_arg_parser(self):
+ """
+ Public available method to initiate the argument parser.
+
+ Note::
+ avoid adding the general options '--verbose', '--help', '--usage'
+ and '--version'. These options are allways added after executing
+ this method.
+
+ Descendant classes may override this method.
+
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def _perform_arg_parser(self):
+ """
+ Underlaying method for parsing arguments.
+ """
+
+ self.args = self.arg_parser.parse_args()
+
+ if self.args.usage:
+ self.arg_parser.print_usage(sys.stdout)
+ self.exit(0)
+
+ if self.args.verbose is not None and self.args.verbose > self.verbose:
+ self.verbose = self.args.verbose
+
+ if self.args.quiet:
+ self.quiet = self.args.quiet
+
+ if self.args.color == 'yes':
+ self._terminal_has_colors = True
+ elif self.args.color == 'no':
+ self._terminal_has_colors = False
+ else:
+ self._terminal_has_colors = self.terminal_can_color()
+
+ # -------------------------------------------------------------------------
+ def perform_arg_parser(self):
+ """
+ Public available method to execute some actions after parsing
+ the command line parameters.
+
+ Descendant classes may override this method.
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def _init_env(self):
+ """
+ Initialization of self.env by application specific environment
+ variables.
+
+ It calls self.init_env(), after it has done his job.
+
+ """
+
+ for (key, value) in list(os.environ.items()):
+
+ if not key.startswith(self.env_prefix):
+ continue
+
+ newkey = key.replace(self.env_prefix, '', 1)
+ self.env[newkey] = value
+
+ self.init_env()
+
+ # -------------------------------------------------------------------------
+ def init_env(self):
+ """
+ Public available method to initiate self.env additional to the implicit
+ initialization done by this module.
+ Maybe it can be used to import environment variables, their
+ names not starting with self.env_prefix.
+
+ Currently a dummy method, which ca be overriden by descendant classes.
+
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def _perform_env(self):
+ """
+ Method to do some useful things with the found environment.
+
+ It calls self.perform_env(), after it has done his job.
+
+ """
+
+ # try to detect verbosity level from environment
+ if 'VERBOSE' in self.env and self.env['VERBOSE']:
+ v = 0
+ try:
+ v = int(self.env['VERBOSE'])
+ except ValueError:
+ v = 1
+ if v > self.verbose:
+ self.verbose = v
+
+ self.perform_env()
+
+ # -------------------------------------------------------------------------
+ def perform_env(self):
+ """
+ Public available method to perform found environment variables after
+ initialization of self.env.
+
+ Currently a dummy method, which ca be overriden by descendant classes.
+
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def colored(self, msg, color):
+ """
+ Wrapper function to colorize the message. Depending, whether the current
+ terminal can display ANSI colors, the message is colorized or not.
+
+ @param msg: The message to colorize
+ @type msg: str
+ @param color: The color to use, must be one of the keys of COLOR_CODE
+ @type color: str
+
+ @return: the colorized message
+ @rtype: str
+
+ """
+
+ if not self.terminal_has_colors:
+ return msg
+ return colorstr(msg, color)
+
+ # -------------------------------------------------------------------------
+ def get_command(self, cmd, quiet=False):
+ """
+ Searches the OS search path for the given command and gives back the
+ normalized position of this command.
+ If the command is given as an absolute path, it check the existence
+ of this command.
+
+ @param cmd: the command to search
+ @type cmd: str
+ @param quiet: No warning message, if the command could not be found,
+ only a debug message
+ @type quiet: bool
+
+ @return: normalized complete path of this command, or None,
+ if not found
+ @rtype: str or None
+
+ """
+
+ if self.verbose > 2:
+ LOG.debug("Searching for command {!r} ...".format(cmd))
+
+ # Checking an absolute path
+ if os.path.isabs(cmd):
+ if not os.path.exists(cmd):
+ LOG.warning("Command {!r} doesn't exists.".format(cmd))
+ return None
+ if not os.access(cmd, os.X_OK):
+ msg = "Command {!r} is not executable.".format(cmd)
+ LOG.warning(msg)
+ return None
+ return os.path.normpath(cmd)
+
+ # Checking a relative path
+ for d in caller_search_path():
+ if self.verbose > 3:
+ LOG.debug("Searching command in {!r} ...".format(d))
+ p = os.path.join(d, cmd)
+ if os.path.exists(p):
+ if self.verbose > 2:
+ LOG.debug("Found {!r} ...".format(p))
+ if os.access(p, os.X_OK):
+ return os.path.normpath(p)
+ else:
+ LOG.debug("Command {!r} is not executable.".format(p))
+
+ # command not found, sorry
+ if quiet:
+ if self.verbose > 2:
+ LOG.debug("Command {!r} not found.".format(cmd))
+ else:
+ LOG.warning("Command {!r} not found.".format(cmd))
+
+ return None
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@summary: additional logging formatter for colored output via console
+"""
+
+# Standard modules
+import logging
+import copy
+
+# Third party modules
+
+# Own modules
+
+__version__ = '0.1.1'
+
+# =============================================================================
+# Color coding module variables and helper functions
+
+COLOR_CODE = {
+ 'ENDC': 0, # RESET COLOR
+ 'BOLD': 1,
+ 'UNDERLINE': 4,
+ 'BLINK': 5,
+ 'INVERT': 7,
+ 'CONCEALD': 8,
+ 'STRIKE': 9,
+ 'GREY30': 90,
+ 'GREY40': 2,
+ 'GREY65': 37,
+ 'GREY70': 97,
+ 'GREY20_BG': 40,
+ 'GREY33_BG': 100,
+ 'GREY80_BG': 47,
+ 'GREY93_BG': 107,
+ 'DARK_RED': 31,
+ 'RED': 91,
+ 'RED_BG': 41,
+ 'LIGHT_RED_BG': 101,
+ 'DARK_YELLOW': 33,
+ 'YELLOW': 93,
+ 'YELLOW_BG': 43,
+ 'LIGHT_YELLOW_BG': 103,
+ 'DARK_BLUE': 34,
+ 'BLUE': 94,
+ 'BLUE_BG': 44,
+ 'LIGHT_BLUE_BG': 104,
+ 'DARK_MAGENTA': 35,
+ 'PURPLE': 95,
+ 'MAGENTA_BG': 45,
+ 'LIGHT_PURPLE_BG': 105,
+ 'DARK_CYAN': 36,
+ 'AUQA': 96,
+ 'AQUA': 96,
+ 'CYAN_BG': 46,
+ 'LIGHT_AUQA_BG': 106,
+ 'LIGHT_AQUA_BG': 106,
+ 'DARK_GREEN': 32,
+ 'GREEN': 92,
+ 'GREEN_BG': 42,
+ 'LIGHT_GREEN_BG': 102,
+ 'BLACK': 30,
+}
+
+
+# -----------------------------------------------------------------------------
+def termcode(num):
+ """
+ Output of an ANSII terminal code.
+ """
+
+ return('\033[%sm' % (num))
+
+
+# -----------------------------------------------------------------------------
+def colorstr(message, color):
+ """
+ Wrapper function to colorize the message.
+
+ @param message: The message to colorize
+ @type message: str
+ @param color: The color to use, must be one of the keys of COLOR_CODE
+ @type color: str
+
+ @return: the colorized message
+ @rtype: str
+
+ """
+
+ tcode = ''
+ if isinstance(color, (list, tuple)):
+ for clr in color:
+ tcode += termcode(COLOR_CODE[clr])
+ else:
+ tcode = termcode(COLOR_CODE[color])
+
+ return tcode + message + termcode(COLOR_CODE['ENDC'])
+
+
+# ----------------------------------------
+LOG = logging.getLogger(__name__)
+
+
+# =============================================================================
+class ColoredFormatter(logging.Formatter):
+ """
+ A variant of code found at:
+ http://stackoverflow.com/questions/384076/how-can-i-make-the-python-logging-output-to-be-colored
+ """
+
+ LEVEL_COLOR = {
+ 'DEBUG': None,
+ 'INFO': 'GREEN',
+ 'WARNING': 'YELLOW',
+ 'ERROR': ('BOLD', 'RED'),
+ 'CRITICAL': 'RED_BG',
+ }
+
+ # -------------------------------------------------------------------------
+ def __init__(self, fmt=None, datefmt=None):
+ """
+ Initialize the formatter with specified format strings.
+
+ Initialize the formatter either with the specified format string, or a
+ default. Allow for specialized date formatting with the optional
+ datefmt argument (if omitted, you get the ISO8601 format).
+ """
+
+ logging.Formatter.__init__(self, fmt, datefmt)
+
+ # -----------------------------------------------------------
+ @property
+ def color_debug(self):
+ """The color used to output debug messages."""
+ return self.LEVEL_COLOR['DEBUG']
+
+ @color_debug.setter
+ def color_debug(self, value):
+ self.LEVEL_COLOR['DEBUG'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_info(self):
+ """The color used to output info messages."""
+ return self.LEVEL_COLOR['INFO']
+
+ @color_info.setter
+ def color_info(self, value):
+ self.LEVEL_COLOR['INFO'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_warning(self):
+ """The color used to output warning messages."""
+ return self.LEVEL_COLOR['WARNING']
+
+ @color_warning.setter
+ def color_warning(self, value):
+ self.LEVEL_COLOR['WARNING'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_error(self):
+ """The color used to output error messages."""
+ return self.LEVEL_COLOR['ERROR']
+
+ @color_error.setter
+ def color_error(self, value):
+ self.LEVEL_COLOR['ERROR'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_critical(self):
+ """The color used to output critical messages."""
+ return self.LEVEL_COLOR['CRITICAL']
+
+ @color_critical.setter
+ def color_critical(self, value):
+ self.LEVEL_COLOR['CRITICAL'] = value
+
+ # -------------------------------------------------------------------------
+ def format(self, record):
+ """
+ Format the specified record as text.
+ """
+
+ record = copy.copy(record)
+ levelname = record.levelname
+
+ if levelname in self.LEVEL_COLOR:
+
+ record.name = colorstr(record.name, 'BOLD')
+ record.filename = colorstr(record.filename, 'BOLD')
+ record.module = colorstr(record.module, 'BOLD')
+ record.funcName = colorstr(record.funcName, 'BOLD')
+ record.pathname = colorstr(record.pathname, 'BOLD')
+ record.processName = colorstr(record.processName, 'BOLD')
+ record.threadName = colorstr(record.threadName, 'BOLD')
+
+ if self.LEVEL_COLOR[levelname] is not None:
+ record.levelname = colorstr(
+ levelname, self.LEVEL_COLOR[levelname])
+ record.msg = colorstr(record.msg, self.LEVEL_COLOR[levelname])
+
+ return logging.Formatter.format(self, record)
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@summary: module for some common used error classes
+"""
+
+# Standard modules
+import errno
+
+
+__version__ = '0.1.1'
+
+# =============================================================================
+class PpError(Exception):
+ """
+ Base error class for all other self defined exceptions.
+ """
+
+ pass
+
+
+# =============================================================================
+class PpAppError(PpError):
+
+ pass
+
+
+# =============================================================================
+class FunctionNotImplementedError(PpError, NotImplementedError):
+ """
+ Error class for not implemented functions.
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(self, function_name, class_name):
+ """
+ Constructor.
+
+ @param function_name: the name of the not implemented function
+ @type function_name: str
+ @param class_name: the name of the class of the function
+ @type class_name: str
+
+ """
+
+ self.function_name = function_name
+ if not function_name:
+ self.function_name = '__unkown_function__'
+
+ self.class_name = class_name
+ if not class_name:
+ self.class_name = '__unkown_class__'
+
+ # -------------------------------------------------------------------------
+ def __str__(self):
+ """
+ Typecasting into a string for error output.
+ """
+
+ msg = "Function {func}() has to be overridden in class {cls!r}."
+ return msg.format(func=self.function_name, cls=self.class_name)
+
+
+# =============================================================================
+class IoTimeoutError(PpError, IOError):
+ """
+ Special error class indicating a timout error on a read/write operation
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(self, strerror, timeout, filename=None):
+ """
+ Constructor.
+
+ @param strerror: the error message about the operation
+ @type strerror: str
+ @param timeout: the timout in seconds leading to the error
+ @type timeout: float
+ @param filename: the filename leading to the error
+ @type filename: str
+
+ """
+
+ t_o = None
+ try:
+ t_o = float(timeout)
+ except ValueError:
+ pass
+ self.timeout = t_o
+
+ if t_o is not None:
+ strerror += " (timeout after {:0.1f} secs)".format(t_o)
+
+ if filename is None:
+ super(IoTimeoutError, self).__init__(errno.ETIMEDOUT, strerror)
+ else:
+ super(IoTimeoutError, self).__init__(
+ errno.ETIMEDOUT, strerror, filename)
+
+
+# =============================================================================
+class ReadTimeoutError(IoTimeoutError):
+ """
+ Special error class indicating a timout error on reading of a file.
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(self, timeout, filename):
+ """
+ Constructor.
+
+ @param timeout: the timout in seconds leading to the error
+ @type timeout: float
+ @param filename: the filename leading to the error
+ @type filename: str
+
+ """
+
+ strerror = "Timeout error on reading"
+ super(ReadTimeoutError, self).__init__(strerror, timeout, filename)
+
+
+# =============================================================================
+class WriteTimeoutError(IoTimeoutError):
+ """
+ Special error class indicating a timout error on a writing into a file.
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(self, timeout, filename):
+ """
+ Constructor.
+
+ @param timeout: the timout in seconds leading to the error
+ @type timeout: float
+ @param filename: the filename leading to the error
+ @type filename: str
+
+ """
+
+ strerror = "Timeout error on writing"
+ super(WriteTimeoutError, self).__init__(strerror, timeout, filename)
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import sys
+import os
+import logging
+import datetime
+import traceback
+
+# Third party modules
+
+# Own modules
+from .common import pp, to_bytes
+
+from .errors import PpError
+
+__version__ = '0.2.4'
+
+LOG = logging.getLogger(__name__)
+
+
+# =============================================================================
+class PpBaseObjectError(PpError):
+ """
+ Base error class useable by all descendand objects.
+ """
+
+ pass
+
+
+# =============================================================================
+class PpBaseObject(object):
+ """
+ Base class for all objects.
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=False):
+ """
+ Initialisation of the base object.
+
+ Raises an exception on a uncoverable error.
+ """
+
+ self._appname = None
+ """
+ @ivar: name of the current running application
+ @type: str
+ """
+ if appname:
+ v = str(appname).strip()
+ if v:
+ self._appname = v
+ if not self._appname:
+ self._appname = os.path.basename(sys.argv[0])
+
+ self._version = version
+ """
+ @ivar: version string of the current object or application
+ @type: str
+ """
+
+ self._verbose = int(verbose)
+ """
+ @ivar: verbosity level (0 - 9)
+ @type: int
+ """
+ if self._verbose < 0:
+ msg = "Wrong verbose level {!r}, must be >= 0".format(verbose)
+ raise ValueError(msg)
+
+ self._initialized = False
+ """
+ @ivar: initialisation of this object is complete
+ after __init__() of this object
+ @type: bool
+ """
+
+ self._base_dir = base_dir
+ """
+ @ivar: base directory used for different purposes, must be an existent
+ directory. Defaults to directory of current script daemon.py.
+ @type: str
+ """
+ if base_dir:
+ if not os.path.exists(base_dir):
+ msg = "Base directory {!r} does not exists.".format(base_dir)
+ self.handle_error(msg)
+ self._base_dir = None
+ elif not os.path.isdir(base_dir):
+ msg = "Base directory {!r} is not a directory.".format(base_dir)
+ self.handle_error(msg)
+ self._base_dir = None
+ if not self._base_dir:
+ self._base_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+
+ self._initialized = bool(initialized)
+
+ # -----------------------------------------------------------
+ @property
+ def appname(self):
+ """The name of the current running application."""
+ if hasattr(self, '_appname'):
+ return self._appname
+ return os.path.basename(sys.argv[0])
+
+ @appname.setter
+ def appname(self, value):
+ if value:
+ v = str(value).strip()
+ if v:
+ self._appname = v
+
+ # -----------------------------------------------------------
+ @property
+ def version(self):
+ """The version string of the current object or application."""
+ return getattr(self, '_version', __version__)
+
+ # -----------------------------------------------------------
+ @property
+ def verbose(self):
+ """The verbosity level."""
+ return getattr(self, '_verbose', 0)
+
+ @verbose.setter
+ def verbose(self, value):
+ v = int(value)
+ if v >= 0:
+ self._verbose = v
+ else:
+ LOG.warn("Wrong verbose level {!r}, must be >= 0".format(value))
+
+ # -----------------------------------------------------------
+ @property
+ def initialized(self):
+ """The initialisation of this object is complete."""
+ return getattr(self, '_initialized', False)
+
+ @initialized.setter
+ def initialized(self, value):
+ self._initialized = bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def base_dir(self):
+ """The base directory used for different purposes."""
+ return self._base_dir
+
+ @base_dir.setter
+ def base_dir(self, value):
+ if value.startswith('~'):
+ value = os.path.expanduser(value)
+ if not os.path.exists(value):
+ msg = "Base directory {!r} does not exists.".format(value)
+ LOG.error(msg)
+ elif not os.path.isdir(value):
+ msg = "Base directory {!r} is not a directory.".format(value)
+ LOG.error(msg)
+ else:
+ self._base_dir = value
+
+ # -------------------------------------------------------------------------
+ def __str__(self):
+ """
+ Typecasting function for translating object structure
+ into a string
+
+ @return: structure as string
+ @rtype: str
+ """
+
+ return pp(self.as_dict(short=True))
+
+ # -------------------------------------------------------------------------
+ def __repr__(self):
+ """Typecasting into a string for reproduction."""
+
+ out = "<%s(" % (self.__class__.__name__)
+
+ fields = []
+ fields.append("appname={!r}".format(self.appname))
+ fields.append("verbose={!r}".format(self.verbose))
+ fields.append("version={!r}".format(self.version))
+ fields.append("base_dir={!r}".format(self.base_dir))
+ fields.append("initialized={!r}".format(self.initialized))
+
+ out += ", ".join(fields) + ")>"
+ return out
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = self.__dict__
+ res = {}
+ for key in self.__dict__:
+ if short and key.startswith('_') and not key.startswith('__'):
+ continue
+ val = self.__dict__[key]
+ if isinstance(val, PpBaseObject):
+ res[key] = val.as_dict(short=short)
+ else:
+ res[key] = val
+ res['__class_name__'] = self.__class__.__name__
+ res['appname'] = self.appname
+ res['version'] = self.version
+ res['verbose'] = self.verbose
+ res['initialized'] = self.initialized
+ res['base_dir'] = self.base_dir
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def handle_error(
+ self, error_message=None, exception_name=None, do_traceback=False):
+ """
+ Handle an error gracefully.
+
+ Print a traceback and continue.
+
+ @param error_message: the error message to display
+ @type error_message: str
+ @param exception_name: name of the exception class
+ @type exception_name: str
+ @param do_traceback: allways show a traceback
+ @type do_traceback: bool
+
+ """
+
+ msg = 'Exception happened: '
+ if exception_name is not None:
+ exception_name = exception_name.strip()
+ if exception_name:
+ msg = exception_name + ': '
+ else:
+ msg = ''
+ if error_message:
+ msg += str(error_message)
+ else:
+ msg += 'undefined error.'
+
+ root_log = logging.getLogger()
+ has_handlers = False
+ if root_log.handlers:
+ has_handlers = True
+
+ if has_handlers:
+ LOG.error(msg)
+ if do_traceback:
+ LOG.error(traceback.format_exc())
+ else:
+ curdate = datetime.datetime.now()
+ curdate_str = "[" + curdate.isoformat(' ') + "]: "
+ msg = curdate_str + msg + "\n"
+ if hasattr(sys.stderr, 'buffer'):
+ sys.stderr.buffer.write(to_bytes(msg))
+ else:
+ sys.stderr.write(msg)
+ if do_traceback:
+ traceback.print_exc()
+
+ return
+
+ # -------------------------------------------------------------------------
+ def handle_info(self, message, info_name=None):
+ """
+ Shows an information. This happens both to STDERR and to all
+ initialized log handlers.
+
+ @param message: the info message to display
+ @type message: str
+ @param info_name: Title of information
+ @type info_name: str
+
+ """
+
+ msg = ''
+ if info_name is not None:
+ info_name = info_name.strip()
+ if info_name:
+ msg = info_name + ': '
+ msg += str(message).strip()
+
+ root_log = logging.getLogger()
+ has_handlers = False
+ if root_log.handlers:
+ has_handlers = True
+
+ if has_handlers:
+ LOG.info(msg)
+ else:
+ curdate = datetime.datetime.now()
+ curdate_str = "[" + curdate.isoformat(' ') + "]: "
+ msg = curdate_str + msg + "\n"
+ if hasattr(sys.stderr, 'buffer'):
+ sys.stderr.buffer.write(to_bytes(msg))
+ else:
+ sys.stderr.write(msg)
+
+ return
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4