import argparse
import traceback
import io
+import cgi
+import textwrap
from email.message import EmailMessage
special_chars_re = re.compile(r'[^a-z0-9_\-]', re.IGNORECASE)
dev_re = re.compile(r'^dev')
+ valid_output_types = {
+ 'txt': 'text/plain',
+ 'html': 'text/html',
+ 'json': 'application/json',
+ }
+ default_output_type = 'txt'
+ default_mime_type = valid_output_types[default_output_type]
# -------------------------------------------------------------------------
def __init__(self, appname=None, base_dir=None, verbose=0, version=__version__):
self.data_dir = self.default_data_dir
self._read_stdin = True
self.tz_name = self.default_tz_name
+ self._html_title = None
+
+ if not hasattr(self, '_output_type'):
+ self._output_type = self.default_output_type
+ self._mime_type = self.default_mime_type
super(BaseHookApp, self).__init__(
appname=appname, verbose=verbose, version=version,
base_dir=base_dir, initialized=False,
)
+ self._mime_type = self.valid_output_types[self.output_type]
self._start_verbose = self.verbose
self._simulate = False
self.git_ssh_url = None
self.do_sudo = True
self.tz = None
+ self.query = {}
self.cmdline_args = None
self.do_arg_parser()
self.read_config()
+
+ if self.cmdline_args.data_dir:
+ self._log_directory = self.data_dir
+
self.init_logging()
if 'REQUEST_METHOD' in os.environ:
LOG.debug("We are in a CGI: REQUEST_METHOD {!r}".format(os.environ['REQUEST_METHOD']))
def read_stdin(self, value):
self._read_stdin = to_bool(value)
+ # -----------------------------------------------------------
+ @property
+ def html_title(self):
+ """Title of the generated HTML page, if output_type == 'html'."""
+ if self._html_title:
+ return self._html_title
+ return self.appname
+
# -----------------------------------------------------------
@property
def log_directory(self):
return None
return os.path.join(self.puppet_envs_dir, cur_env)
+ # -----------------------------------------------------------
+ @property
+ def output_type(self):
+ """Typ der Ausgabe - TXT, HTML oder JSON."""
+ return getattr(self, '_output_type', self.default_output_type)
+
+ @output_type.setter
+ def output_type(self, value):
+ if not value:
+ raise ValueError("Invalid output type {!r}.".format(value))
+ val = str(value).strip().lower()
+ if val not in self.valid_output_types:
+ raise ValueError("Invalid output type {!r}.".format(value))
+
+ self._output_type = val
+ self._mime_type = self.valid_output_types[val]
+
+ # -----------------------------------------------------------
+ @property
+ def mime_type(self):
+ """MIME-Typ der Response."""
+ return self._mime_type
+
# -------------------------------------------------------------------------
def as_dict(self, short=True):
"""
res['default_data_dir'] = self.default_data_dir
res['read_stdin'] = self.read_stdin
res['default_tz_name'] = self.default_tz_name
+ res['valid_output_types'] = self.valid_output_types
+ res['default_output_type'] = self.default_output_type
+ res['default_mime_type'] = self.default_mime_type
+ res['output_type'] = self.output_type
+ res['mime_type'] = self.mime_type
+ res['html_title'] = self.html_title
return res
help="Show program's version number and exit"
)
+ arg_parser.add_argument(
+ 'query', nargs='?',
+ help="An optional query string like on HTTP GET requests."
+ )
+
self.cmdline_args = arg_parser.parse_args()
if self.cmdline_args.usage:
if not os.environ.get('REQUEST_METHOD', None):
os.environ['REQUEST_METHOD'] = 'GET'
+ if self.cmdline_args.query:
+ if not os.environ.get('QUERY_STRING', None):
+ os.environ['QUERY_STRING'] = self.cmdline_args.query
+
if self.cmdline_args.simulate:
sys.stderr.write("\nSimulation mode - nothing is really done.\n\n")
self.simulate = True
+ self._get_query()
+ if 'output_type' in self.query:
+ try:
+ self.output_type = self.query['output_type']
+ except ValueError as e:
+ LOG.error(str(e))
+
+ # -------------------------------------------------------------------------
+ def _get_query(self):
+
+ self.query = {}
+ form = cgi.FieldStorage()
+ for key in form:
+ val = form.getvalue(key)
+ self.query[key] = val
+
# -------------------------------------------------------------------------
def search_curl_bin(self):
def __call__(self):
"""Helper method to make the resulting object callable."""
- self.print_out("Content-Type: text/plain;charset=utf-8\n")
- self.print_out("Python CGI läuft.\n")
+ self.print_out("Content-Type: {};charset=utf-8\n".format(self.mime_type))
+ if self.output_type == 'txt':
+ self.print_out("Python CGI läuft.\n")
+ elif self.output_type == 'html':
+ self.print_htlm_start()
if self.verbose > 1:
LOG.debug("Base directory: {!r}".format(self.base_dir))
self.send_error_msgs(self.full_name)
else:
self.send_error_msgs()
+ if self.output_type == 'html':
+ self.print_htlm_end()
LOG.info("Finished.")
sys.exit(0)
+ # -------------------------------------------------------------------------
+ def print_htlm_start(self):
+
+ html = textwrap.dedent('''\
+ <!doctype html>
+ <html lang="de">
+ <head>
+ <meta charset="utf-8">
+ <title>{t}</title>
+ </head>
+ <body>
+ ''').strip().format(t=self.html_title)
+ self.print_out(html)
+
+ # ------------------------------------------------------------------------
+ def print_htlm_end(self):
+
+ self.print_out('</body>\n</html>\n')
+
# -------------------------------------------------------------------------
def pre_run(self):