# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
# from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
# from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError
+from ldap3.core.exceptions import LDAPException
# from ldap3.core.exceptions import LDAPException, LDAPBindError
from fb_tools.common import pp, is_sequence
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
-__version__ = '0.4.2'
+__version__ = '0.4.4'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
pass
+# =============================================================================
+class FatalLDAPError(LdapAppError):
+ """Fatal errors leading to interrupt the current application."""
+ pass
+
+
+# =============================================================================
+class WriteLDAPItemError(FatalLDAPError):
+ """Error class in case, a LDAP item could not be written."""
+ pass
+
+
# =============================================================================
class PasswordFileOptionAction(argparse.Action):
# -------------------------------------------------------------------------
def modify_entry(self, inst, dn, changes):
+ connect_info = self.cfg.ldap_connection[inst]
+ ldap = self.ldap_connection[inst]
+
# connect_info = self.cfg.ldap_connection[inst]
if self.verbose > 1:
- LOG.debug(_("Applying changes:") + '\n' + pp(changes))
+ msg = _("Applying changes on {uri} to DN {dn!r}:").format(
+ uri=connect_info.url, dn=dn)
+ LOG.debug(msg + '\n' + pp(changes))
+
+ if self.simulate:
+ LOG.info(_("Simulation mode - changes are not applied."))
+ return True
+
+ try:
+ req_status, req_result, req_response, req_whatever = ldap.modify(dn, changes)
+ except LDAPException as e:
+ msg = _("Modification NOT successfull - {c}: {e}").format(c=e.__class__.__name__, e=e)
+ msg += '\n' + _("Changes:") + '\n' + pp(changes)
+ raise WriteLDAPItemError(msg)
+ LOG.debug(_('Modification successful.'))
+ if self.verbose > 2:
+ LOG.debug(_("Result of modifying:") + '\n' + pp(req_result))
+
+ return True
+
+ # -------------------------------------------------------------------------
+ def get_group_memberships(self, inst, dn, base_dn=None):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ ldap = self.ldap_connection[inst]
+
+ if not base_dn:
+ base_dn = connect_info.base_dn
+
+ result = []
+ attributes = ['dn']
+
+ ldap_filter = '(member={})'.format(dn)
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ return result
+
+ # -------------------------------------------------------------------------
+ def get_unique_group_memberships(self, inst, dn, base_dn=None):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ ldap = self.ldap_connection[inst]
+
+ if not base_dn:
+ base_dn = connect_info.base_dn
+
+ result = []
+ attributes = ['dn']
+
+ ldap_filter = '(uniqueMember={})'.format(dn)
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ return result
# =============================================================================
import crypt
# Third party modules
-# from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
-from ldap3 import MODIFY_REPLACE, MODIFY_ADD
+from ldap3 import MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
# Own modules
from fb_tools.common import to_bool, is_sequence, pp
from .ldap import LdapAppError
from .ldap import BaseLdapApplication
-__version__ = '0.4.1'
+__version__ = '0.4.2'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
LOG.debug(msg + '\n' + pp(attributes.as_dict()))
self.setting_user_status(inst, dn, attributes)
+ self.remove_all_memberships(inst, dn)
+ self.remove_all_unique_memberships(inst, dn)
# -------------------------------------------------------------------------
def setting_user_status(self, inst, dn, attributes):
if 'userPassword' in attributes:
old_pwd_hash = attributes['userPassword'][0]
changes['carLicense'] = [(MODIFY_ADD, old_pwd_hash)]
- changes['userPassword'] = [(MODIFY_REPLACE, self.empty_passwd)]
+ changes['userPassword'] = [(MODIFY_REPLACE, self.raw_empty_passwd)]
changes['shadowExpire'] = [(MODIFY_REPLACE, self.shadow_expire)]
changes['loginShell'] = [(MODIFY_REPLACE, self.nologin_shell)]
dn=dn, inst=connect_info.url))
self.modify_entry(inst, dn, changes)
+ # -------------------------------------------------------------------------
+ def remove_all_memberships(self, inst, dn):
+
+ connect_info = self.cfg.ldap_connection[inst]
+
+ group_dns = self.get_group_memberships(inst, dn)
+
+ if not group_dns:
+ msg = _("Did not found any group memberships of {dn!r} in {inst}.".format(
+ dn=dn, inst=connect_info.url))
+ LOG.debug(msg)
+ return True
+
+ for group_dn in group_dns:
+ LOG.info(_("Removing user {u!r} from group {g!r} ...").format(u=dn, g=group_dn))
+ changes = {'member': [(MODIFY_DELETE, dn)], }
+ self.modify_entry(inst, group_dn, changes)
+
+ # -------------------------------------------------------------------------
+ def remove_all_unique_memberships(self, inst, dn):
+
+ connect_info = self.cfg.ldap_connection[inst]
+
+ group_dns = self.get_unique_group_memberships(inst, dn)
+
+ if not group_dns:
+ msg = _("Did not found any unique group memberships of {dn!r} in {inst}.".format(
+ dn=dn, inst=connect_info.url))
+ LOG.debug(msg)
+ return True
+
+ for group_dn in group_dns:
+ LOG.info(_("Removing user {u!r} from group {g!r} ...").format(u=dn, g=group_dn))
+ changes = {'uniqueMember': [(MODIFY_DELETE, dn)], }
+ self.modify_entry(inst, group_dn, changes)
+
# =============================================================================
if __name__ == "__main__":