]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Continue with lib/pp_admintools/app/remove_ldap_user.py
authorFrank Brehm <frank@brehm-online.com>
Wed, 7 Sep 2022 16:20:33 +0000 (18:20 +0200)
committerFrank Brehm <frank@brehm-online.com>
Wed, 7 Sep 2022 16:20:33 +0000 (18:20 +0200)
lib/pp_admintools/app/__init__.py
lib/pp_admintools/app/ldap.py
lib/pp_admintools/app/remove_ldap_user.py

index 3177b6519432d329b7e24d996f76b0698f58c541..89e8a357fcd54389cc468c3e62f0a10c2e5bd42d 100644 (file)
@@ -10,11 +10,13 @@ from __future__ import absolute_import
 # Standard modules
 import logging
 import argparse
+import signal
+import re
 
 # Third party modules
 from fb_tools.common import to_bool
 from fb_tools.cfg_app import FbConfigApplication
-from fb_tools.errors import FbAppError
+from fb_tools.errors import FbAppError, IoTimeoutError
 from fb_tools.multi_config import BaseMultiConfig
 
 # Own modules
@@ -39,6 +41,23 @@ class DPXAppError(FbAppError):
     pass
 
 
+# =============================================================================
+class AbortAppError(DPXAppError):
+    """Special exception class interrupting the application."""
+    pass
+
+
+# =============================================================================
+class TimeoutOnPromptError(AbortAppError, IoTimeoutError):
+    """Special exception class on timout on a prompt."""
+
+    # -------------------------------------------------------------------------
+    def __init__(self, timeout):
+
+        strerror = _("Timeout on answering on the console.")
+        super(TimeoutOnPromptError, self).__init__(strerror, timeout)
+
+
 # =============================================================================
 class TimeoutOptionAction(argparse.Action):
 
@@ -73,6 +92,7 @@ class BaseDPXApplication(FbConfigApplication):
 
     default_prompt_timeout = 10
     max_prompt_timeout = 600
+    re_yes_no = re.compile(r'^\s*(y|yes|n|no)?\s*$', re.IGNORECASE)
 
     # -------------------------------------------------------------------------
     def __init__(
@@ -165,5 +185,70 @@ class BaseDPXApplication(FbConfigApplication):
 
         self.yes = self.args.yes
 
+    # -------------------------------------------------------------------------
+    def ask_for_yes_or_no(self, prompt, default_on_empty=None):
+        """
+        Ask the user for yes or no.
+
+        @raise PjdInterruptError: if the user presses Ctrl-D (EOF)
+
+        @param prompt: the prompt for the question
+        @type prompt: str
+        @param default_on_empty: behaviour on an empty reply:
+                                 * if None, repeat the question
+                                 * if True, return True
+                                 * else return False
+        @type default_on_empty: bool or None
+
+        @return: True, if the user answered Yes, else False
+        @rtype: bool
+
+        """
+
+        if not prompt:
+            prompt = _('Yes/No') + ' '
+
+        def prompt_alarm_caller(signum, sigframe):
+            raise TimeoutOnPromptError(self.prompt_timeout)
+
+        try:
+            signal.signal(signal.SIGALRM, prompt_alarm_caller)
+            signal.alarm(self.prompt_timeout)
+
+            reply = ''
+            msg_intr = _("Interrupted on demand.")
+            while True:
+                try:
+                    reply = input(prompt)
+                except EOFError:
+                    raise AbortAppError(msg_intr)
+                match = self.re_yes_no.match(reply)
+                if match:
+                    if match.group(1) is None:
+                        if default_on_empty is None:
+                            continue
+                        return bool(default_on_empty)
+                    # There is an answer
+                    r = match.group(1).lower()
+                    if r == 'n' or r == 'no':
+                        # Continue == no
+                        return False
+                    elif r == 'y' or r == 'yes':
+                        # Continue == yes
+                        return True
+                    else:
+                        continue
+                else:
+                    continue
+                # Repeat the question
+
+        except (TimeoutOnPromptError, AbortAppError) as e:
+            msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e)
+            LOG.error(msg)
+            self.exit(10)
+
+        finally:
+            signal.alarm(0)
+
 
 # vim: ts=4 et list
index 9f08a9ad7c991e3891c893f108101946724e69fb..848182582d4b20782410ef970e29bd51adf17ab6 100644 (file)
@@ -31,7 +31,7 @@ from ldap3 import ALL_ATTRIBUTES
 
 from fb_tools.common import pp, is_sequence
 from fb_tools.mailaddress import MailAddress
-from fb_tools.collections import FrozenCIStringSet, CIStringSet
+from fb_tools.collections import FrozenCIStringSet, CIStringSet, CIDict
 
 # Own modules
 from .. import __version__ as GLOBAL_VERSION
@@ -864,6 +864,39 @@ class BaseLdapApplication(BaseDPXApplication):
 
         return result
 
+    # -------------------------------------------------------------------------
+    def normalized_attributes(self, entry):
+
+        attribs = CIDict()
+
+        for attrib in entry['attributes']:
+            values = entry['attributes'][attrib]
+            if attrib.lower() == 'objectclass':
+                if 'objectClass' not in attribs:
+                    attribs['objectClass'] = CIStringSet()
+                if is_sequence(values):
+                    for val in values:
+                        attribs['objectClass'].add(val)
+                else:
+                    attribs['objectClass'].add(values)
+            else:
+                if attrib not in attribs:
+                    attribs[attrib] = []
+                if is_sequence(values):
+                    for val in values:
+                        attribs[attrib].append(val)
+                else:
+                    attribs[attrib].append(values)
+
+        return attribs
+
+    # -------------------------------------------------------------------------
+    def modify_entry(self, inst, dn, changes):
+
+        # connect_info = self.cfg.ldap_connection[inst]
+        if self.verbose > 1:
+            LOG.debug(_("Applying changes:") + '\n' + pp(changes))
+
 
 # =============================================================================
 if __name__ == "__main__":
index c69c8bcd3ebb77969966cac6ddbb17757e3fdfd3..9b5e353c2452595a7bc5783d79b7ac6b7664c0f7 100644 (file)
@@ -9,8 +9,12 @@ from __future__ import absolute_import
 
 # Standard modules
 import logging
+import time
+import crypt
 
 # Third party modules
+# from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
+from ldap3 import MODIFY_REPLACE, MODIFY_ADD
 
 # Own modules
 from fb_tools.common import to_bool, is_sequence, pp
@@ -20,7 +24,7 @@ from ..xlate import XLATOR
 from ..app.ldap import LdapAppError
 from ..app.ldap import BaseLdapApplication
 
-__version__ = '0.3.2'
+__version__ = '0.4.0'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -40,6 +44,7 @@ class RemoveLdapUserApplication(BaseLdapApplication):
 
     default_nologin_shell = "/usr/sbin/nologin"
     value_inactive = 'inactive'
+    raw_empty_passwd = 'none'
 
     # -------------------------------------------------------------------------
     def __init__(self, appname=None, base_dir=None):
@@ -50,11 +55,15 @@ class RemoveLdapUserApplication(BaseLdapApplication):
 
         self.dns = {}
         self.wrong_users = False
+        self.empty_passwd = '{SSHA256}' + crypt.crypt(
+            self.raw_empty_passwd, crypt.mksalt(crypt.METHOD_SHA256))
 
         self.given_users = []
         self.nologin_shell = self.default_nologin_shell
         self._deactivate = False
 
+        self.shadow_expire = int(time.time() / 3600 / 24) - 100
+
         desc = _(
             "Disables or removes the given users from LDAP. "
             "If disabling, then the user will not be really removed, but disabled "
@@ -142,6 +151,8 @@ class RemoveLdapUserApplication(BaseLdapApplication):
             LOG.error(_("No users to remove given."))
             self.exit(1)
 
+        self.deactivate = getattr(self.args, 'deactivate', False)
+
         self.given_users = given_users
 
     # -------------------------------------------------------------------------
@@ -161,6 +172,9 @@ class RemoveLdapUserApplication(BaseLdapApplication):
         msg = _("Evaluated DNs to remove:")
         LOG.debug(msg + '\n' + pp(self.dns))
 
+        for inst in self.dns:
+            self.remove_users_from_inst(inst)
+
     # -------------------------------------------------------------------------
     def eval_user_dns(self, user):
 
@@ -201,6 +215,67 @@ class RemoveLdapUserApplication(BaseLdapApplication):
                     user=usr, inst=connect_info.url)
                 LOG.info(msg)
 
+    # -------------------------------------------------------------------------
+    def remove_users_from_inst(self, inst):
+
+        connect_info = self.cfg.ldap_connection[inst]
+        if self.deactivate:
+            msg = _("Deactivating all given users from {} ...").format(connect_info.url)
+        else:
+            msg = _("Removing all given users from {} ...").format(connect_info.url)
+        LOG.info(msg)
+
+        for dn in self.dns[inst]:
+            self.remove_user(inst, dn)
+
+    # -------------------------------------------------------------------------
+    def remove_user(self, inst, dn):
+
+        connect_info = self.cfg.ldap_connection[inst]
+
+        if self.deactivate:
+            msg = _("Deactivating user {dn!r} from {inst} ...").format(
+                dn=dn, inst=connect_info.url)
+        else:
+            msg = _("Removing user {dn!r} from {inst} ...").format(dn=dn, inst=connect_info.url)
+        LOG.info(msg)
+
+        entry = self.get_entry(dn, inst)
+        attributes = self.normalized_attributes(entry)
+        if self.verbose > 1:
+            msg = _("Attributes of {!r}:").format(dn)
+            LOG.debug(msg + '\n' + pp(attributes.as_dict()))
+
+        self.setting_user_status(inst, dn, attributes)
+
+    # -------------------------------------------------------------------------
+    def setting_user_status(self, inst, dn, attributes):
+
+        connect_info = self.cfg.ldap_connection[inst]
+        changes = {}
+
+        is_mail_user = False
+        if 'inetMailUser' in attributes['objectClass']:
+            is_mail_user = True
+            LOG.debug(_("User {!r} is a mail user.").format(dn))
+        else:
+            LOG.debug(_("User {!r} is not a mail user.").format(dn))
+
+        changes['inetUserStatus'] = [(MODIFY_REPLACE, 'inactive')]
+        if is_mail_user:
+            changes['mailUserStatus'] = [(MODIFY_REPLACE, 'inactive')]
+
+        if 'userPassword' in attributes:
+            old_pwd_hash = attributes['userPassword'][0]
+            changes['carLicense'] = [(MODIFY_ADD, old_pwd_hash)]
+        changes['userPassword'] = [(MODIFY_REPLACE, self.empty_passwd)]
+        changes['shadowExpire'] = [(MODIFY_REPLACE, self.shadow_expire)]
+        changes['loginShell'] = [(MODIFY_REPLACE, self.nologin_shell)]
+
+        LOG.info(_("Updating user info for {dn!r} on from {inst} ...").format(
+            dn=dn, inst=connect_info.url))
+        self.modify_entry(inst, dn, changes)
+
 
 # =============================================================================
 if __name__ == "__main__":