# -*- coding: utf-8 -*-
-@summary: An application module for the mkldappasswd application
+@summary: An application module for the mkldappasswd application.
@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
import logging
import sys
-# Own modules
# from fb_tools.common import to_bool, is_sequence, pp
from fb_tools.app import BaseApplication
+# Own modules
from .. import pp
-from ..xlate import XLATOR
from ..errors import PpAppError
-from ..handler.ldap_password import WrongPwdSchemaError
from ..handler.ldap_password import LdapPasswordHandler
-from ..handler.ldap_password import WrongSaltError, WrongRoundsError
+from ..handler.ldap_password import WrongPwdSchemaError
+from ..handler.ldap_password import WrongRoundsError
+from ..handler.ldap_password import WrongSaltError
+from ..xlate import XLATOR
-__version__ = '0.1.2'
+__version__ = '0.1.3'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# -------------------------------------------------------------------------
def __init__(self, appname=None, base_dir=None):
+ """Initialize this application object."""
self.password = None
self.pwd_handler = LdapPasswordHandler(
appname=appname, base_dir=base_dir, initialized=False)
- desc = _("Encrypting the password with a defined password schema.")
+ desc = _('Encrypting the password with a defined password schema.')
super(MkLdapPasswdApplication, self).__init__(
appname=appname, description=desc, base_dir=base_dir, initialized=False)
# -------------------------------------------------------------------------
def as_dict(self, short=True):
- Transforms the elements of the object into a dict
+ Transform the elements of the object into a dict.
@param short: don't include local properties in resulting dict.
@type short: bool
@return: structure as dict
@rtype: dict
res = super(MkLdapPasswdApplication, self).as_dict(short=short)
if self.password and self.verbose < 5:
# -------------------------------------------------------------------------
def init_arg_parser(self):
- app_group = self.arg_parser.add_argument_group(_("Options for {}").format(
+ """Initialize special command line parameters."""
+ app_group = self.arg_parser.add_argument_group(_('Options for {}').format(
schema_list = []
help_txt = _(
- "The schema (hashing method) to use to hash the new password. "
- "Default: {default!r}.").format(default=def_schema)
+ 'The schema (hashing method) to use to hash the new password. '
+ 'Default: {default!r}.').format(default=def_schema)
- '-m', '--method', metavar=_("TYPE"), dest="method", choices=schema_list, help=help_txt)
+ '-m', '--method', metavar=_('TYPE'), dest='method', choices=schema_list, help=help_txt)
pwd_group = app_group.add_mutually_exclusive_group()
- '--stdin', action="store_true", dest="stdin",
- help=_("Like {}").format('--password-fd=0')
+ '--stdin', action='store_true', dest='stdin',
+ help=_('Like {}').format('--password-fd=0')
- '-S', '--salt', metavar='SALT', dest="salt",
+ '-S', '--salt', metavar='SALT', dest='salt',
- "A possible salt to use on hashing the password. Caution: "
- "not all hashing schemes are supporting a salt.")
+ 'A possible salt to use on hashing the password. Caution: '
+ 'not all hashing schemes are supporting a salt.')
- '-R', '--rounds', metavar=_('NUMBER'), dest="rounds", type=int,
+ '-R', '--rounds', metavar=_('NUMBER'), dest='rounds', type=int,
- "The number of calculation rounds to use on hashing the password. Caution: "
- "not all hashing schemes are supporting calculation rounds.")
+ 'The number of calculation rounds to use on hashing the password. Caution: '
+ 'not all hashing schemes are supporting calculation rounds.')
pwd_help = _(
- "The password to hash. If not given and no file desriptor was given, "
- "then the password will be requested on TTY.")
+ 'The password to hash. If not given and no file desriptor was given, '
+ 'then the password will be requested on TTY.')
'password', metavar=_('PASSWORD'), nargs='?', help=pwd_help)
# -------------------------------------------------------------------------
def post_init(self):
- """
- Method to execute before calling run().
- """
+ """Execute this as the part of the initialization of this object."""
super(MkLdapPasswdApplication, self).post_init()
if self.verbose > 1:
- msg = "Given args:\n" + pp(self.args.__dict__)
+ msg = 'Given args:\n' + pp(self.args.__dict__)
self.pwd_handler.verbose = self.verbose
# -------------------------------------------------------------------------
def pre_run(self):
+ """Execute this before calling run()."""
if not self.password:
if self.args.stdin:
if pwd:
self.password = pwd
- msg = _("Got no password by {}.").format('STDIN')
+ msg = _('Got no password by {}.').format('STDIN')
self.exit(1, msg)
- first_prompt = _("Password:") + ' '
+ first_prompt = _('Password:') + ' '
second_prompt = _('Repeat password:') + ' '
self.password = self.get_password(
first_prompt, second_prompt, may_empty=False, repeat=False)
# -------------------------------------------------------------------------
def show_password(self):
+ """Display the hashed password."""
msg = _("Encrypting password with hashing schema '{schema}' ...").format(
schema=self.colored(self.pwd_handler.schema_id, 'CYAN'))
salt = getattr(self.args, 'salt', None)
rounds = getattr(self.args, 'rounds', None)
- LOG.debug(_("Used schema: {!r}.").format(self.pwd_handler.schema))
+ LOG.debug(_('Used schema: {!r}.').format(self.pwd_handler.schema))
hashed_passwd = self.pwd_handler.get_hash(
self.password, self.pwd_handler.schema, salt=salt, rounds=rounds)
# =============================================================================
-if __name__ == "__main__":
+if __name__ == '__main__':
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+@summary: The module for a application object related to PowerDNS.
@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
@copyright: © 2023 by Frank Brehm, Berlin
-@summary: The module for a application object related to PowerDNS.
from __future__ import absolute_import
# Standard modules
+# import copy
+import ipaddress
import logging
import logging.config
-import re
-# import copy
import os
-import ipaddress
+import re
import socket
# Third party modules
-import psutil
-from requests.exceptions import ConnectionError, ReadTimeout, ConnectTimeout
+from fb_pdnstools.errors import PDNSApiNotFoundError
+from fb_pdnstools.errors import PDNSApiValidationError
+from fb_pdnstools.server import PowerDNSServer
+from fb_pdnstools.zone import PowerDNSZone
-# Own modules
from fb_tools import MAX_TIMEOUT
from fb_tools.xlate import format_list
-from fb_pdnstools.zone import PowerDNSZone
-from fb_pdnstools.server import PowerDNSServer
-from fb_pdnstools.errors import PDNSApiNotFoundError
-from fb_pdnstools.errors import PDNSApiValidationError
+import psutil
+from requests.exceptions import ConnectTimeout, ConnectionError, ReadTimeout
+# Own modules
+from .mail import BaseMailApplication, MailAppError
from .. import __version__ as GLOBAL_VERSION
from .. import pp
from ..argparse_actions import PortOptionAction
-from .mail import MailAppError, BaseMailApplication
from ..config.pdns import PdnsConfiguration
# from ..config.pdns import PdnsConfigError, PdnsConfiguration
from ..xlate import XLATOR
-__version__ = '0.9.10'
+__version__ = '0.9.11'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# =============================================================================
class PpPDNSAppError(MailAppError):
- """Base error class for all exceptions happened during
- execution this configured application"""
+ """
+ Base error class for this module.
+ It is intended for all exceptions happened during
+ execution this configured application.
+ """
# =============================================================================
class PpPDNSApplication(BaseMailApplication):
- """
- Class for configured application objects related to PowerDNS.
- """
+ """Class for configured application objects related to PowerDNS."""
# -------------------------------------------------------------------------
def __init__(
cfg_class=PdnsConfiguration, initialized=False, usage=None, description=None,
argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None,
+ """Initialize this application object."""
if instance:
self._instance = instance
if not self.cfg:
- msg = _("Configuration not available.")
+ msg = _('Configuration not available.')
raise PpPDNSAppError(msg)
# -----------------------------------------------------------
def api_key(self):
- "The API key to use the PowerDNS API"
+ """Return the API key to use the PowerDNS API."""
return self._api_key
def api_key(self, value):
if value is None or str(value).strip() == '':
- raise PpPDNSAppError(_("Invalid API key {!r} given.").format(value))
+ raise PpPDNSAppError(_('Invalid API key {!r} given.').format(value))
self._api_key = str(value).strip()
# -----------------------------------------------------------
def api_host(self):
- "The host name or address providing the PowerDNS API."
+ """Return the host name or address providing the PowerDNS API."""
return self._api_host
def api_host(self, value):
if value is None or str(value).strip() == '':
- raise PpPDNSAppError(_("Invalid API host {!r} given.").format(value))
+ raise PpPDNSAppError(_('Invalid API host {!r} given.').format(value))
self._api_host = str(value).strip().lower()
# -----------------------------------------------------------
def api_port(self):
- "The TCP port number of the PowerDNS API."
+ """Return the TCP port number of the PowerDNS API."""
return self._api_port
def api_port(self, value):
v = int(value)
if v < 1:
- raise PpPDNSAppError(_("Invalid API port {!r} given.").format(value))
+ raise PpPDNSAppError(_('Invalid API port {!r} given.').format(value))
self._api_port = v
# -----------------------------------------------------------
def api_servername(self):
- "The (virtual) name of the PowerDNS server used in API calls."
+ """Return the (virtual) name of the PowerDNS server used in API calls."""
return self._api_servername
def api_servername(self, value):
if value is None or str(value).strip() == '':
- raise PpPDNSAppError(_("Invalid API server name {!r} given.").format(value))
+ raise PpPDNSAppError(_('Invalid API server name {!r} given.').format(value))
self._api_servername = str(value).strip()
# -----------------------------------------------------------
def api_server_version(self):
- "The version of the PowerDNS server, how provided by API."
+ """Return the version of the PowerDNS server, how provided by API."""
return self._api_server_version
# -----------------------------------------------------------
def instance(self):
- "The name of the PowerDNS instance."
+ """Return the name of the PowerDNS instance."""
return self._instance
def instance(self, value):
if value is None:
- raise PpPDNSAppError(_("Invalid instance {!r} given.").format(None))
+ raise PpPDNSAppError(_('Invalid instance {!r} given.').format(None))
v = str(value).strip().lower()
if v not in self.api_keys.keys():
- raise PpPDNSAppError(_("Invalid instance {!r} given.").format(value))
+ raise PpPDNSAppError(_('Invalid instance {!r} given.').format(value))
# -------------------------------------------------------------------------
def eval_instance(self, inst_name):
+ """Evaluate the active PDNS-Server from configuration."""
if self.verbose > 2:
- msg = _("Evaluating instance {!r} ...").format(inst_name)
+ msg = _('Evaluating instance {!r} ...').format(inst_name)
if not self.cfg:
- msg = _("Configuration not available.")
+ msg = _('Configuration not available.')
raise PpPDNSAppError(msg)
if inst_name not in self.cfg.pdns_api_instances:
- msg = _("PDNS instance {!r} is not configured.").format(inst_name)
+ msg = _('PDNS instance {!r} is not configured.').format(inst_name)
raise PpPDNSAppError(msg)
self._instance = inst_name
# -------------------------------------------------------------------------
def as_dict(self, short=True):
- Transforms the elements of the object into a dict
+ Transform the elements of the object into a dict.
@param short: don't include local properties in resulting dict.
@type short: bool
@return: structure as dict
@rtype: dict
res = super(PpPDNSApplication, self).as_dict(short=short)
res['api_host'] = self.api_host
res['api_port'] = self.api_port
# -------------------------------------------------------------------------
def init_arg_parser(self):
- Method to initiate the argument parser.
+ Initiate the argument parser with special command line parameters.
This method should be explicitely called by all init_arg_parser()
methods in descendant classes.
pdns_group = self.arg_parser.add_argument_group(_('PowerDNS API options'))
inst_group = pdns_group.add_mutually_exclusive_group()
'-I', '--inst', '--instance',
- metavar=_("INSTANCE"), choices=insts, dest="inst",
+ metavar=_('INSTANCE'), choices=insts, dest='inst',
- "Select, which PowerDNS instance to use. Valid values: {v}, "
- "default: {d!r}.").format(v=inst_list, d=self.instance),
+ 'Select, which PowerDNS instance to use. Valid values: {v}, '
+ 'default: {d!r}.').format(v=inst_list, d=self.instance),
'-G', '--global',
- action='store_true', dest="inst_global",
- help=_("Using the {!r} PowerDNS instance.").format('global'),
+ action='store_true', dest='inst_global',
+ help=_('Using the {!r} PowerDNS instance.').format('global'),
'-L', '--local',
- action='store_true', dest="inst_local",
- help=_("Using the {!r} PowerDNS instance.").format('local'),
+ action='store_true', dest='inst_local',
+ help=_('Using the {!r} PowerDNS instance.').format('local'),
'-P', '--public',
- action='store_true', dest="inst_public",
- help=_("Using the {!r} PowerDNS instance.").format('public'),
+ action='store_true', dest='inst_public',
+ help=_('Using the {!r} PowerDNS instance.').format('public'),
'-p', '--port',
- metavar=_("PORT"), type=int, dest='api_port',
+ metavar=_('PORT'), type=int, dest='api_port',
- what="PowerDNS API", action=PortOptionAction,
- help=_("Which port to connect to PowerDNS API, default: {}.").format(
+ what='PowerDNS API', action=PortOptionAction,
+ help=_('Which port to connect to PowerDNS API, default: {}.').format(
help_msg = _(
- "The timeout in seconds for the communication with the PowerDNS-API. "
- "Default: {} seconds.").format(PdnsConfiguration.default_pdns_timeout)
+ 'The timeout in seconds for the communication with the PowerDNS-API. '
+ 'Default: {} seconds.').format(PdnsConfiguration.default_pdns_timeout)
- '-T', '--timeout', metavar=_("SECONDS"), type=int, dest='timeout',
+ '-T', '--timeout', metavar=_('SECONDS'), type=int, dest='timeout',
# -------------------------------------------------------------------------
def init_logging(self):
- """Initialize the logger object.
+ """
+ Initialize the logger object.
It creates a colored loghandler with all output to STDERR.
Maybe overridden in descendant classes.
@return: None
super(PpPDNSApplication, self).init_logging()
if self.verbose < 3:
path = section[key].strip()
if not path:
- msg = _("No path given for{d} [{s}]/{k} in configuration.").format(
+ msg = _('No path given for{d} [{s}]/{k} in configuration.').format(
d=d, s=section_name, k=key)
self.config_has_errors = True
if absolute and not os.path.isabs(path):
msg = _(
- "Path {p!r} for{d} [{s}]/{k} in configuration must be an absolute "
- "path.").format(p=path, d=d, s=section_name, k=key)
+ 'Path {p!r} for{d} [{s}]/{k} in configuration must be an absolute '
+ 'path.').format(p=path, d=d, s=section_name, k=key)
self.config_has_errors = True
# -------------------------------------------------------------------------
def post_init(self):
- Method to execute before calling run(). Here could be done some
- finishing actions after reading in commandline parameters,
- configuration a.s.o.
+ Execute this before calling run().
+ Here could be done some finishing actions after reading in commandline
+ parameters, configuration a.s.o.
This method could be overwritten by descendant classes, these
methods should allways include a call to post_init() of the
parent class.
if self.verbose > 1:
- LOG.debug(_("Executing {} ...").format('post_init()'))
+ LOG.debug(_('Executing {} ...').format('post_init()'))
super(PpPDNSApplication, self).post_init()
if self.args.timeout:
if self.args.timeout > MAX_TIMEOUT:
- msg = _("Timeout of {t} seconds too big, at most {m} seconds are allowed.")
+ msg = _('Timeout of {t} seconds too big, at most {m} seconds are allowed.')
msg = msg.format(t=self.args.timeout, m=MAX_TIMEOUT)
self.exit(1, msg)
if self.args.timeout < 0:
- msg = _("A timeout may not be negative.")
+ msg = _('A timeout may not be negative.')
self.exit(1, msg)
self.cfg.pdns_timeout = self.args.timeout
# -------------------------------------------------------------------------
def pre_run(self):
- """
- Dummy function to run before the main routine.
- Could be overwritten by descendant classes.
- """
+ """Run this before the main routine."""
if self.verbose > 1:
- LOG.debug(_("Executing {} ...").format('pre_run()'))
+ LOG.debug(_('Executing {} ...').format('pre_run()'))
- LOG.debug(_("Setting Loglevel of the requests module to {}.").format('WARNING'))
- logging.getLogger("requests").setLevel(logging.WARNING)
+ LOG.debug(_('Setting Loglevel of the requests module to {}.').format('WARNING'))
+ logging.getLogger('requests').setLevel(logging.WARNING)
super(PpPDNSApplication, self).pre_run()
# -------------------------------------------------------------------------
def _run(self):
- Dummy function as main routine.
+ Execute the underlaying things.
MUST be overwritten by descendant classes.
- LOG.debug(_("Executing nothing ..."))
+ LOG.debug(_('Executing nothing ...'))
# -------------------------------------------------------------------------
def post_run(self):
- Dummy function to run after the main routine.
- Could be overwritten by descendant classes.
+ Execute this cwafterbefore the main routine.
+ Could be overwritten by descendant classes.
if self.verbose > 1:
- LOG.debug(_("Executing {} ...").format('post_run()'))
+ LOG.debug(_('Executing {} ...').format('post_run()'))
if self.pdns:
self.pdns = None
# -------------------------------------------------------------------------
def get_api_server_version(self):
+ """Try to get the version of the requested PowerDNS server."""
if not self.pdns:
- raise PpPDNSAppError(_("The PDNS server object does not exists."))
+ raise PpPDNSAppError(_('The PDNS server object does not exists.'))
if not self.pdns.initialized:
- raise PpPDNSAppError(_("The PDNS server object is not initialized."))
+ raise PpPDNSAppError(_('The PDNS server object is not initialized.'))
return self.pdns.get_api_server_version()
except (ConnectionError, ReadTimeout, ConnectTimeout) as e:
- msg = _("Got a {} during evaluating the PDNS server version from API:").format(
+ msg = _('Got a {} during evaluating the PDNS server version from API:').format(
msg += ' ' + str(e)
raise PpPDNSAppError(msg)
url += ':{}'.format(self.api_port)
url += '/api/v1' + path
- LOG.debug("Used URL: {!r}".format(url))
+ LOG.debug('Used URL: {!r}'.format(url))
return url
# -------------------------------------------------------------------------
def perform_request(self, path, method='GET', data=None, headers=None, may_simulate=False):
- """Performing the underlying API request."""
+ """Perform the underlying API request."""
if not self.pdns:
- raise PpPDNSAppError(_("The PDNS server object does not exists."))
+ raise PpPDNSAppError(_('The PDNS server object does not exists.'))
if not self.pdns.initialized:
- raise PpPDNSAppError(_("The PDNS server object is not initialized."))
+ raise PpPDNSAppError(_('The PDNS server object is not initialized.'))
return self.pdns.perform_request(
path, method=method, data=data, headers=headers, may_simulate=may_simulate)
# -------------------------------------------------------------------------
def get_api_zones(self):
+ """Try to get the list of supported DNS zones from PDNS-Server."""
if not self.pdns:
- raise PpPDNSAppError(_("The PDNS server object does not exists."))
+ raise PpPDNSAppError(_('The PDNS server object does not exists.'))
if not self.pdns.initialized:
- raise PpPDNSAppError(_("The PDNS server object is not initialized."))
+ raise PpPDNSAppError(_('The PDNS server object is not initialized.'))
return self.pdns.get_api_zones()
# -------------------------------------------------------------------------
def get_api_zone(self, zone_name):
+ """Try to get the given DNS zone from PDNS-Server."""
if not self.pdns:
- raise PpPDNSAppError(_("The PDNS server object does not exists."))
+ raise PpPDNSAppError(_('The PDNS server object does not exists.'))
if not self.pdns.initialized:
- raise PpPDNSAppError(_("The PDNS server object is not initialized."))
+ raise PpPDNSAppError(_('The PDNS server object is not initialized.'))
zone_unicode = zone_name
json_response = None
- zout = "{!r}".format(zone_name)
+ zout = '{!r}'.format(zone_name)
if 'xn--' in zone_name:
zone_unicode = zone_name.encode('idna').decode('idna')
- zout = "{!r} ({})".format(zone_name, zone_unicode)
- LOG.debug(_("Trying to get complete information about zone {!r} ...").format(zone_name))
+ zout = '{!r} ({})'.format(zone_name, zone_unicode)
+ LOG.debug(_('Trying to get complete information about zone {!r} ...').format(zone_name))
- path = "/servers/{}/zones/{}".format(self.pdns.api_servername, zone_name)
+ path = '/servers/{}/zones/{}'.format(self.pdns.api_servername, zone_name)
json_response = self.perform_request(path)
except (PDNSApiNotFoundError, PDNSApiValidationError):
- LOG.error(_("The given zone {} was not found.").format(zout))
+ LOG.error(_('The given zone {} was not found.').format(zout))
return None
if self.verbose > 2:
- LOG.debug(_("Got a response:") + '\n' + pp(json_response))
+ LOG.debug(_('Got a response:') + '\n' + pp(json_response))
zone = PowerDNSZone.init_from_dict(
json_response, appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
if self.verbose > 2:
- LOG.debug(_("Zone object:") + '\n' + pp(zone.as_dict()))
+ LOG.debug(_('Zone object:') + '\n' + pp(zone.as_dict()))
return zone
# =============================================================================
-if __name__ == "__main__":
+if __name__ == '__main__':