# Standard modules
import logging
import argparse
+import signal
+import re
# Third party modules
from fb_tools.common import to_bool
from fb_tools.cfg_app import FbConfigApplication
-from fb_tools.errors import FbAppError
+from fb_tools.errors import FbAppError, IoTimeoutError
from fb_tools.multi_config import BaseMultiConfig
# Own modules
+# =============================================================================
+class AbortAppError(DPXAppError):
+ """Special exception class interrupting the application."""
+ pass
+# =============================================================================
+class TimeoutOnPromptError(AbortAppError, IoTimeoutError):
+ """Special exception class on timout on a prompt."""
+ # -------------------------------------------------------------------------
+ def __init__(self, timeout):
+ strerror = _("Timeout on answering on the console.")
+ super(TimeoutOnPromptError, self).__init__(strerror, timeout)
# =============================================================================
class TimeoutOptionAction(argparse.Action):
default_prompt_timeout = 10
max_prompt_timeout = 600
+ re_yes_no = re.compile(r'^\s*(y|yes|n|no)?\s*$', re.IGNORECASE)
# -------------------------------------------------------------------------
def __init__(
self.yes = self.args.yes
+ # -------------------------------------------------------------------------
+ def ask_for_yes_or_no(self, prompt, default_on_empty=None):
+ """
+ Ask the user for yes or no.
+ @raise PjdInterruptError: if the user presses Ctrl-D (EOF)
+ @param prompt: the prompt for the question
+ @type prompt: str
+ @param default_on_empty: behaviour on an empty reply:
+ * if None, repeat the question
+ * if True, return True
+ * else return False
+ @type default_on_empty: bool or None
+ @return: True, if the user answered Yes, else False
+ @rtype: bool
+ """
+ if not prompt:
+ prompt = _('Yes/No') + ' '
+ def prompt_alarm_caller(signum, sigframe):
+ raise TimeoutOnPromptError(self.prompt_timeout)
+ try:
+ signal.signal(signal.SIGALRM, prompt_alarm_caller)
+ signal.alarm(self.prompt_timeout)
+ reply = ''
+ msg_intr = _("Interrupted on demand.")
+ while True:
+ try:
+ reply = input(prompt)
+ except EOFError:
+ raise AbortAppError(msg_intr)
+ match = self.re_yes_no.match(reply)
+ if match:
+ if match.group(1) is None:
+ if default_on_empty is None:
+ continue
+ return bool(default_on_empty)
+ # There is an answer
+ r = match.group(1).lower()
+ if r == 'n' or r == 'no':
+ # Continue == no
+ return False
+ elif r == 'y' or r == 'yes':
+ # Continue == yes
+ return True
+ else:
+ continue
+ else:
+ continue
+ # Repeat the question
+ except (TimeoutOnPromptError, AbortAppError) as e:
+ msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e)
+ LOG.error(msg)
+ self.exit(10)
+ finally:
+ signal.alarm(0)
# vim: ts=4 et list
from fb_tools.common import pp, is_sequence
from fb_tools.mailaddress import MailAddress
-from fb_tools.collections import FrozenCIStringSet, CIStringSet
+from fb_tools.collections import FrozenCIStringSet, CIStringSet, CIDict
# Own modules
from .. import __version__ as GLOBAL_VERSION
return result
+ # -------------------------------------------------------------------------
+ def normalized_attributes(self, entry):
+ attribs = CIDict()
+ for attrib in entry['attributes']:
+ values = entry['attributes'][attrib]
+ if attrib.lower() == 'objectclass':
+ if 'objectClass' not in attribs:
+ attribs['objectClass'] = CIStringSet()
+ if is_sequence(values):
+ for val in values:
+ attribs['objectClass'].add(val)
+ else:
+ attribs['objectClass'].add(values)
+ else:
+ if attrib not in attribs:
+ attribs[attrib] = []
+ if is_sequence(values):
+ for val in values:
+ attribs[attrib].append(val)
+ else:
+ attribs[attrib].append(values)
+ return attribs
+ # -------------------------------------------------------------------------
+ def modify_entry(self, inst, dn, changes):
+ # connect_info = self.cfg.ldap_connection[inst]
+ if self.verbose > 1:
+ LOG.debug(_("Applying changes:") + '\n' + pp(changes))
# =============================================================================
if __name__ == "__main__":
# Standard modules
import logging
+import time
+import crypt
# Third party modules
+from ldap3 import MODIFY_REPLACE, MODIFY_ADD
# Own modules
from fb_tools.common import to_bool, is_sequence, pp
from ..app.ldap import LdapAppError
from ..app.ldap import BaseLdapApplication
-__version__ = '0.3.2'
+__version__ = '0.4.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
default_nologin_shell = "/usr/sbin/nologin"
value_inactive = 'inactive'
+ raw_empty_passwd = 'none'
# -------------------------------------------------------------------------
def __init__(self, appname=None, base_dir=None):
self.dns = {}
self.wrong_users = False
+ self.empty_passwd = '{SSHA256}' + crypt.crypt(
+ self.raw_empty_passwd, crypt.mksalt(crypt.METHOD_SHA256))
self.given_users = []
self.nologin_shell = self.default_nologin_shell
self._deactivate = False
+ self.shadow_expire = int(time.time() / 3600 / 24) - 100
desc = _(
"Disables or removes the given users from LDAP. "
"If disabling, then the user will not be really removed, but disabled "
LOG.error(_("No users to remove given."))
+ self.deactivate = getattr(self.args, 'deactivate', False)
self.given_users = given_users
# -------------------------------------------------------------------------
msg = _("Evaluated DNs to remove:")
LOG.debug(msg + '\n' + pp(self.dns))
+ for inst in self.dns:
+ self.remove_users_from_inst(inst)
# -------------------------------------------------------------------------
def eval_user_dns(self, user):
user=usr, inst=connect_info.url)
+ # -------------------------------------------------------------------------
+ def remove_users_from_inst(self, inst):
+ connect_info = self.cfg.ldap_connection[inst]
+ if self.deactivate:
+ msg = _("Deactivating all given users from {} ...").format(connect_info.url)
+ else:
+ msg = _("Removing all given users from {} ...").format(connect_info.url)
+ LOG.info(msg)
+ for dn in self.dns[inst]:
+ self.remove_user(inst, dn)
+ # -------------------------------------------------------------------------
+ def remove_user(self, inst, dn):
+ connect_info = self.cfg.ldap_connection[inst]
+ if self.deactivate:
+ msg = _("Deactivating user {dn!r} from {inst} ...").format(
+ dn=dn, inst=connect_info.url)
+ else:
+ msg = _("Removing user {dn!r} from {inst} ...").format(dn=dn, inst=connect_info.url)
+ LOG.info(msg)
+ entry = self.get_entry(dn, inst)
+ attributes = self.normalized_attributes(entry)
+ if self.verbose > 1:
+ msg = _("Attributes of {!r}:").format(dn)
+ LOG.debug(msg + '\n' + pp(attributes.as_dict()))
+ self.setting_user_status(inst, dn, attributes)
+ # -------------------------------------------------------------------------
+ def setting_user_status(self, inst, dn, attributes):
+ connect_info = self.cfg.ldap_connection[inst]
+ changes = {}
+ is_mail_user = False
+ if 'inetMailUser' in attributes['objectClass']:
+ is_mail_user = True
+ LOG.debug(_("User {!r} is a mail user.").format(dn))
+ else:
+ LOG.debug(_("User {!r} is not a mail user.").format(dn))
+ changes['inetUserStatus'] = [(MODIFY_REPLACE, 'inactive')]
+ if is_mail_user:
+ changes['mailUserStatus'] = [(MODIFY_REPLACE, 'inactive')]
+ if 'userPassword' in attributes:
+ old_pwd_hash = attributes['userPassword'][0]
+ changes['carLicense'] = [(MODIFY_ADD, old_pwd_hash)]
+ changes['userPassword'] = [(MODIFY_REPLACE, self.empty_passwd)]
+ changes['shadowExpire'] = [(MODIFY_REPLACE, self.shadow_expire)]
+ changes['loginShell'] = [(MODIFY_REPLACE, self.nologin_shell)]
+ LOG.info(_("Updating user info for {dn!r} on from {inst} ...").format(
+ dn=dn, inst=connect_info.url))
+ self.modify_entry(inst, dn, changes)
# =============================================================================
if __name__ == "__main__":