# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
-__version__ = '0.4.4'
+__version__ = '0.4.5'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
return result
+ # -------------------------------------------------------------------------
+ def get_posix_group_memberships(self, inst, uid, base_dn=None):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ ldap = self.ldap_connection[inst]
+
+ if not base_dn:
+ base_dn = connect_info.base_dn
+
+ result = []
+ attributes = ['dn']
+
+ ldap_filter = '(memberUid={})'.format(uid)
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ return result
+
+ # -------------------------------------------------------------------------
+ def get_sudo_group_memberships(self, inst, uid, base_dn=None):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ ldap = self.ldap_connection[inst]
+
+ if not base_dn:
+ base_dn = connect_info.base_dn
+
+ result = []
+ attributes = ['dn']
+
+ ldap_filter = '(sudoUser={})'.format(uid)
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ return result
+
# =============================================================================
if __name__ == "__main__":
from .ldap import LdapAppError
from .ldap import BaseLdapApplication
-__version__ = '0.4.2'
+__version__ = '0.4.3'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
self.remove_all_memberships(inst, dn)
self.remove_all_unique_memberships(inst, dn)
+ if 'uid' in attributes:
+ for uid in attributes['uid']:
+ self.remove_all_posixgroup_memberships(inst, uid)
+ self.remove_all_sudogroup_memberships(inst, uid)
+
# -------------------------------------------------------------------------
def setting_user_status(self, inst, dn, attributes):
changes = {'uniqueMember': [(MODIFY_DELETE, dn)], }
self.modify_entry(inst, group_dn, changes)
+ # -------------------------------------------------------------------------
+ def remove_all_posixgroup_memberships(self, inst, uid):
+
+ connect_info = self.cfg.ldap_connection[inst]
+
+ msg = _("Deleting user {uid!r} from all POSIX groups in {inst}.").format(
+ uid=uid, inst=connect_info.url)
+ LOG.debug(msg)
+
+ group_dns = self.get_posix_group_memberships(inst, uid)
+
+ if not group_dns:
+ msg = _("Did not found any POSIX group memberships of {uid!r} in {inst}.".format(
+ uid=uid, inst=connect_info.url))
+ LOG.debug(msg)
+ return True
+
+ for group_dn in group_dns:
+ LOG.info(_("Removing user {u!r} from group {g!r} ...").format(u=uid, g=group_dn))
+ changes = {'memberUid': [(MODIFY_DELETE, uid)], }
+ self.modify_entry(inst, group_dn, changes)
+
+ # -------------------------------------------------------------------------
+ def remove_all_sudogroup_memberships(self, inst, uid):
+
+ connect_info = self.cfg.ldap_connection[inst]
+
+ msg = _("Deleting user {uid!r} from all sudo groups in {inst}.").format(
+ uid=uid, inst=connect_info.url)
+ LOG.debug(msg)
+
+ group_dns = self.get_sudo_group_memberships(inst, uid)
+
+ if not group_dns:
+ msg = _("Did not found any sudo group memberships of {uid!r} in {inst}.".format(
+ uid=uid, inst=connect_info.url))
+ LOG.debug(msg)
+ return True
+
+ for group_dn in group_dns:
+ LOG.info(_("Removing user {u!r} from group {g!r} ...").format(u=uid, g=group_dn))
+ changes = {'sudoUser': [(MODIFY_DELETE, uid)], }
+ self.modify_entry(inst, group_dn, changes)
+
# =============================================================================
if __name__ == "__main__":