import argparse
import signal
import re
+import getpass
# Third party modules
from fb_tools.common import to_bool
_ = XLATOR.gettext
ngettext = XLATOR.ngettext
-__version__ = '0.2.1'
+__version__ = '0.3.1'
# =============================================================================
self.yes = self.args.yes
+ # -------------------------------------------------------------------------
+ def get_password(self, first_prompt=None, second_prompt=None, may_empty=True, repeat=True):
+ """
+ Ask the user for a password on the console.
+
+ @raise AbortAppError: if the user presses Ctrl-D (EOF)
+ @raise TimeoutOnPromptError: if the user does not finishing after a time
+
+ @param first_prompt: the prompt for the first password question
+ @type first_prompt: str
+ @param second_prompt: the prompt for the second password question
+ @type second_prompt: str
+ @param may_empty: The behaviour, if the user inputs an empty password:
+ if True, an empty password will be returned
+ if False, the question will be repeated.
+ @type may_empty: bool
+ @param repeat: Asking for the password a second time, which must be equal
+ to the first given password.
+ @type repeat: bool
+
+ @return: The entered password
+ @rtype: str
+
+ """
+
+ if not first_prompt:
+ first_prompt = _('Password:') + ' '
+
+ if not second_prompt:
+ second_prompt = _('Repeat password:') + ' '
+
+ ret_passwd = None
+ second_passwd = None
+
+ while True:
+
+ ret_passwd = self._get_password(first_prompt, may_empty)
+ if ret_passwd:
+ if repeat:
+ second_passwd = self._get_password(second_prompt, may_empty=False)
+ if ret_passwd != second_passwd:
+ msg = _("The entered passwords does not match.")
+ LOG.error(msg)
+ continue
+ break
+
+ return ret_passwd
+
+ # -------------------------------------------------------------------------
+ def _get_password(self, prompt, may_empty=True):
+
+ def passwd_alarm_caller(signum, sigframe):
+ raise TimeoutOnPromptError(self.prompt_timeout)
+
+ msg_intr = _("Interrupted on demand.")
+ ret_passwd = ''
+
+ try:
+ signal.signal(signal.SIGALRM, passwd_alarm_caller)
+ signal.alarm(self.prompt_timeout)
+
+ while True:
+
+ try:
+ ret_passwd = getpass.getpass(prompt)
+ except EOFError:
+ raise AbortAppError(msg_intr)
+
+ signal.alarm(self.prompt_timeout)
+
+ if ret_passwd == '':
+ if may_empty:
+ return ''
+ else:
+ continue
+ else:
+ break
+
+ except (TimeoutOnPromptError, AbortAppError) as e:
+ msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e)
+ LOG.error(msg)
+ self.exit(10)
+
+ except KeyboardInterrupt:
+ msg = _("Got a {}:").format('KeyboardInterrupt') + ' ' + msg_intr
+ LOG.error(msg)
+ self.exit(10)
+
+ finally:
+ signal.alarm(0)
+
+ return ret_passwd
+
# -------------------------------------------------------------------------
def ask_for_yes_or_no(self, prompt, default_on_empty=None):
"""
Ask the user for yes or no.
- @raise PjdInterruptError: if the user presses Ctrl-D (EOF)
+ @raise AbortAppError: if the user presses Ctrl-D (EOF)
+ @raise TimeoutOnPromptError: if the user does not correct answer after a time
@param prompt: the prompt for the question
@type prompt: str
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
-__version__ = '0.6.1'
+__version__ = '0.6.2'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
class PasswordFileOptionAction(argparse.Action):
# -------------------------------------------------------------------------
- def __init__(self, option_strings, must_exists=True, *args, **kwargs):
+ def __init__(self, option_strings, must_exists=True, must_absolute=True, *args, **kwargs):
self.must_exists = bool(must_exists)
+ self.must_absolute = bool(must_absolute)
super(PasswordFileOptionAction, self).__init__(
option_strings=option_strings, *args, **kwargs)
def __call__(self, parser, namespace, given_path, option_string=None):
path = Path(given_path)
- if not path.is_absolute():
- msg = _("The path {!r} must be an absolute path.").format(given_path)
- raise argparse.ArgumentError(self, msg)
+ if must_absolute:
+ if not path.is_absolute():
+ msg = _("The path {!r} must be an absolute path.").format(given_path)
+ raise argparse.ArgumentError(self, msg)
if self.must_exists:
if insts is None:
insts = []
+ elif not self.use_multiple_ldap_connections:
+ insts = [insts]
- if self.use_multiple_ldap_connections and len(insts) == 1 and insts[0].lower() == 'list':
+ if len(insts) == 1 and insts[0].lower() == 'list':
self._show_ldap_instances()
self.exit(0)
return
instances.insert(0, 'default')
max_key_len = 1
+ max_tier_len = 1
+ max_url_len = 1
+ max_bind_dn_len = 1
for inst in instances:
if len(inst) > max_key_len:
max_key_len = len(inst)
+ cfg = self.cfg.ldap_connection[inst]
+ if len(cfg.tier) > max_tier_len:
+ max_tier_len = len(cfg.tier)
+ url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn)
+ if len(url) > max_url_len:
+ max_url_len = len(url)
+ if len(cfg.bind_dn) > max_bind_dn_len:
+ max_bind_dn_len = len(cfg.bind_dn)
+
max_key_len += 1
title = _("Configured LDAP instances:")
print(title)
print('-' * len(title))
print()
+ tpl = "{inst:<{width}} {url:<{url_l}} {bind_dn:<{bind_dn_l}} {tier}"
for inst in instances:
cfg = self.cfg.ldap_connection[inst]
- print("{inst:<{width}} {url}/{base}".format(
- inst=(inst + ':'), width=max_key_len, url=cfg.url, base=cfg.base_dn))
+ url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn)
+ print(tpl.format(
+ inst=(inst + ':'), width=max_key_len, url=url, bind_dn=cfg.bind_dn,
+ bind_dn_l=max_bind_dn_len, url_l=max_url_len, tier=cfg.tier))
print()
# -------------------------------------------------------------------------
return result
+ # -------------------------------------------------------------------------
+ def read_password_file(self, pw_file):
+ """Reading a password from the given password file. It returns the first
+ stripped non empty line."""
+
+ if self.verbose > 1:
+ abs_path = pw_file.resolve()
+ LOG.debug(_("Reading password file {!r} ...").format(str(abs_path)))
+
+ if not pw_file.exists():
+ msg = _("The file {!r} does not exists.").format(str(pw_file))
+ LOG.warn(msg)
+ return None
+
+ if not pw_file.is_file():
+ msg = _("The given path {!r} exists, but is not a regular file.").format(str(pw_file))
+ LOG.warn(msg)
+ return None
+
+ if not os.access(str(pw_file), os.R_OK):
+ msg = _("The given file {!r} is not readable.").format(str(pw_file))
+ LOG.warn(msg)
+ return None
+
+ content = pw_file.read_text(encoding='utf-8')
+ for line in content.splitlines():
+ if line.strip() != '':
+ return line.strip()
+ return None
# =============================================================================
if __name__ == "__main__":
# Standard modules
import logging
-import time
-import crypt
+import os
+import pwd
+import getpass
# Third party modules
from ldap3 import MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
+import passlib.apps
+
# Own modules
from fb_tools.common import to_bool, is_sequence, pp
from .ldap import LdapAppError, FatalLDAPError
from .ldap import BaseLdapApplication
+from .ldap import PasswordFileOptionAction
-__version__ = '0.1.0'
+__version__ = '0.2.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
class SetLdapPasswordApplication(BaseLdapApplication):
"""Application class for setting a LDAP password."""
+ current_userid = os.getuid()
+ current_user = None
+ try:
+ current_user = getpass.getuser()
+ except KeyError:
+ pass
+
+ ldap_context = passlib.apps.ldap_context
+ available_schemes = list(ldap_context.schemes())
+ available_schemes.append('ldap_pbkdf2_sha1')
+ available_schemes.append('ldap_pbkdf2_sha256')
+ available_schemes.append('ldap_pbkdf2_sha512')
+
+ passlib_context = passlib.context.CryptContext(schemes=available_schemes)
+ default_schema = 'ldap_salted_sha256'
+ passlib_context.update(default=default_schema)
+
+ # -------------------------------------------------------------------------
+ def __init__(self, appname=None, base_dir=None):
+
+ self.use_default_ldap_connection = False
+ self.use_multiple_ldap_connections = False
+ self.show_cmdline_ldap_timeout = True
+
+ self.current_password = None
+ self.need_current_password = False
+ self.do_user_bind = False
+ self.ask_for_password = False
+ self.new_password = None
+ self.user_uid = None
+ self.user_dn = None
+ self.schema = None
+
+ my_appname = self.get_generic_appname(appname)
+
+ desc = _(
+ "Changing the password of the given user. If no user was given, then "
+ "{app} tries to use the name of the user logged in on the controlling terminal."
+ ).format(app=my_appname)
+
+ super(SetLdapPasswordApplication, self).__init__(
+ appname=appname, description=desc, base_dir=base_dir, initialized=False)
+
+ self.initialized = True
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(SetLdapPasswordApplication, self).as_dict(short=short)
+
+ res['available_schemes'] = self.available_schemes
+ res['default_schema'] = self.passlib_context.default_scheme()
+ if self.current_password and self.verbose < 5:
+ res['current_password'] = '******'
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def init_arg_parser(self):
+
+ super(SetLdapPasswordApplication, self).init_arg_parser()
+
+ app_group = self.arg_parser.add_argument_group(_("Options for {}").format(
+ self.appname))
+
+ pw_group = app_group.add_mutually_exclusive_group()
+
+ pw_group.add_argument(
+ '-w', '--password', metavar=_("PASSWORD"), dest="current_pw",
+ help=_("Use PASSWORD as the current user password."),
+ )
+
+ pw_group.add_argument(
+ '-W', '--password-prompt', action="store_true", dest="current_pw_prompt",
+ help=_(
+ "Prompt for current user password. This is used instead of "
+ "specifying the password on the command line."),
+ )
+
+ pw_group.add_argument(
+ '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="current_pw_file",
+ must_absolute=False, action=PasswordFileOptionAction,
+ help=_("Use contents of PASSWORD_FILE as the current user password."),
+ )
+
+ user_help = _(
+ "The user, which password in the given LDAP instance should be changed. "
+ "It may be given by its Uid (the alphanumeric POSIX name), its mail address "
+ "or its LDAP DN.")
+ if self.current_user:
+ user_help += ' ' + _(
+ "If not given, then your current user name {!r} will be used.").format(
+ self.current_user)
+ user_help += ' ' + _(
+ "If you are using a readonly LDAP instance or an instance w/o admin access, "
+ "then you must provide somehow the current password of the user to change.")
+
+ if self.current_user:
+ app_group.add_argument(
+ 'user', metavar=_('USER'), nargs='?', help=user_help)
+ else:
+ app_group.add_argument(
+ 'user', metavar=_('USER'), help=user_help)
+
+ # -------------------------------------------------------------------------
+ def post_init(self):
+ """
+ Method to execute before calling run().
+ """
+
+ super(SetLdapPasswordApplication, self).post_init()
+
+ given_user = getattr(self.args, 'user', None)
+ if given_user:
+ self.user_uid = given_user
+ else:
+ if self.current_user:
+ given_user = self.current_user
+ self.do_user_bind = True
+ self.user_uid = given_user
+ else:
+ msg = _("Could not detect your current login name.")
+ LOG.error(msg)
+ self.exit(1)
+
+ if given_user == 'root':
+ msg = _("The user {!r} will never be managed by LDAP.").format('root')
+ LOG.error(msg)
+ self.exit(1)
+
+ if self.args.current_pw:
+ self.current_password = self.args.current_pw
+ self.do_user_bind = True
+ elif self.args.current_pw_prompt:
+ self.do_user_bind = True
+ elif self.args.current_pw_file:
+ self.current_password = self.read_password_file(self.args.current_pw_file)
+ self.do_user_bind = True
+
+ inst = self.ldap_instances[0]
+ ldap = self.cfg.ldap_connection[inst]
+ if not ldap.is_admin or ldap.readonly:
+ self.do_user_bind = True
+
+ # -------------------------------------------------------------------------
+ def pre_run(self):
+
+ LOG.debug("Pre running tasks ...")
+ super(SetLdapPasswordApplication, self).pre_run()
+
+ if self.do_user_bind and not self.current_password:
+ first_prompt = _("Current password of user {!r}:").format(self.user_uid) + ' '
+ second_prompt = _('Repeat password:') + ' '
+ self.current_password = self.get_password(
+ first_prompt, second_prompt, may_empty=False, repeat=False)
+
+ # -------------------------------------------------------------------------
+ def _run(self):
+
+ inst = self.ldap_instances[0]
+ ldap = self.cfg.ldap_connection[inst]
+ msg = _("Using LDAP instance {inst!r} - {url}.").format(inst=inst, url=ldap.url)
+ LOG.info(msg)
+
# =============================================================================
if __name__ == "__main__":