--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2017 by Frank Brehm, Berlin
+@summary: The module for the r10k_hook application object.
+"""
+
+# Standard modules
+import sys
+import os
+import logging
+import re
+import textwrap
+import datetime
+
+# Third party modules
+import yaml
+
+# Own modules
+import webhooks
+
+from webhooks.common import pp, to_bytes, to_str, to_bool
+
+__version__ = webhooks.__version__
+LOG = logging.getLogger(__name__)
+DEFAULT_EMAIL = 'frank.brehm@pixelpark.com'
+DEFAULT_SENDER = 'Puppetmaster <{}>'.format(DEFAULT_EMAIL)
+
+
+# =============================================================================
+class R10kHookApp(object):
+ """
+ Class for the application objects.
+ """
+
+ cgi_bin_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+ base_dir = os.path.dirname(cgi_bin_dir)
+
+ special_chars_re = re.compile(r'[^a-z0-9_\-]', re.IGNORECASE)
+ dev_re = re.compile(r'^dev')
+
+ # -------------------------------------------------------------------------
+ def __init__(self, appname=None, version=__version__):
+ """Constructor."""
+
+ self._appname = None
+ """
+ @ivar: name of the current running application
+ @type: str
+ """
+ if appname:
+ v = str(appname).strip()
+ if v:
+ self._appname = v
+ if not self._appname:
+ self._appname = os.path.basename(sys.argv[0])
+
+ self._version = version
+ """
+ @ivar: version string of the current object or application
+ @type: str
+ """
+
+ self._verbose = 1
+ """
+ @ivar: verbosity level (0 - 9)
+ @type: int
+ """
+
+ self.data = None
+ self.json_data = None
+ self.ref = None
+ self.namespace = None
+ self.name = None
+ self.full_name = None
+ self.git_ssh_url = None
+ self.do_sudo = True
+
+ self.ignore_projects = []
+
+ self.error_data = []
+ self.smtp_server = 'smtp.pixelpark.com'
+ self.smtp_port = 25
+
+ self.default_parent_dir = '/www/data'
+ self.default_email = DEFAULT_EMAIL
+ self.mail_to_addresses = []
+ self.mail_cc_addresses = [
+ 'webmaster@pixelpark.com',
+ self.default_email,
+ ]
+ self.sender_address = DEFAULT_SENDER
+
+ self._log_directory = os.sep + os.path.join('var', 'log', 'webhooks')
+
+ self.read_config()
+ self.init_logging()
+
+ # -----------------------------------------------------------
+ @property
+ def appname(self):
+ """The name of the current running application."""
+ return self._appname
+
+ @appname.setter
+ def appname(self, value):
+ if value:
+ v = str(value).strip()
+ if v:
+ self._appname = v
+
+ # -----------------------------------------------------------
+ @property
+ def version(self):
+ """The version string of the current object or application."""
+ return self._version
+
+ # -----------------------------------------------------------
+ @property
+ def verbose(self):
+ """The verbosity level."""
+ return getattr(self, '_verbose', 0)
+
+ @verbose.setter
+ def verbose(self, value):
+ v = int(value)
+ if v >= 0:
+ self._verbose = v
+ else:
+ LOG.warn("Wrong verbose level %r, must be >= 0", value)
+
+ # -----------------------------------------------------------
+ @property
+ def log_directory(self):
+ """The directory containing the logfiles of this application."""
+ return self._log_directory
+
+ # -----------------------------------------------------------
+ @property
+ def logfile(self):
+ """The logfile of this application."""
+ return os.path.join(self.log_directory, self.appname + '.log')
+
+ # -------------------------------------------------------------------------
+ def __str__(self):
+ """
+ Typecasting function for translating object structure
+ into a string
+
+ @return: structure as string
+ @rtype: str
+ """
+
+ return pp(self.as_dict())
+
+ # -------------------------------------------------------------------------
+ def as_dict(self):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = self.__dict__
+ res = {}
+ for key in self.__dict__:
+ if key.startswith('_') and not key.startswith('__'):
+ continue
+ res[key] = self.__dict__[key]
+ res['__class_name__'] = self.__class__.__name__
+ res['appname'] = self.appname
+ res['verbose'] = self.verbose
+ res['base_dir'] = self.base_dir
+ res['cgi_bin_dir'] = self.cgi_bin_dir
+ res['log_directory'] = self.log_directory
+ res['logfile'] = self.logfile
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def read_config(self):
+ """Reading configuration from different YAML files."""
+
+ yaml_files = []
+ # ./deploy.yaml
+ yaml_files.append(os.path.join(self.base_dir, self.appname + '.yaml'))
+ # /etc/pixelpark/deploy.yaml
+ yaml_files.append(os.sep + os.path.join('etc', 'pixelpark', self.appname + '.yaml'))
+
+ for yaml_file in yaml_files:
+ self.read_from_yaml(yaml_file)
+
+ # -------------------------------------------------------------------------
+ def read_from_yaml(self, yaml_file):
+ """Reading configuration from given YAML file."""
+
+ # LOG.debug("Trying to read config from {!r} ...".format(yaml_file))
+ if not os.access(yaml_file, os.F_OK):
+ # LOG.debug("File {!r} does not exists.".format(yaml_file))
+ return
+ # LOG.debug("Reading config from {!r} ...".format(yaml_file))
+ config = {}
+ with open(yaml_file, 'rb') as fh:
+ config = yaml.load(fh.read())
+ # LOG.debug("Read config:\n{}".format(pp(config)))
+
+ if 'verbose' in config:
+ self.verbose = config['verbose']
+
+ if 'do_sudo' in config:
+ self.do_sudo = to_bool(config['do_sudo'])
+
+ if 'log_dir' in config and config['log_dir']:
+ self._log_directory = config['log_dir']
+
+ if 'default_email' in config and config['default_email']:
+ self.default_email = config['default_email']
+
+ if 'smtp_server' in config and config['smtp_server'].strip():
+ self.smtp_server = config['smtp_server'].strip()
+
+ if 'smtp_port' in config and config['smtp_port']:
+ msg = "Invalid port {p!r} for SMTP in {f!r} found.".format(
+ p=config['smtp_port'], f=yaml_file)
+ try:
+ port = int(config['smtp_port'])
+ if port > 0 and port < 2**16:
+ self.smtp_port = port
+ else:
+ self.error_data.append(msg)
+ except ValueError:
+ self.error_data.append(msg)
+
+ if 'default_parent_dir' in config and config['default_parent_dir']:
+ pdir = config['default_parent_dir']
+ if os.path.isabs(pdir):
+ self.default_parent_dir = pdir
+
+ if 'mail_cc_addresses' in config:
+ if config['mail_cc_addresses'] is None:
+ self.mail_cc_addresses = []
+ elif isinstance(config['mail_cc_addresses'], str):
+ if config['mail_cc_addresses']:
+ self.mail_cc_addresses = [config['mail_cc_addresses']]
+ else:
+ self.mail_cc_addresses = []
+ elif isinstance(config['mail_cc_addresses'], list):
+ self.mail_cc_addresses = config['mail_cc_addresses']
+
+ # -------------------------------------------------------------------------
+ def init_logging(self):
+ """
+ Initialize the logger object.
+ It creates a colored loghandler with all output to STDERR.
+ Maybe overridden in descendant classes.
+
+ @return: None
+ """
+
+ root_log = logging.getLogger()
+ root_log.setLevel(logging.INFO)
+ if self.verbose:
+ root_log.setLevel(logging.DEBUG)
+
+ # create formatter
+ format_str = ''
+ if 'REQUEST_METHOD' in os.environ or self.verbose > 1:
+ format_str = '[%(asctime)s]: '
+ format_str += self.appname + ': '
+ if self.verbose:
+ if self.verbose > 1:
+ format_str += '%(name)s(%(lineno)d) %(funcName)s() '
+ else:
+ format_str += '%(name)s '
+ format_str += '%(levelname)s - %(message)s'
+ formatter = logging.Formatter(format_str)
+
+ if 'REQUEST_METHOD' in os.environ:
+
+ #sys.stderr.write("Trying to open logfile {!r} ...\n".format(self.logfile))
+ # we are in a CGI environment
+ if os.path.isdir(self.log_directory) and os.access(self.log_directory, os.W_OK):
+ lh_file = logging.FileHandler(
+ self.logfile, mode='a', encoding='utf-8', delay=True)
+ if self.verbose:
+ lh_file.setLevel(logging.DEBUG)
+ else:
+ lh_file.setLevel(logging.INFO)
+ lh_file.setFormatter(formatter)
+ root_log.addHandler(lh_file)
+
+ else:
+ # create log handler for console output
+ lh_console = logging.StreamHandler(sys.stderr)
+ if self.verbose:
+ lh_console.setLevel(logging.DEBUG)
+ else:
+ lh_console.setLevel(logging.INFO)
+ lh_console.setFormatter(formatter)
+
+ root_log.addHandler(lh_console)
+
+ return
+
+ # -------------------------------------------------------------------------
+ def print_out(self, *objects, sep=' ', end='\n', file=sys.stdout.buffer, flush=True):
+
+ file.write(to_bytes(sep.join(map(lambda x: str(x), objects))))
+ if end:
+ file.write(to_bytes(end))
+ if flush:
+ file.flush()
+
+ # -------------------------------------------------------------------------
+ def __call__(self):
+ """Helper method to make the resulting object callable."""
+
+ self.print_out("Content-Type: text/plain;charset=utf-8\n")
+ self.print_out("Python CGI läuft.\n")
+
+ if self.verbose > 1:
+ LOG.debug("Base directory: {!r}".format(self.base_dir))
+
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list