# Third party modules
from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC
# from ldap3 import ALL
-from ldap3 import SUBTREE
+from ldap3 import BASE, SUBTREE
# from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS
+from ldap3 import ALL_ATTRIBUTES
# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
# from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
# from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError
# from ldap3.core.exceptions import LDAPException, LDAPBindError
-from fb_tools.common import pp, to_bool
+from fb_tools.common import pp, to_bool, is_sequence
from fb_tools.cfg_app import FbConfigApplication
from fb_tools.errors import FbAppError
from fb_tools.mailaddress import MailAddress
+from fb_tools.collections import FrozenCIStringSet, CIStringSet
# Own modules
from .. import __version__ as GLOBAL_VERSION
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT
-__version__ = '0.4.0'
+__version__ = '0.4.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
pattern_re_uid = r'^[a-z](?:[a-z0-9\-_.]*[a-z0-9])?$'
re_uid = re.compile(pattern_re_uid, re.IGNORECASE)
+ person_object_classes = FrozenCIStringSet([
+ 'account', 'inetOrgPerson', 'inetUser', 'posixAccount', 'sambaSamAccount'])
+
# -------------------------------------------------------------------------
def __init__(
self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None,
def get_user_dn(self, user, inst):
connect_info = self.cfg.ldap_connection[inst]
- base_dn = connect_info.base_dn
if self.verbose > 1:
msg = _("Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...").format(
if MailAddress.valid_address(user, verbose=self.verbose):
msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user)
LOG.debug(msg)
- return self.get_user_dn_by_mail(user, inst)
+ dns = self.get_user_dn_by_mail(user, inst)
+ if dns:
+ return dns
if self.re_ldap_dn.match(user):
msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user)
LOG.debug(msg)
- return [user]
+ dns = self.get_user_dn_by_dn(user, inst)
+ if dns:
+ return dns
if self.re_uid.match(user):
msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format(
u=user)
LOG.debug(msg)
- return self.get_user_dn_by_uid(user, inst)
+ dns = self.get_user_dn_by_uid(user, inst)
+ if dns:
+ return dns
usr = user.strip()
if usr == '':
msg = _("Trying to evaluate DN of user {u!r} as a CN ({c}) ...").format(
u=usr, c='common name')
LOG.debug(msg)
- return 'cn={u},{b}'.format(u=usr, b=base_dn)
+ return self.get_user_dn_by_cn(user, inst)
# -------------------------------------------------------------------------
def get_user_dn_by_mail(self, mail, inst):
return result
+ # -------------------------------------------------------------------------
+ def get_user_dn_by_cn(self, cn, inst):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ base_dn = connect_info.base_dn
+ ldap = self.ldap_connection[inst]
+
+ result = []
+
+ attributes = ['dn']
+
+ ldap_filter = '(&'
+ ldap_filter += '(|'
+ ldap_filter += '(objectClass=account)'
+ ldap_filter += '(objectClass=inetOrgPerson)'
+ ldap_filter += '(objectClass=inetUser)'
+ ldap_filter += '(objectClass=mailRecipient)'
+ ldap_filter += '(objectClass=posixAccount)'
+ ldap_filter += '(objectClass=sambaSamAccount)'
+ ldap_filter += '(objectClass=uidObject)'
+ ldap_filter += ')'
+ ldap_filter += '(cn={})'.format(cn)
+ ldap_filter += ')'
+
+ if self.verbose > 1:
+ msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+ uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+ get_operational_attributes=False, attributes=attributes,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ if self.verbose > 4:
+ msg = _("Result of searching for CN {cn!r}:").format(cn=cn)
+ LOG.debug(msg + ' ' + pp(req_result))
+ for entry in req_response:
+ if self.verbose > 4:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+ result.append(entry['dn'])
+ if self.verbose > 3:
+ LOG.debug(_("Result:") + ' ' + pp(result))
+
+ else:
+ if self.verbose > 3:
+ msg = _("User with cn {cn!r} not found in {uri}/{bdn}.").format(
+ cn=cn, uri=connect_info.url, bdn=base_dn)
+ LOG.debug(msg)
+
+ return result
+
+ # -------------------------------------------------------------------------
+ def get_user_dn_by_dn(self, dn, inst, strict=True):
+
+ connect_info = self.cfg.ldap_connection[inst]
+
+ attributes = ['objectClass']
+
+ entry = self.get_entry(dn, inst, attributes=attributes)
+
+ if not entry:
+ if self.verbose > 3:
+ msg = _("User with DN {dn!r} not found in {uri}.").format(
+ dn=dn, uri=connect_info.url)
+ LOG.debug(msg)
+ return None
+
+ found_dn = entry['dn']
+
+ object_classes = CIStringSet()
+ for attr in entry['attributes']:
+ if attr.lower() == 'objectclass':
+ values = entry['attributes'][attr]
+ if is_sequence(values):
+ for val in values:
+ object_classes.add(val)
+ else:
+ object_classes.add(values)
+
+ if self.verbose > 3:
+ msg = _("ObjectClasses of {dn!r}:").format(dn=found_dn)
+ LOG.debug(msg + '\n' + pp(object_classes.as_list()))
+
+ is_person = False
+ for oclass in object_classes:
+ if oclass in self.person_object_classes:
+ is_person = True
+ break
+
+ if not is_person:
+ msg = _("Entry {dn!r} in {uri} seems not to be an account.").format(
+ dn=found_dn, uri=connect_info.url)
+ LOG.warn(msg)
+ if strict:
+ return None
+
+ return [found_dn]
+
+ # -------------------------------------------------------------------------
+ def get_entry(self, dn, inst, attributes=None, operational_attributes=False):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ ldap = self.ldap_connection[inst]
+
+ if attributes is None:
+ attributes = [ALL_ATTRIBUTES]
+
+ sfilter = '(objectClass=*)'
+
+ result = None
+
+ if self.verbose > 1:
+ msg = _("Searching DN {dn!r} in {uri}.").format(dn=dn, uri=connect_info.url)
+ LOG.debug(msg)
+
+ req_status, req_result, req_response, req_whatever = ldap.search(
+ search_base=dn, search_scope=BASE, attributes=attributes,
+ get_operational_attributes=operational_attributes, search_filter=sfilter,
+ time_limit=self.cfg.ldap_timeout)
+
+ if req_status:
+ if self.verbose > 4:
+ msg = _("Result of searching for DN {dn!r}:").format(dn=dn)
+ LOG.debug(msg + ' ' + pp(req_result))
+ result = req_response[0]
+ if self.verbose > 3:
+ LOG.debug(_("Got a response entry:") + ' ' + pp(result))
+
+ else:
+ if self.verbose > 3:
+ msg = _("Entry with DN {dn!r} not found in {uri}.").format(
+ dn=dn, uri=connect_info.url)
+ LOG.debug(msg)
+
+ return result
+
# =============================================================================
if __name__ == "__main__":