# Own modules
-from fb_tools.common import pp, to_str
+from fb_tools.common import pp, to_str, is_sequence
from fb_tools.errors import HandlerError, ExpectedHandlerError
from .xlate import XLATOR
-__version__ = '2.2.0'
+__version__ = '2.2.1'
LOG = logging.getLogger(__name__)
TZ = pytz.timezone('Europe/Berlin')
auto_close=True, simulate=self.simulate, force=self.force, tz=TZ,
terminal_has_colors=self.terminal_has_colors, initialized=False)
-# self.vsphere = VsphereServer(
-# appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
-# host=self.cfg.vsphere_host, port=self.cfg.vsphere_port,
-# user=self.cfg.vsphere_user, password=self.cfg.password,
-# dc=self.cfg.dc, cluster=self.cfg.vsphere_cluster,
-# auto_close=True, simulate=self.simulate, force=self.force, tz=TZ,
-# terminal_has_colors=self.terminal_has_colors, initialized=False)
self.cluster = None
self.cobbler = Cobbler(
try:
self.connect_ldap()
+
+ line = ('#' * 60) + '\n'
+ auth_keys = line
+
+ admins = self.get_ldap_admins()
+ if not admins:
+ msg = _("Did not found any admins below base DN {!r} with filter:")
+ msg = msg.format(self.cfg.ldap_connection['default'].base_dn)
+ msg += '\n' + fltr
+ raise HandlerError(msg)
+
+ for uid in sorted(admins.keys(), key=str.lower):
+
+ admin = admins[uid]
+
+ for ssh_key in sorted(admin['keys']):
+
+ parts = ssh_key.split()
+ used_key = parts[0] + ' ' + parts[1] + ' '
+ used_key += admin['cn'] + ' <' + admin['mail'] + '>'
+ auth_keys += used_key + '\n'
+ auth_keys += line
+
+ if self.verbose > 1:
+ msg = _("Generated authorized keys for root:") + '\n' + auth_keys
+ LOG.debug(msg)
+
finally:
self.disconnect_ldap()
+ # -------------------------------------------------------------------------
+ def get_ldap_admins(self):
+
+ if not self.ldap:
+ msg = _("No LDAP connection initialized.")
+ raise HandlerError(msg)
+
+ admins = {}
+
+ attrs = ['cn', 'dn', 'mail', 'sshPublicKey', 'uid']
+ ldap_config = self.cfg.ldap_connection['default']
+ fltr = ldap_config.admin_filter
+
+ msg = _("Trying to get a list of all DPX admins with their public SSH keys ...")
+ LOG.debug(msg)
+
+ if self.verbose > 1:
+ msg = _("LDAP search starting in {!r} with filter:").format(ldap_config.base_dn)
+ msg += '\n' + fltr
+ LOG.debug(msg)
+
+ status, result, response, request = self.ldap.search(
+ search_base=ldap_config.base_dn, search_scope=ldap3.SUBTREE, search_filter=fltr,
+ attributes=attrs, time_limit=self.cfg.ldap_timeout)
+
+ if not status:
+ msg = _("Error retrieving DPX admin list from LDAP:")
+ msg += ' ' + result
+ raise HandlerError(msg)
+
+ for entry in response:
+
+ uid = None
+ admin = {
+ 'cn': None,
+ 'dn': None,
+ 'mail': None,
+ 'keys': [],
+ 'uid': None,
+ }
+
+ admin['dn'] = entry['dn']
+
+ for attr in entry['attributes']:
+
+ val = entry['attributes'][attr]
+
+ if attr.lower() == 'uid':
+ if is_sequence(val):
+ uid = val[0]
+ else:
+ uid = val
+ admin['uid'] = uid
+
+ if attr.lower() == 'cn':
+ if is_sequence(val):
+ admin['cn'] = val[0]
+ else:
+ admin['cn'] = val
+
+ if attr.lower() == 'mail':
+ if is_sequence(val):
+ admin['mail'] = val[0]
+ else:
+ admin['mail'] = val
+
+ if attr.lower() == 'sshpublickey':
+ if is_sequence(val):
+ for key in val:
+ admin['keys'].append(key)
+ else:
+ admin['keys'].append(val)
+
+ if self.verbose == 2:
+ msg = _("Got an admin {cn} <{mail}>.").format(cn=admin['cn'], mail=admin['mail'])
+ LOG.debug(msg)
+ elif self.verbose > 2:
+ msg = _("Got an admin:") + '\n' + pp(admin)
+ LOG.debug(msg)
+
+ admins[uid] = admin
+
+ return admins
+
# =============================================================================
if __name__ == "__main__":