]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Extending LDAP search methods
authorFrank Brehm <frank.brehm@pixelpark.com>
Wed, 7 Sep 2022 12:34:13 +0000 (14:34 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Wed, 7 Sep 2022 12:34:13 +0000 (14:34 +0200)
lib/pp_admintools/app/ldap.py

index 6be0bbb2c74b2bcd9355694e4818eb4e74fa75f6..12c3219d627007b5634df99deae9ef87ab655f3b 100644 (file)
@@ -21,17 +21,19 @@ except ImportError:
 # Third party modules
 from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC
 # from ldap3 import ALL
-from ldap3 import SUBTREE
+from ldap3 import BASE, SUBTREE
 # from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS
+from ldap3 import ALL_ATTRIBUTES
 # from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
 # from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
 # from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError
 # from ldap3.core.exceptions import LDAPException, LDAPBindError
 
-from fb_tools.common import pp, to_bool
+from fb_tools.common import pp, to_bool, is_sequence
 from fb_tools.cfg_app import FbConfigApplication
 from fb_tools.errors import FbAppError
 from fb_tools.mailaddress import MailAddress
+from fb_tools.collections import FrozenCIStringSet, CIStringSet
 
 # Own modules
 from .. import __version__ as GLOBAL_VERSION
@@ -47,7 +49,7 @@ from ..config.ldap import LdapConnectionInfo, LdapConfiguration
 # rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
 from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT
 
-__version__ = '0.4.0'
+__version__ = '0.4.1'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -173,6 +175,9 @@ class BaseLdapApplication(FbConfigApplication):
     pattern_re_uid = r'^[a-z](?:[a-z0-9\-_.]*[a-z0-9])?$'
     re_uid = re.compile(pattern_re_uid, re.IGNORECASE)
 
+    person_object_classes = FrozenCIStringSet([
+        'account', 'inetOrgPerson', 'inetUser', 'posixAccount', 'sambaSamAccount'])
+
     # -------------------------------------------------------------------------
     def __init__(
         self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None,
@@ -619,7 +624,6 @@ class BaseLdapApplication(FbConfigApplication):
     def get_user_dn(self, user, inst):
 
         connect_info = self.cfg.ldap_connection[inst]
-        base_dn = connect_info.base_dn
 
         if self.verbose > 1:
             msg = _("Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...").format(
@@ -629,18 +633,24 @@ class BaseLdapApplication(FbConfigApplication):
         if MailAddress.valid_address(user, verbose=self.verbose):
             msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user)
             LOG.debug(msg)
-            return self.get_user_dn_by_mail(user, inst)
+            dns = self.get_user_dn_by_mail(user, inst)
+            if dns:
+                return dns
 
         if self.re_ldap_dn.match(user):
             msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user)
             LOG.debug(msg)
-            return [user]
+            dns = self.get_user_dn_by_dn(user, inst)
+            if dns:
+                return dns
 
         if self.re_uid.match(user):
             msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format(
                 u=user)
             LOG.debug(msg)
-            return self.get_user_dn_by_uid(user, inst)
+            dns = self.get_user_dn_by_uid(user, inst)
+            if dns:
+                return dns
 
         usr = user.strip()
         if usr == '':
@@ -650,7 +660,7 @@ class BaseLdapApplication(FbConfigApplication):
         msg = _("Trying to evaluate DN of user {u!r} as a CN ({c}) ...").format(
             u=usr, c='common name')
         LOG.debug(msg)
-        return 'cn={u},{b}'.format(u=usr, b=base_dn)
+        return self.get_user_dn_by_cn(user, inst)
 
     # -------------------------------------------------------------------------
     def get_user_dn_by_mail(self, mail, inst):
@@ -758,6 +768,144 @@ class BaseLdapApplication(FbConfigApplication):
 
         return result
 
+    # -------------------------------------------------------------------------
+    def get_user_dn_by_cn(self, cn, inst):
+
+        connect_info = self.cfg.ldap_connection[inst]
+        base_dn = connect_info.base_dn
+        ldap = self.ldap_connection[inst]
+
+        result = []
+
+        attributes = ['dn']
+
+        ldap_filter = '(&'
+        ldap_filter += '(|'
+        ldap_filter += '(objectClass=account)'
+        ldap_filter += '(objectClass=inetOrgPerson)'
+        ldap_filter += '(objectClass=inetUser)'
+        ldap_filter += '(objectClass=mailRecipient)'
+        ldap_filter += '(objectClass=posixAccount)'
+        ldap_filter += '(objectClass=sambaSamAccount)'
+        ldap_filter += '(objectClass=uidObject)'
+        ldap_filter += ')'
+        ldap_filter += '(cn={})'.format(cn)
+        ldap_filter += ')'
+
+        if self.verbose > 1:
+            msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+                uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+            LOG.debug(msg)
+
+        req_status, req_result, req_response, req_whatever = ldap.search(
+            search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+            get_operational_attributes=False, attributes=attributes,
+            time_limit=self.cfg.ldap_timeout)
+
+        if req_status:
+            if self.verbose > 4:
+                msg = _("Result of searching for CN {cn!r}:").format(cn=cn)
+                LOG.debug(msg + ' ' + pp(req_result))
+            for entry in req_response:
+                if self.verbose > 4:
+                    LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+                result.append(entry['dn'])
+            if self.verbose > 3:
+                LOG.debug(_("Result:") + ' ' + pp(result))
+
+        else:
+            if self.verbose > 3:
+                msg = _("User with cn {cn!r} not found in {uri}/{bdn}.").format(
+                    cn=cn, uri=connect_info.url, bdn=base_dn)
+                LOG.debug(msg)
+
+        return result
+
+    # -------------------------------------------------------------------------
+    def get_user_dn_by_dn(self, dn, inst, strict=True):
+
+        connect_info = self.cfg.ldap_connection[inst]
+
+        attributes = ['objectClass']
+
+        entry = self.get_entry(dn, inst, attributes=attributes)
+
+        if not entry:
+            if self.verbose > 3:
+                msg = _("User with DN {dn!r} not found in {uri}.").format(
+                    dn=dn, uri=connect_info.url)
+            LOG.debug(msg)
+            return None
+
+        found_dn = entry['dn']
+
+        object_classes = CIStringSet()
+        for attr in entry['attributes']:
+            if attr.lower() == 'objectclass':
+                values = entry['attributes'][attr]
+                if is_sequence(values):
+                    for val in values:
+                        object_classes.add(val)
+                else:
+                    object_classes.add(values)
+
+        if self.verbose > 3:
+            msg = _("ObjectClasses of {dn!r}:").format(dn=found_dn)
+            LOG.debug(msg + '\n' + pp(object_classes.as_list()))
+
+        is_person = False
+        for oclass in object_classes:
+            if oclass in self.person_object_classes:
+                is_person = True
+                break
+
+        if not is_person:
+            msg = _("Entry {dn!r} in {uri} seems not to be an account.").format(
+                dn=found_dn, uri=connect_info.url)
+            LOG.warn(msg)
+            if strict:
+                return None
+
+        return [found_dn]
+
+    # -------------------------------------------------------------------------
+    def get_entry(self, dn, inst, attributes=None, operational_attributes=False):
+
+        connect_info = self.cfg.ldap_connection[inst]
+        ldap = self.ldap_connection[inst]
+
+        if attributes is None:
+            attributes = [ALL_ATTRIBUTES]
+
+        sfilter = '(objectClass=*)'
+
+        result = None
+
+        if self.verbose > 1:
+            msg = _("Searching DN {dn!r} in {uri}.").format(dn=dn, uri=connect_info.url)
+            LOG.debug(msg)
+
+        req_status, req_result, req_response, req_whatever = ldap.search(
+            search_base=dn, search_scope=BASE, attributes=attributes,
+            get_operational_attributes=operational_attributes, search_filter=sfilter,
+            time_limit=self.cfg.ldap_timeout)
+
+        if req_status:
+            if self.verbose > 4:
+                msg = _("Result of searching for DN {dn!r}:").format(dn=dn)
+                LOG.debug(msg + ' ' + pp(req_result))
+            result = req_response[0]
+            if self.verbose > 3:
+                LOG.debug(_("Got a response entry:") + ' ' + pp(result))
+
+        else:
+            if self.verbose > 3:
+                msg = _("Entry with DN {dn!r} not found in {uri}.").format(
+                    dn=dn, uri=connect_info.url)
+                LOG.debug(msg)
+
+        return result
+
 
 # =============================================================================
 if __name__ == "__main__":