from ..errors import DpxWriteLdapItemError
from ..xlate import XLATOR, format_list
-__version__ = '1.3.1'
+__version__ = '1.3.2'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
return self.get_user_dn_by_cn(user, inst)
# -------------------------------------------------------------------------
- def get_user_dn_by_mail(self, mail, inst):
+ def get_user_dn_by_mail(self, mail, inst, base_dn=None):
"""Get the DN of the user with the given mail address in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
- base_dn = connect_info.base_dn
+ if not base_dn:
+ base_dn = connect_info.base_dn
result = []
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
- if inst not in self.ldap_connection:
- self.connect_instance(inst)
- ldap = self.ldap_connection[inst]
-
- try:
- req_status, req_result, req_response, req_whatever = ldap.search(
- search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
- get_operational_attributes=False, attributes=attributes,
- time_limit=self.cfg.ldap_timeout)
- finally:
- if not self.single_session:
- self.disconnect_instance(inst)
+ (search_status, search_result, search_response) = self.search(
+ inst, search_base=base_dn, ldap_filter=ldap_filter, attributes=attributes)
- if req_status:
- if self.verbose > 4:
- msg = _('Result of searching for mail address {m!r}:').format(m=mail)
- LOG.debug(msg + ' ' + pp(req_result))
- for entry in req_response:
- if self.verbose > 4:
- LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
- result.append(entry['dn'])
+ if search_status:
+ for entry in search_response:
+ dn = entry['dn']
+ if self.verbose > 3:
+ LOG.debug(_('Found entry {!r}.').format(dn))
+ result.append(dn)
if self.verbose > 3:
LOG.debug(_('Result:') + ' ' + pp(result))
-
else:
if self.verbose > 3:
msg = _('User with mail address {m!r} not found in {uri}/{bdn}.').format(
return result
# -------------------------------------------------------------------------
- def get_user_dn_by_uid(self, uid, inst):
+ def get_user_dn_by_uid(self, uid, inst, base_dn=None):
"""Get the DN of the user with the given uid (POSIX name) in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
- base_dn = connect_info.base_dn
+ if not base_dn:
+ base_dn = connect_info.base_dn
result = []
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
- if inst not in self.ldap_connection:
- self.connect_instance(inst)
- ldap = self.ldap_connection[inst]
-
- try:
- req_status, req_result, req_response, req_whatever = ldap.search(
- search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
- get_operational_attributes=False, attributes=attributes,
- time_limit=self.cfg.ldap_timeout)
- finally:
- if not self.single_session:
- self.disconnect_instance(inst)
+ (search_status, search_result, search_response) = self.search(
+ inst, search_base=base_dn, ldap_filter=ldap_filter, attributes=attributes)
- if req_status:
- if self.verbose > 4:
- msg = _('Result of searching for uid {u!r}:').format(u=uid)
- LOG.debug(msg + ' ' + pp(req_result))
- for entry in req_response:
+ if search_status:
+ for entry in search_response:
+ dn = entry['dn']
if self.verbose > 4:
LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
- result.append(entry['dn'])
+ result.append(dn)
if self.verbose > 3:
LOG.debug(_('Result:') + ' ' + pp(result))
-
else:
if self.verbose > 3:
msg = _('User with uid {u!r} not found in {uri}/{bdn}.').format(
return result
# -------------------------------------------------------------------------
- def get_user_dn_by_cn(self, cn, inst):
+ def get_user_dn_by_cn(self, cn, inst, base_dn=None):
"""Get the DN of the user with the given cn (common name) in the given LDAP instance."""
connect_info = self.cfg.ldap_connection[inst]
- base_dn = connect_info.base_dn
+ if not base_dn:
+ base_dn = connect_info.base_dn
result = []
uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
LOG.debug(msg)
- if inst not in self.ldap_connection:
- self.connect_instance(inst)
- ldap = self.ldap_connection[inst]
-
- try:
- req_status, req_result, req_response, req_whatever = ldap.search(
- search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
- get_operational_attributes=False, attributes=attributes,
- time_limit=self.cfg.ldap_timeout)
- finally:
- if not self.single_session:
- self.disconnect_instance(inst)
+ (search_status, search_result, search_response) = self.search(
+ inst, search_base=base_dn, ldap_filter=ldap_filter, attributes=attributes)
- if req_status:
- if self.verbose > 4:
- msg = _('Result of searching for CN {cn!r}:').format(cn=cn)
- LOG.debug(msg + ' ' + pp(req_result))
- for entry in req_response:
+ if search_status:
+ for entry in search_response:
+ dn = entry['dn']
if self.verbose > 4:
LOG.debug(_('Got a response entry:') + ' ' + pp(entry))
- result.append(entry['dn'])
+ result.append(dn)
if self.verbose > 3:
LOG.debug(_('Result:') + ' ' + pp(result))
-
else:
if self.verbose > 3:
msg = _('User with cn {cn!r} not found in {uri}/{bdn}.').format(