# Third party modules
from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC
# from ldap3 import ALL
+from ldap3 import SUBTREE
# from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS
# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
# from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT
-__version__ = '0.3.3'
+__version__ = '0.4.0'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
if MailAddress.valid_address(user, verbose=self.verbose):
msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user)
LOG.debug(msg)
- return 'mail={u},{b}'.format(u=user, b=base_dn)
+ return self.get_user_dn_by_mail(user, inst)
if self.re_ldap_dn.match(user):
msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user)
LOG.debug(msg)
- return user
+ return [user]
if self.re_uid.match(user):
msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format(
u=user)
LOG.debug(msg)
- return 'uid={u},{b}'.format(u=user, b=base_dn)
+ return self.get_user_dn_by_uid(user, inst)
usr = user.strip()
if usr == '':
LOG.debug(msg)
return 'cn={u},{b}'.format(u=usr, b=base_dn)
+ # -------------------------------------------------------------------------
+ def get_user_dn_by_mail(self, mail, inst):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ base_dn = connect_info.base_dn
+ ldap = self.ldap_connection[inst]
+
+ result = []
+
+ attributes = ['dn']
+
+ ldap_filter = '(&'
+ ldap_filter += '(|'
+ ldap_filter += '(objectClass=inetLocalMailRecipient)'
+ ldap_filter += '(objectClass=inetOrgPerson)'
+ ldap_filter += '(objectClass=inetMailUser)'
+ ldap_filter += ')'
+ ldap_filter += '(|'
+ ldap_filter += '(mail={})'.format(mail)
+ ldap_filter += '(mailAlternateAddress={})'.format(mail)
+ ldap_filter += '(mailEquivalentAddress={})'.format(mail)
+ ldap_filter += ')'
+ ldap_filter += ')'
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ if self.verbose > 4:
+ msg = _("Result of searching for mail address {m!r}:").format(m=mail)
+ LOG.debug(msg + ' ' + pp(req_result))
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ else:
+ if self.verbose > 3:
+ msg = _("User with mail address {m!r} not found in {uri}/{bdn}.").format(
+ m=mail, uri=connect_info.url, bdn=base_dn)
+ LOG.debug(msg)
+
+ return result
+
+ # -------------------------------------------------------------------------
+ def get_user_dn_by_uid(self, uid, inst):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ base_dn = connect_info.base_dn
+ ldap = self.ldap_connection[inst]
+
+ result = []
+
+ attributes = ['dn']
+
+ ldap_filter = '(&'
+ ldap_filter += '(|'
+ ldap_filter += '(objectClass=account)'
+ ldap_filter += '(objectClass=inetOrgPerson)'
+ ldap_filter += '(objectClass=inetUser)'
+ ldap_filter += '(objectClass=mailRecipient)'
+ ldap_filter += '(objectClass=posixAccount)'
+ ldap_filter += '(objectClass=sambaSamAccount)'
+ ldap_filter += '(objectClass=uidObject)'
+ ldap_filter += ')'
+ ldap_filter += '(uid={})'.format(uid)
+ ldap_filter += ')'
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ if self.verbose > 4:
+ msg = _("Result of searching for uid {u!r}:").format(u=uid)
+ LOG.debug(msg + ' ' + pp(req_result))
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ else:
+ if self.verbose > 3:
+ msg = _("User with uid {u!r} not found in {uri}/{bdn}.").format(
+ u=uid, uri=connect_info.url, bdn=base_dn)
+ LOG.debug(msg)
+
+ return result
+
# =============================================================================
if __name__ == "__main__":
from ..app.ldap import LdapAppError
from ..app.ldap import BaseLdapApplication
-__version__ = '0.3.0'
+__version__ = '0.3.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
for inst in self.ldap_instances:
connect_info = self.cfg.ldap_connection[inst]
- dn = self.get_user_dn(usr, inst)
- if dn:
+ dns = self.get_user_dn(usr, inst)
+ if dns:
+ if self.verbose > 2:
+ msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format(
+ dn=dns, user=usr, inst=connect_info.url)
+ LOG.debug(msg)
if inst not in self.dns:
self.dns[inst] = []
- if is_sequence(dn):
- self.dns[inst] += dn
- if len(dn) > 1:
+ if is_sequence(dns):
+ if len(dns) > 1:
msg = _("Found {nr} entries for user {u!r} in LDAP instance {i}.").format(
- nr=len(dn), u=usr, i=connect_info.url)
+ nr=len(dns), u=usr, i=connect_info.url)
LOG.warn(msg)
self.wrong_users = True
return
+ for dn in dns:
+ if dn not in self.dns[inst]:
+ self.dns[inst].append(dn)
else:
- self.dns[inst].append(dn)
- if self.verbose > 2:
- msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format(
- dn=dn, user=usr, inst=connect_info.url)
- LOG.debug(msg)
+ if dns not in self.dns[inst]:
+ self.dns[inst].append(dns)
else:
msg = _("Did not found user {user!r} in LDAP instance {inst}.").format(
user=usr, inst=connect_info.url)