]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Rewriting specialized search methods to use the general search() method.
authorFrank Brehm <frank.brehm@pixelpark.com>
Wed, 31 Jan 2024 11:38:52 +0000 (12:38 +0100)
committerFrank Brehm <frank.brehm@pixelpark.com>
Wed, 31 Jan 2024 11:38:52 +0000 (12:38 +0100)
lib/pp_admintools/app/ldap.py

index 32f4acd232f0ea74e419036956a9d1ba747f1abd..9b9cd8b11efd881ccbb459d9afcd055075c4f52b 100644 (file)
@@ -60,7 +60,7 @@ from ..errors import DpxLdapSessionError
 from ..errors import DpxWriteLdapItemError
 from ..xlate import XLATOR, format_list
 
-__version__ = '1.3.1'
+__version__ = '1.3.2'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -1197,10 +1197,11 @@ class BaseLdapApplication(BaseDPXApplication):
         return self.get_user_dn_by_cn(user, inst)
 
     # -------------------------------------------------------------------------
-    def get_user_dn_by_mail(self, mail, inst):
+    def get_user_dn_by_mail(self, mail, inst, base_dn=None):
         """Get the DN of the user with the given mail address in the given LDAP instance."""
         connect_info = self.cfg.ldap_connection[inst]
-        base_dn = connect_info.base_dn
+        if not base_dn:
+            base_dn = connect_info.base_dn
 
         result = []
 
@@ -1224,30 +1225,17 @@ class BaseLdapApplication(BaseDPXApplication):
                 uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
             LOG.debug(msg)
 
-        if inst not in self.ldap_connection:
-            self.connect_instance(inst)
-        ldap = self.ldap_connection[inst]
-
-        try:
-            req_status, req_result, req_response, req_whatever = ldap.search(
-                search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
-                get_operational_attributes=False, attributes=attributes,
-                time_limit=self.cfg.ldap_timeout)
-        finally:
-            if not self.single_session:
-                self.disconnect_instance(inst)
+        (search_status, search_result, search_response) = self.search(
+            inst, search_base=base_dn, ldap_filter=ldap_filter, attributes=attributes)
 
-        if req_status:
-            if self.verbose > 4:
-                msg = _('Result of searching for mail address {m!r}:').format(m=mail)
-                LOG.debug(msg + ' ' + pp(req_result))
-            for entry in req_response:
-                if self.verbose > 4:
-                    LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
-                result.append(entry['dn'])
+        if search_status:
+            for entry in search_response:
+                dn = entry['dn']
+                if self.verbose > 3:
+                    LOG.debug(_('Found entry {!r}.').format(dn))
+                result.append(dn)
             if self.verbose > 3:
                 LOG.debug(_('Result:') + ' ' + pp(result))
-
         else:
             if self.verbose > 3:
                 msg = _('User with mail address {m!r} not found in {uri}/{bdn}.').format(
@@ -1257,10 +1245,11 @@ class BaseLdapApplication(BaseDPXApplication):
         return result
 
     # -------------------------------------------------------------------------
-    def get_user_dn_by_uid(self, uid, inst):
+    def get_user_dn_by_uid(self, uid, inst, base_dn=None):
         """Get the DN of the user with the given uid (POSIX name) in the given LDAP instance."""
         connect_info = self.cfg.ldap_connection[inst]
-        base_dn = connect_info.base_dn
+        if not base_dn:
+            base_dn = connect_info.base_dn
 
         result = []
 
@@ -1284,30 +1273,17 @@ class BaseLdapApplication(BaseDPXApplication):
                 uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
             LOG.debug(msg)
 
-        if inst not in self.ldap_connection:
-            self.connect_instance(inst)
-        ldap = self.ldap_connection[inst]
-
-        try:
-            req_status, req_result, req_response, req_whatever = ldap.search(
-                search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
-                get_operational_attributes=False, attributes=attributes,
-                time_limit=self.cfg.ldap_timeout)
-        finally:
-            if not self.single_session:
-                self.disconnect_instance(inst)
+        (search_status, search_result, search_response) = self.search(
+            inst, search_base=base_dn, ldap_filter=ldap_filter, attributes=attributes)
 
-        if req_status:
-            if self.verbose > 4:
-                msg = _('Result of searching for uid {u!r}:').format(u=uid)
-                LOG.debug(msg + ' ' + pp(req_result))
-            for entry in req_response:
+        if search_status:
+            for entry in search_response:
+                dn = entry['dn']
                 if self.verbose > 4:
                     LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
-                result.append(entry['dn'])
+                result.append(dn)
             if self.verbose > 3:
                 LOG.debug(_('Result:') + ' ' + pp(result))
-
         else:
             if self.verbose > 3:
                 msg = _('User with uid {u!r} not found in {uri}/{bdn}.').format(
@@ -1317,10 +1293,11 @@ class BaseLdapApplication(BaseDPXApplication):
         return result
 
     # -------------------------------------------------------------------------
-    def get_user_dn_by_cn(self, cn, inst):
+    def get_user_dn_by_cn(self, cn, inst, base_dn=None):
         """Get the DN of the user with the given cn (common name) in the given LDAP instance."""
         connect_info = self.cfg.ldap_connection[inst]
-        base_dn = connect_info.base_dn
+        if not base_dn:
+            base_dn = connect_info.base_dn
 
         result = []
 
@@ -1344,30 +1321,17 @@ class BaseLdapApplication(BaseDPXApplication):
                 uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
             LOG.debug(msg)
 
-        if inst not in self.ldap_connection:
-            self.connect_instance(inst)
-        ldap = self.ldap_connection[inst]
-
-        try:
-            req_status, req_result, req_response, req_whatever = ldap.search(
-                search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
-                get_operational_attributes=False, attributes=attributes,
-                time_limit=self.cfg.ldap_timeout)
-        finally:
-            if not self.single_session:
-                self.disconnect_instance(inst)
+        (search_status, search_result, search_response) = self.search(
+            inst, search_base=base_dn, ldap_filter=ldap_filter, attributes=attributes)
 
-        if req_status:
-            if self.verbose > 4:
-                msg = _('Result of searching for CN {cn!r}:').format(cn=cn)
-                LOG.debug(msg + ' ' + pp(req_result))
-            for entry in req_response:
+        if search_status:
+            for entry in search_response:
+                dn = entry['dn']
                 if self.verbose > 4:
                     LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
-                result.append(entry['dn'])
+                result.append(dn)
             if self.verbose > 3:
                 LOG.debug(_('Result:') + ' ' + pp(result))
-
         else:
             if self.verbose > 3:
                 msg = _('User with cn {cn!r} not found in {uri}/{bdn}.').format(