]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Continued with set-ldap-password
authorFrank Brehm <frank.brehm@pixelpark.com>
Tue, 20 Sep 2022 16:38:46 +0000 (18:38 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Tue, 20 Sep 2022 16:38:46 +0000 (18:38 +0200)
lib/pp_admintools/app/__init__.py
lib/pp_admintools/app/ldap.py
lib/pp_admintools/app/set_ldap_password.py

index 77d0f80b1fbc544a815f9c98f92ff644527938c6..fef7bfbb02d9bb4dff22ce2ad23db25f5aa5f751 100644 (file)
@@ -12,6 +12,7 @@ import logging
 import argparse
 import signal
 import re
+import getpass
 
 # Third party modules
 from fb_tools.common import to_bool
@@ -32,7 +33,7 @@ LOG = logging.getLogger(__name__)
 _ = XLATOR.gettext
 ngettext = XLATOR.ngettext
 
-__version__ = '0.2.1'
+__version__ = '0.3.1'
 
 
 # =============================================================================
@@ -219,12 +220,106 @@ class BaseDPXApplication(FbConfigApplication):
 
         self.yes = self.args.yes
 
+    # -------------------------------------------------------------------------
+    def get_password(self, first_prompt=None, second_prompt=None, may_empty=True, repeat=True):
+        """
+        Ask the user for a password on the console.
+
+        @raise AbortAppError: if the user presses Ctrl-D (EOF)
+        @raise TimeoutOnPromptError: if the user does not finishing after a time
+
+        @param first_prompt: the prompt for the first password question
+        @type first_prompt: str
+        @param second_prompt: the prompt for the second password question
+        @type second_prompt: str
+        @param may_empty: The behaviour, if the user inputs an empty password:
+                          if True, an empty password will be returned
+                          if False, the question will be repeated.
+        @type may_empty: bool
+        @param repeat: Asking for the password a second time, which must be equal
+                       to the first given password.
+        @type repeat: bool
+
+        @return: The entered password
+        @rtype: str
+
+        """
+
+        if not first_prompt:
+            first_prompt = _('Password:') + ' '
+
+        if not second_prompt:
+            second_prompt = _('Repeat password:') + ' '
+
+        ret_passwd = None
+        second_passwd = None
+
+        while True:
+
+            ret_passwd = self._get_password(first_prompt, may_empty)
+            if ret_passwd:
+                if repeat:
+                    second_passwd = self._get_password(second_prompt, may_empty=False)
+                    if ret_passwd != second_passwd:
+                        msg = _("The entered passwords does not match.")
+                        LOG.error(msg)
+                        continue
+            break
+
+        return ret_passwd
+
+    # -------------------------------------------------------------------------
+    def _get_password(self, prompt, may_empty=True):
+
+        def passwd_alarm_caller(signum, sigframe):
+            raise TimeoutOnPromptError(self.prompt_timeout)
+
+        msg_intr = _("Interrupted on demand.")
+        ret_passwd = ''
+
+        try:
+            signal.signal(signal.SIGALRM, passwd_alarm_caller)
+            signal.alarm(self.prompt_timeout)
+
+            while True:
+
+                try:
+                    ret_passwd = getpass.getpass(prompt)
+                except EOFError:
+                    raise AbortAppError(msg_intr)
+
+                signal.alarm(self.prompt_timeout)
+
+                if ret_passwd == '':
+                    if may_empty:
+                        return ''
+                    else:
+                        continue
+                else:
+                    break
+
+        except (TimeoutOnPromptError, AbortAppError) as e:
+            msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e)
+            LOG.error(msg)
+            self.exit(10)
+
+        except KeyboardInterrupt:
+            msg = _("Got a {}:").format('KeyboardInterrupt') + ' ' + msg_intr
+            LOG.error(msg)
+            self.exit(10)
+
+        finally:
+            signal.alarm(0)
+
+        return ret_passwd
+
     # -------------------------------------------------------------------------
     def ask_for_yes_or_no(self, prompt, default_on_empty=None):
         """
         Ask the user for yes or no.
 
-        @raise PjdInterruptError: if the user presses Ctrl-D (EOF)
+        @raise AbortAppError: if the user presses Ctrl-D (EOF)
+        @raise TimeoutOnPromptError: if the user does not correct answer after a time
 
         @param prompt: the prompt for the question
         @type prompt: str
index 729303605f7ce6cfa5019e4a520087c09a389b5c..cd8f6cb75541017b409fcf1c746462481d71354f 100644 (file)
@@ -50,7 +50,7 @@ from ..config.ldap import LdapConnectionInfo, LdapConfiguration
 # rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
 from ..config.ldap import DEFAULT_TIMEOUT
 
-__version__ = '0.6.1'
+__version__ = '0.6.2'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -91,9 +91,10 @@ class DeleteLDAPItemError(FatalLDAPError):
 class PasswordFileOptionAction(argparse.Action):
 
     # -------------------------------------------------------------------------
-    def __init__(self, option_strings, must_exists=True, *args, **kwargs):
+    def __init__(self, option_strings, must_exists=True, must_absolute=True, *args, **kwargs):
 
         self.must_exists = bool(must_exists)
+        self.must_absolute = bool(must_absolute)
 
         super(PasswordFileOptionAction, self).__init__(
             option_strings=option_strings, *args, **kwargs)
@@ -102,9 +103,10 @@ class PasswordFileOptionAction(argparse.Action):
     def __call__(self, parser, namespace, given_path, option_string=None):
 
         path = Path(given_path)
-        if not path.is_absolute():
-            msg = _("The path {!r} must be an absolute path.").format(given_path)
-            raise argparse.ArgumentError(self, msg)
+        if must_absolute:
+            if not path.is_absolute():
+                msg = _("The path {!r} must be an absolute path.").format(given_path)
+                raise argparse.ArgumentError(self, msg)
 
         if self.must_exists:
 
@@ -397,8 +399,10 @@ class BaseLdapApplication(BaseDPXApplication):
 
         if insts is None:
             insts = []
+        elif not self.use_multiple_ldap_connections:
+            insts = [insts]
 
-        if self.use_multiple_ldap_connections and len(insts) == 1 and insts[0].lower() == 'list':
+        if len(insts) == 1 and insts[0].lower() == 'list':
             self._show_ldap_instances()
             self.exit(0)
             return
@@ -425,19 +429,34 @@ class BaseLdapApplication(BaseDPXApplication):
             instances.insert(0, 'default')
 
         max_key_len = 1
+        max_tier_len = 1
+        max_url_len = 1
+        max_bind_dn_len = 1
         for inst in instances:
             if len(inst) > max_key_len:
                 max_key_len = len(inst)
+            cfg = self.cfg.ldap_connection[inst]
+            if len(cfg.tier) > max_tier_len:
+                max_tier_len = len(cfg.tier)
+            url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn)
+            if len(url) > max_url_len:
+                max_url_len = len(url)
+            if len(cfg.bind_dn) > max_bind_dn_len:
+                max_bind_dn_len = len(cfg.bind_dn)
+
         max_key_len += 1
 
         title = _("Configured LDAP instances:")
         print(title)
         print('-' * len(title))
         print()
+        tpl = "{inst:<{width}}  {url:<{url_l}}  {bind_dn:<{bind_dn_l}}  {tier}"
         for inst in instances:
             cfg = self.cfg.ldap_connection[inst]
-            print("{inst:<{width}}  {url}/{base}".format(
-                inst=(inst + ':'), width=max_key_len, url=cfg.url, base=cfg.base_dn))
+            url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn)
+            print(tpl.format(
+                inst=(inst + ':'), width=max_key_len, url=url, bind_dn=cfg.bind_dn,
+                bind_dn_l=max_bind_dn_len, url_l=max_url_len, tier=cfg.tier))
         print()
 
     # -------------------------------------------------------------------------
@@ -1152,6 +1171,35 @@ class BaseLdapApplication(BaseDPXApplication):
 
         return result
 
+    # -------------------------------------------------------------------------
+    def read_password_file(self, pw_file):
+        """Reading a password from the given password file. It returns the first
+            stripped non empty line."""
+
+        if self.verbose > 1:
+            abs_path = pw_file.resolve()
+            LOG.debug(_("Reading password file {!r} ...").format(str(abs_path)))
+
+        if not pw_file.exists():
+            msg = _("The file {!r} does not exists.").format(str(pw_file))
+            LOG.warn(msg)
+            return None
+
+        if not pw_file.is_file():
+            msg = _("The given path {!r} exists, but is not a regular file.").format(str(pw_file))
+            LOG.warn(msg)
+            return None
+
+        if not os.access(str(pw_file), os.R_OK):
+            msg = _("The given file {!r} is not readable.").format(str(pw_file))
+            LOG.warn(msg)
+            return None
+
+        content = pw_file.read_text(encoding='utf-8')
+        for line in content.splitlines():
+            if line.strip() != '':
+                return line.strip()
+        return None
 
 # =============================================================================
 if __name__ == "__main__":
index dd548e69733d1f3b039f5b78e93a7b9b6e6bc440..22382e75cb3fdd00a2e8cd39c2b702f30d9ff573 100644 (file)
@@ -9,12 +9,15 @@ from __future__ import absolute_import
 
 # Standard modules
 import logging
-import time
-import crypt
+import os
+import pwd
+import getpass
 
 # Third party modules
 from ldap3 import MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
 
+import passlib.apps
+
 # Own modules
 from fb_tools.common import to_bool, is_sequence, pp
 
@@ -24,8 +27,9 @@ from . import AbortAppError, TimeoutOnPromptError
 
 from .ldap import LdapAppError, FatalLDAPError
 from .ldap import BaseLdapApplication
+from .ldap import PasswordFileOptionAction
 
-__version__ = '0.1.0'
+__version__ = '0.2.0'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -43,6 +47,179 @@ class SetLdapPasswordError(LdapAppError):
 class SetLdapPasswordApplication(BaseLdapApplication):
     """Application class for setting a LDAP password."""
 
+    current_userid = os.getuid()
+    current_user = None
+    try:
+        current_user = getpass.getuser()
+    except KeyError:
+        pass
+
+    ldap_context = passlib.apps.ldap_context
+    available_schemes = list(ldap_context.schemes())
+    available_schemes.append('ldap_pbkdf2_sha1')
+    available_schemes.append('ldap_pbkdf2_sha256')
+    available_schemes.append('ldap_pbkdf2_sha512')
+
+    passlib_context = passlib.context.CryptContext(schemes=available_schemes)
+    default_schema = 'ldap_salted_sha256'
+    passlib_context.update(default=default_schema)
+
+    # -------------------------------------------------------------------------
+    def __init__(self, appname=None, base_dir=None):
+
+        self.use_default_ldap_connection = False
+        self.use_multiple_ldap_connections = False
+        self.show_cmdline_ldap_timeout = True
+
+        self.current_password = None
+        self.need_current_password = False
+        self.do_user_bind = False
+        self.ask_for_password = False
+        self.new_password = None
+        self.user_uid = None
+        self.user_dn = None
+        self.schema = None
+
+        my_appname = self.get_generic_appname(appname)
+
+        desc = _(
+            "Changing the password of the given user. If no user was given, then "
+            "{app} tries to use the name of the user logged in on the controlling terminal."
+        ).format(app=my_appname)
+
+        super(SetLdapPasswordApplication, self).__init__(
+            appname=appname, description=desc, base_dir=base_dir, initialized=False)
+
+        self.initialized = True
+
+    # -------------------------------------------------------------------------
+    def as_dict(self, short=True):
+        """
+        Transforms the elements of the object into a dict
+
+        @param short: don't include local properties in resulting dict.
+        @type short: bool
+
+        @return: structure as dict
+        @rtype:  dict
+        """
+
+        res = super(SetLdapPasswordApplication, self).as_dict(short=short)
+
+        res['available_schemes'] = self.available_schemes
+        res['default_schema'] = self.passlib_context.default_scheme()
+        if self.current_password and self.verbose < 5:
+            res['current_password'] = '******'
+
+        return res
+
+    # -------------------------------------------------------------------------
+    def init_arg_parser(self):
+
+        super(SetLdapPasswordApplication, self).init_arg_parser()
+
+        app_group = self.arg_parser.add_argument_group(_("Options for {}").format(
+            self.appname))
+
+        pw_group = app_group.add_mutually_exclusive_group()
+
+        pw_group.add_argument(
+            '-w', '--password', metavar=_("PASSWORD"), dest="current_pw",
+            help=_("Use PASSWORD as the current user password."),
+        )
+
+        pw_group.add_argument(
+            '-W', '--password-prompt', action="store_true", dest="current_pw_prompt",
+            help=_(
+                "Prompt for current user password. This is used instead of "
+                "specifying the password on the command line."),
+        )
+
+        pw_group.add_argument(
+            '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="current_pw_file",
+            must_absolute=False, action=PasswordFileOptionAction,
+            help=_("Use contents of PASSWORD_FILE as the current user password."),
+        )
+
+        user_help = _(
+                "The user, which password in the given LDAP instance should be changed. "
+                "It may be given by its Uid (the alphanumeric POSIX name), its mail address "
+                "or its LDAP DN.")
+        if self.current_user:
+            user_help += ' ' + _(
+                "If not given, then your current user name {!r} will be used.").format(
+                self.current_user)
+        user_help += ' ' + _(
+            "If you are using a readonly LDAP instance or an instance w/o admin access, "
+            "then you must provide somehow the current password of the user to change.")
+
+        if self.current_user:
+            app_group.add_argument(
+                'user', metavar=_('USER'), nargs='?', help=user_help)
+        else:
+            app_group.add_argument(
+                'user', metavar=_('USER'), help=user_help)
+
+    # -------------------------------------------------------------------------
+    def post_init(self):
+        """
+        Method to execute before calling run().
+        """
+
+        super(SetLdapPasswordApplication, self).post_init()
+
+        given_user = getattr(self.args, 'user', None)
+        if given_user:
+            self.user_uid = given_user
+        else:
+            if self.current_user:
+                given_user = self.current_user
+                self.do_user_bind = True
+                self.user_uid = given_user
+            else:
+                msg = _("Could not detect your current login name.")
+                LOG.error(msg)
+                self.exit(1)
+
+        if given_user == 'root':
+            msg = _("The user {!r} will never be managed by LDAP.").format('root')
+            LOG.error(msg)
+            self.exit(1)
+
+        if self.args.current_pw:
+            self.current_password = self.args.current_pw
+            self.do_user_bind = True
+        elif self.args.current_pw_prompt:
+            self.do_user_bind = True
+        elif self.args.current_pw_file:
+            self.current_password = self.read_password_file(self.args.current_pw_file)
+            self.do_user_bind = True
+
+        inst = self.ldap_instances[0]
+        ldap = self.cfg.ldap_connection[inst]
+        if not ldap.is_admin or ldap.readonly:
+            self.do_user_bind = True
+
+    # -------------------------------------------------------------------------
+    def pre_run(self):
+
+        LOG.debug("Pre running tasks ...")
+        super(SetLdapPasswordApplication, self).pre_run()
+
+        if self.do_user_bind and not self.current_password:
+            first_prompt = _("Current password of user {!r}:").format(self.user_uid) + ' '
+            second_prompt = _('Repeat password:') + ' '
+            self.current_password = self.get_password(
+                first_prompt, second_prompt, may_empty=False, repeat=False)
+
+    # -------------------------------------------------------------------------
+    def _run(self):
+
+        inst = self.ldap_instances[0]
+        ldap = self.cfg.ldap_connection[inst]
+        msg = _("Using LDAP instance {inst!r} - {url}.").format(inst=inst, url=ldap.url)
+        LOG.info(msg)
+
 
 # =============================================================================
 if __name__ == "__main__":