--- /dev/null
+---
+ldap:
+ timeout: 5
+ default:
+ host: 'prd-ds.pixelpark.com'
+ ldaps: true
+ port: 636
+ base_dn: 'o=isp'
+ bind_dn: 'uid=readonly,ou=People,o=isp'
+ # bind_pw: ******
+ is_admin: false
+ readonly: true
+ tier: 'prod'
+ dpx-dev:
+ host: 'dev-ldap2.pixelpark.com'
+ ldaps: true
+ port: 636
+ base_dn: 'o=isp'
+ bind_dn: 'cn=admin'
+ # bind_pw: ******
+ is_admin: true
+ readonly: false
+ tier: 'dev'
+ dpx-dev-ro:
+ host: 'dev-ldap2.pixelpark.com'
+ ldaps: true
+ port: 636
+ base_dn: 'o=isp'
+ bind_dn: 'uid=readonly,ou=People,o=isp'
+ # bind_pw: ******
+ is_admin: false
+ readonly: true
+ tier: 'dev'
+ dpx-prod:
+ host: 'prd-ds.pixelpark.com'
+ ldaps: true
+ port: 636
+ base_dn: 'o=isp'
+ bind_dn: 'cn=admin'
+ # bind_pw: ******
+ is_admin: true
+ readonly: false
+ tier: 'prod'
+ dpx-prod-ro:
+ host: 'prd-ds.pixelpark.com'
+ ldaps: true
+ port: 636
+ base_dn: 'o=isp'
+ bind_dn: 'uid=readonly,ou=People,o=isp'
+ # bind_pw: ******
+ is_admin: false
+ readonly: true
+ tier: 'prod'
+ dpx-legacy:
+ host: 'ldap-legacy.pixelpark.com'
+ ldaps: false
+ port: 389
+ base_dn: 'o=isp'
+ bind_dn: 'cn=admin'
+ # bind_pw: ******
+ is_admin: true
+ readonly: false
+ tier: 'prod'
+ spk-live:
+ host: 'live-ldap.spk.pixelpark.net'
+ ldaps: true
+ port: 636
+ base_dn: 'dc=spk,dc=pixelpark,dc=net'
+ bind_dn: 'cn=admin'
+ # bind_pw: ******
+ is_admin: true
+ readonly: false
+ tier: 'prod'
+ spk-stage:
+ host: 'stage-ldap.spk.pixelpark.net'
+ ldaps: true
+ port: 636
+ base_dn: 'dc=spk,dc=pixelpark,dc=net'
+ bind_dn: 'cn=admin'
+ # bind_pw: ******
+ is_admin: true
+ readonly: false
+ tier: 'test'
from ldap3.core.exceptions import LDAPException
# from ldap3.core.exceptions import LDAPException, LDAPBindError
-from fb_tools.common import pp, is_sequence
+from fb_tools.common import pp, is_sequence, to_bool
from fb_tools.mailaddress import MailAddress
from fb_tools.collections import FrozenCIStringSet, CIStringSet, CIDict
# Own modules
from .. import __version__ as GLOBAL_VERSION
-from ..xlate import XLATOR
+from ..xlate import XLATOR, format_list
from .. import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR
# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
-__version__ = '0.5.1'
+__version__ = '0.6.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
print()
# -------------------------------------------------------------------------
- def _verify_instances(self):
+ def _verify_instances(self, is_admin=None, readonly=None, tier=None):
if self.verbose > 1:
LOG.debug(_("Verifying given instances ..."))
+ if self.verbose > 2:
+ show_filter = []
+ show_filter.append("inst != 'default'")
+ if is_admin is not None:
+ show_filter.append('is_admin = {!r}'.format(is_admin))
+ if readonly is not None:
+ show_filter.append('readonly = {!r}'.format(readonly))
+ if tier is not None:
+ show_filter.append('tier = {!r}'.format(tier))
+ msg = _("Used filter:") + ' ' + format_list(show_filter)
+ LOG.debug(msg)
+
instances = []
for inst in self.cfg.ldap_connection.keys():
- if inst != 'default':
- instances.append(inst.lower())
+ if inst == 'default':
+ continue
+ instance = self.cfg.ldap_connection[inst]
+ if is_admin is not None:
+ if to_bool(is_admin) != instance.is_admin:
+ continue
+ if readonly is not None:
+ if to_bool(readonly) != instance.readonly:
+ continue
+ if tier is not None:
+ if tier.strip().lower() != instance.tier:
+ continue
+ instances.append(inst.lower())
if len(self.ldap_instances) == 1 and self.ldap_instances[0].lower() == 'all':
self.ldap_instances = instances
"different in the particular LDAP instances).")
)
+ # -------------------------------------------------------------------------
+ def _verify_instances(self):
+
+ super(RemoveLdapUserApplication, self)._verify_instances(is_admin=True, readonly=False)
+
# -------------------------------------------------------------------------
def post_init(self):
"""
self.given_users = given_users
+ self.check_instances()
+
+ # -------------------------------------------------------------------------
+ def check_instances(self):
+ """Checking given instances for admin and read/write access."""
+
+ msg = _("Checking given instances for admin and read/write access.")
+ LOG.debug(msg)
+
+ all_ok = True
+
+ for inst_name in self.ldap_instances:
+ if inst_name not in self.cfg.ldap_connection:
+ msg = _("LDAP instance {!r} not found in configuration.").format(inst_name)
+ LOG.error(msg)
+ all_ok = False
+ continue
+
+ inst = self.cfg.ldap_connection[inst_name]
+
+ if inst.readonly:
+ msg = _("LDAP instance {!r} has only readonly access.").format(inst_name)
+ LOG.error(msg)
+ all_ok = False
+
+ if not inst.is_admin:
+ msg = _("No admin access to LDAP instance {!r}.").format(inst_name)
+ LOG.error(msg)
+ all_ok = False
+
+ if not all_ok:
+ self.exit(8)
+
# -------------------------------------------------------------------------
def _run(self):
from .. import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR
+from . import VALID_TIERS, DEFAULT_TIER
+
from ..xlate import XLATOR
-__version__ = '0.2.6'
+__version__ = '0.3.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
def __init__(
self, appname=None, verbose=0, version=__version__, base_dir=None,
host=None, use_ldaps=False, port=DEFAULT_PORT_LDAP, base_dn=None,
- bind_dn=None, bind_pw=None, initialized=False):
+ bind_dn=None, bind_pw=None, is_admin=None, readonly=None, tier=None,
+ initialized=False):
self._host = None
self._use_ldaps = False
self._base_dn = None
self._bind_dn = None
self._bind_pw = None
+ self._is_admin = False
+ self._readonly = True
+ self._tier = DEFAULT_TIER
super(LdapConnectionInfo, self).__init__(
appname=appname, verbose=verbose, version=version, base_dir=base_dir,
self.bind_dn = bind_dn
if bind_pw is not None:
self.bind_pw = bind_pw
+ self.is_admin = is_admin
+ self.readonly = readonly
+ self.tier = tier
if initialized:
self.initialized = True
res['bind_pw'] = None
res['schema'] = self.schema
res['url'] = self.url
+ res['is_admin'] = self.is_admin
+ res['readonly'] = self.readonly
+ res['tier'] = self.tier
if self.bind_pw:
if self.verbose > 4:
return '{s}://{h}{p}'.format(s=self.schema, h=self.host, p=port)
+ # -----------------------------------------------------------
+ @property
+ def is_admin(self):
+ """Is this an admin connection with all permissions."""
+ return self._is_admin
+
+ @is_admin.setter
+ def is_admin(self, value):
+ if value is not None:
+ self._is_admin = to_bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def readonly(self):
+ """Is this a readonly connection."""
+ return self._readonly
+
+ @readonly.setter
+ def readonly(self, value):
+ if value is not None:
+ self._readonly = to_bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def tier(self):
+ """The tier of production level of the LDAP instance (prod, test or dev)."""
+ return self._tier
+
+ @tier.setter
+ def tier(self, value):
+ if value is None:
+ return
+ val_lc = str(value).strip().lower()
+ if val_lc not in VALID_TIERS:
+ msg = _("Invalid production tier {!r} given.").format(value)
+ raise LdapConfigError(msg)
+ self._tier = val_lc
+
# -------------------------------------------------------------------------
def __repr__(self):
"""Typecasting into a string for reproduction."""
fields.append("base_dn={!r}".format(self.base_dn))
fields.append("bind_dn={!r}".format(self.bind_dn))
fields.append("bind_pw={!r}".format(self.bind_pw))
+ fields.append("is_admin={!r}".format(self.is_admin))
+ fields.append("readonly={!r}".format(self.readonly))
+ fields.append("tier={!r}".format(self.tier))
fields.append("initialized={!r}".format(self.initialized))
out += ", ".join(fields) + ")>"
new = self.__class__(
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, host=self.host,
use_ldaps=self.use_ldaps, port=self.port, base_dn=self.base_dn, bind_dn=self.bind_dn,
- bind_pw=self.bind_pw, initialized=self.initialized)
+ bind_pw=self.bind_pw, is_admin=self.is_admin, readonly=self.readonly, tier=self.tier,
+ initialized=self.initialized)
return new
re_ldap_base_dn_key = re.compile(r'^\s*base[_-]*dn\s*$', re.IGNORECASE)
re_ldap_bind_dn_key = re.compile(r'^\s*bind[_-]*dn\s*$', re.IGNORECASE)
re_ldap_bind_pw_key = re.compile(r'^\s*bind[_-]*pw\s*$', re.IGNORECASE)
+ re_ldap_is_admin_key = re.compile(r'^\s*(?:is[_-]*)?admin\s*$', re.IGNORECASE)
+ re_ldap_readonly_key = re.compile(r'^\s*read[_-]*only\s*$', re.IGNORECASE)
# -------------------------------------------------------------------------
def __init__(
connection.bind_pw = value
continue
+ if self.re_ldap_is_admin_key.match(key):
+ connection.is_admin = value
+ continue
+
+ if self.re_ldap_readonly_key.match(key):
+ connection.readonly = value
+ continue
+
+ if key.strip().lower() == 'tier':
+ connection.tier = value
+ continue
+
msg = _("Unknown LDAP configuration key {key} found in section {sec!r}.").format(
key=key, sec=section_name)
LOG.error(msg)