From 81f325cef0bfcaebd0583097deb8ac60a760266b Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Tue, 20 Sep 2022 18:38:46 +0200 Subject: [PATCH] Continued with set-ldap-password --- lib/pp_admintools/app/__init__.py | 99 ++++++++++- lib/pp_admintools/app/ldap.py | 64 ++++++- lib/pp_admintools/app/set_ldap_password.py | 183 ++++++++++++++++++++- 3 files changed, 333 insertions(+), 13 deletions(-) diff --git a/lib/pp_admintools/app/__init__.py b/lib/pp_admintools/app/__init__.py index 77d0f80..fef7bfb 100644 --- a/lib/pp_admintools/app/__init__.py +++ b/lib/pp_admintools/app/__init__.py @@ -12,6 +12,7 @@ import logging import argparse import signal import re +import getpass # Third party modules from fb_tools.common import to_bool @@ -32,7 +33,7 @@ LOG = logging.getLogger(__name__) _ = XLATOR.gettext ngettext = XLATOR.ngettext -__version__ = '0.2.1' +__version__ = '0.3.1' # ============================================================================= @@ -219,12 +220,106 @@ class BaseDPXApplication(FbConfigApplication): self.yes = self.args.yes + # ------------------------------------------------------------------------- + def get_password(self, first_prompt=None, second_prompt=None, may_empty=True, repeat=True): + """ + Ask the user for a password on the console. + + @raise AbortAppError: if the user presses Ctrl-D (EOF) + @raise TimeoutOnPromptError: if the user does not finishing after a time + + @param first_prompt: the prompt for the first password question + @type first_prompt: str + @param second_prompt: the prompt for the second password question + @type second_prompt: str + @param may_empty: The behaviour, if the user inputs an empty password: + if True, an empty password will be returned + if False, the question will be repeated. + @type may_empty: bool + @param repeat: Asking for the password a second time, which must be equal + to the first given password. + @type repeat: bool + + @return: The entered password + @rtype: str + + """ + + if not first_prompt: + first_prompt = _('Password:') + ' ' + + if not second_prompt: + second_prompt = _('Repeat password:') + ' ' + + ret_passwd = None + second_passwd = None + + while True: + + ret_passwd = self._get_password(first_prompt, may_empty) + if ret_passwd: + if repeat: + second_passwd = self._get_password(second_prompt, may_empty=False) + if ret_passwd != second_passwd: + msg = _("The entered passwords does not match.") + LOG.error(msg) + continue + break + + return ret_passwd + + # ------------------------------------------------------------------------- + def _get_password(self, prompt, may_empty=True): + + def passwd_alarm_caller(signum, sigframe): + raise TimeoutOnPromptError(self.prompt_timeout) + + msg_intr = _("Interrupted on demand.") + ret_passwd = '' + + try: + signal.signal(signal.SIGALRM, passwd_alarm_caller) + signal.alarm(self.prompt_timeout) + + while True: + + try: + ret_passwd = getpass.getpass(prompt) + except EOFError: + raise AbortAppError(msg_intr) + + signal.alarm(self.prompt_timeout) + + if ret_passwd == '': + if may_empty: + return '' + else: + continue + else: + break + + except (TimeoutOnPromptError, AbortAppError) as e: + msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e) + LOG.error(msg) + self.exit(10) + + except KeyboardInterrupt: + msg = _("Got a {}:").format('KeyboardInterrupt') + ' ' + msg_intr + LOG.error(msg) + self.exit(10) + + finally: + signal.alarm(0) + + return ret_passwd + # ------------------------------------------------------------------------- def ask_for_yes_or_no(self, prompt, default_on_empty=None): """ Ask the user for yes or no. - @raise PjdInterruptError: if the user presses Ctrl-D (EOF) + @raise AbortAppError: if the user presses Ctrl-D (EOF) + @raise TimeoutOnPromptError: if the user does not correct answer after a time @param prompt: the prompt for the question @type prompt: str diff --git a/lib/pp_admintools/app/ldap.py b/lib/pp_admintools/app/ldap.py index 7293036..cd8f6cb 100644 --- a/lib/pp_admintools/app/ldap.py +++ b/lib/pp_admintools/app/ldap.py @@ -50,7 +50,7 @@ from ..config.ldap import LdapConnectionInfo, LdapConfiguration # rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS from ..config.ldap import DEFAULT_TIMEOUT -__version__ = '0.6.1' +__version__ = '0.6.2' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -91,9 +91,10 @@ class DeleteLDAPItemError(FatalLDAPError): class PasswordFileOptionAction(argparse.Action): # ------------------------------------------------------------------------- - def __init__(self, option_strings, must_exists=True, *args, **kwargs): + def __init__(self, option_strings, must_exists=True, must_absolute=True, *args, **kwargs): self.must_exists = bool(must_exists) + self.must_absolute = bool(must_absolute) super(PasswordFileOptionAction, self).__init__( option_strings=option_strings, *args, **kwargs) @@ -102,9 +103,10 @@ class PasswordFileOptionAction(argparse.Action): def __call__(self, parser, namespace, given_path, option_string=None): path = Path(given_path) - if not path.is_absolute(): - msg = _("The path {!r} must be an absolute path.").format(given_path) - raise argparse.ArgumentError(self, msg) + if must_absolute: + if not path.is_absolute(): + msg = _("The path {!r} must be an absolute path.").format(given_path) + raise argparse.ArgumentError(self, msg) if self.must_exists: @@ -397,8 +399,10 @@ class BaseLdapApplication(BaseDPXApplication): if insts is None: insts = [] + elif not self.use_multiple_ldap_connections: + insts = [insts] - if self.use_multiple_ldap_connections and len(insts) == 1 and insts[0].lower() == 'list': + if len(insts) == 1 and insts[0].lower() == 'list': self._show_ldap_instances() self.exit(0) return @@ -425,19 +429,34 @@ class BaseLdapApplication(BaseDPXApplication): instances.insert(0, 'default') max_key_len = 1 + max_tier_len = 1 + max_url_len = 1 + max_bind_dn_len = 1 for inst in instances: if len(inst) > max_key_len: max_key_len = len(inst) + cfg = self.cfg.ldap_connection[inst] + if len(cfg.tier) > max_tier_len: + max_tier_len = len(cfg.tier) + url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn) + if len(url) > max_url_len: + max_url_len = len(url) + if len(cfg.bind_dn) > max_bind_dn_len: + max_bind_dn_len = len(cfg.bind_dn) + max_key_len += 1 title = _("Configured LDAP instances:") print(title) print('-' * len(title)) print() + tpl = "{inst:<{width}} {url:<{url_l}} {bind_dn:<{bind_dn_l}} {tier}" for inst in instances: cfg = self.cfg.ldap_connection[inst] - print("{inst:<{width}} {url}/{base}".format( - inst=(inst + ':'), width=max_key_len, url=cfg.url, base=cfg.base_dn)) + url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn) + print(tpl.format( + inst=(inst + ':'), width=max_key_len, url=url, bind_dn=cfg.bind_dn, + bind_dn_l=max_bind_dn_len, url_l=max_url_len, tier=cfg.tier)) print() # ------------------------------------------------------------------------- @@ -1152,6 +1171,35 @@ class BaseLdapApplication(BaseDPXApplication): return result + # ------------------------------------------------------------------------- + def read_password_file(self, pw_file): + """Reading a password from the given password file. It returns the first + stripped non empty line.""" + + if self.verbose > 1: + abs_path = pw_file.resolve() + LOG.debug(_("Reading password file {!r} ...").format(str(abs_path))) + + if not pw_file.exists(): + msg = _("The file {!r} does not exists.").format(str(pw_file)) + LOG.warn(msg) + return None + + if not pw_file.is_file(): + msg = _("The given path {!r} exists, but is not a regular file.").format(str(pw_file)) + LOG.warn(msg) + return None + + if not os.access(str(pw_file), os.R_OK): + msg = _("The given file {!r} is not readable.").format(str(pw_file)) + LOG.warn(msg) + return None + + content = pw_file.read_text(encoding='utf-8') + for line in content.splitlines(): + if line.strip() != '': + return line.strip() + return None # ============================================================================= if __name__ == "__main__": diff --git a/lib/pp_admintools/app/set_ldap_password.py b/lib/pp_admintools/app/set_ldap_password.py index dd548e6..22382e7 100644 --- a/lib/pp_admintools/app/set_ldap_password.py +++ b/lib/pp_admintools/app/set_ldap_password.py @@ -9,12 +9,15 @@ from __future__ import absolute_import # Standard modules import logging -import time -import crypt +import os +import pwd +import getpass # Third party modules from ldap3 import MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE +import passlib.apps + # Own modules from fb_tools.common import to_bool, is_sequence, pp @@ -24,8 +27,9 @@ from . import AbortAppError, TimeoutOnPromptError from .ldap import LdapAppError, FatalLDAPError from .ldap import BaseLdapApplication +from .ldap import PasswordFileOptionAction -__version__ = '0.1.0' +__version__ = '0.2.0' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -43,6 +47,179 @@ class SetLdapPasswordError(LdapAppError): class SetLdapPasswordApplication(BaseLdapApplication): """Application class for setting a LDAP password.""" + current_userid = os.getuid() + current_user = None + try: + current_user = getpass.getuser() + except KeyError: + pass + + ldap_context = passlib.apps.ldap_context + available_schemes = list(ldap_context.schemes()) + available_schemes.append('ldap_pbkdf2_sha1') + available_schemes.append('ldap_pbkdf2_sha256') + available_schemes.append('ldap_pbkdf2_sha512') + + passlib_context = passlib.context.CryptContext(schemes=available_schemes) + default_schema = 'ldap_salted_sha256' + passlib_context.update(default=default_schema) + + # ------------------------------------------------------------------------- + def __init__(self, appname=None, base_dir=None): + + self.use_default_ldap_connection = False + self.use_multiple_ldap_connections = False + self.show_cmdline_ldap_timeout = True + + self.current_password = None + self.need_current_password = False + self.do_user_bind = False + self.ask_for_password = False + self.new_password = None + self.user_uid = None + self.user_dn = None + self.schema = None + + my_appname = self.get_generic_appname(appname) + + desc = _( + "Changing the password of the given user. If no user was given, then " + "{app} tries to use the name of the user logged in on the controlling terminal." + ).format(app=my_appname) + + super(SetLdapPasswordApplication, self).__init__( + appname=appname, description=desc, base_dir=base_dir, initialized=False) + + self.initialized = True + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + + res = super(SetLdapPasswordApplication, self).as_dict(short=short) + + res['available_schemes'] = self.available_schemes + res['default_schema'] = self.passlib_context.default_scheme() + if self.current_password and self.verbose < 5: + res['current_password'] = '******' + + return res + + # ------------------------------------------------------------------------- + def init_arg_parser(self): + + super(SetLdapPasswordApplication, self).init_arg_parser() + + app_group = self.arg_parser.add_argument_group(_("Options for {}").format( + self.appname)) + + pw_group = app_group.add_mutually_exclusive_group() + + pw_group.add_argument( + '-w', '--password', metavar=_("PASSWORD"), dest="current_pw", + help=_("Use PASSWORD as the current user password."), + ) + + pw_group.add_argument( + '-W', '--password-prompt', action="store_true", dest="current_pw_prompt", + help=_( + "Prompt for current user password. This is used instead of " + "specifying the password on the command line."), + ) + + pw_group.add_argument( + '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="current_pw_file", + must_absolute=False, action=PasswordFileOptionAction, + help=_("Use contents of PASSWORD_FILE as the current user password."), + ) + + user_help = _( + "The user, which password in the given LDAP instance should be changed. " + "It may be given by its Uid (the alphanumeric POSIX name), its mail address " + "or its LDAP DN.") + if self.current_user: + user_help += ' ' + _( + "If not given, then your current user name {!r} will be used.").format( + self.current_user) + user_help += ' ' + _( + "If you are using a readonly LDAP instance or an instance w/o admin access, " + "then you must provide somehow the current password of the user to change.") + + if self.current_user: + app_group.add_argument( + 'user', metavar=_('USER'), nargs='?', help=user_help) + else: + app_group.add_argument( + 'user', metavar=_('USER'), help=user_help) + + # ------------------------------------------------------------------------- + def post_init(self): + """ + Method to execute before calling run(). + """ + + super(SetLdapPasswordApplication, self).post_init() + + given_user = getattr(self.args, 'user', None) + if given_user: + self.user_uid = given_user + else: + if self.current_user: + given_user = self.current_user + self.do_user_bind = True + self.user_uid = given_user + else: + msg = _("Could not detect your current login name.") + LOG.error(msg) + self.exit(1) + + if given_user == 'root': + msg = _("The user {!r} will never be managed by LDAP.").format('root') + LOG.error(msg) + self.exit(1) + + if self.args.current_pw: + self.current_password = self.args.current_pw + self.do_user_bind = True + elif self.args.current_pw_prompt: + self.do_user_bind = True + elif self.args.current_pw_file: + self.current_password = self.read_password_file(self.args.current_pw_file) + self.do_user_bind = True + + inst = self.ldap_instances[0] + ldap = self.cfg.ldap_connection[inst] + if not ldap.is_admin or ldap.readonly: + self.do_user_bind = True + + # ------------------------------------------------------------------------- + def pre_run(self): + + LOG.debug("Pre running tasks ...") + super(SetLdapPasswordApplication, self).pre_run() + + if self.do_user_bind and not self.current_password: + first_prompt = _("Current password of user {!r}:").format(self.user_uid) + ' ' + second_prompt = _('Repeat password:') + ' ' + self.current_password = self.get_password( + first_prompt, second_prompt, may_empty=False, repeat=False) + + # ------------------------------------------------------------------------- + def _run(self): + + inst = self.ldap_instances[0] + ldap = self.cfg.ldap_connection[inst] + msg = _("Using LDAP instance {inst!r} - {url}.").format(inst=inst, url=ldap.url) + LOG.info(msg) + # ============================================================================= if __name__ == "__main__": -- 2.39.5