--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2023 by Frank Brehm, Digitas Pixelpark GmbH, Berlin
+@summary: A module for a dictionary of BaseModuleInfo objects.
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+
+from collections.abc import MutableMapping
+
+from functools import cmp_to_key
+
+# Third party modules
+from fb_tools.common import to_bool
+from fb_tools.obj import FbBaseObject
+
+# Own modules
+
+from .base_moduleinfo import BaseModuleInfo
+
+from .xlate import XLATOR
+
+__version__ = '1.4.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ModuleInfoDict(MutableMapping, FbBaseObject):
+ """
+ A dictionary containing BaseModuleInfo objects.
+ It works like a dict.
+ i.e.:
+ modules = ModuleInfoDict(BaseModuleInfo(full_name='puppet-mongodb', ...))
+ and
+ modules['puppet-mongodb'] returns a BaseModuleInfo object for puppet module 'puppet-mongodb'
+ """
+
+ msg_invalid_modinfo_type = _("Invalid value type {{!r}} to set, only {} allowed.").format(
+ 'BaseModuleInfo')
+ msg_key_not_name = _("The key {k!r} must be equal to the full name {n!r} of the module.")
+ msg_none_type_error = _("None type as key is not allowed.")
+ msg_empty_key_error = _("Empty key {!r} is not allowed.")
+ msg_no_modinfo_dict = _("Object {{!r}} is not a {} object.").format('ModuleInfoDict')
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ sort_by_name=False, *args, **kwargs):
+
+ self._map = dict()
+ self._sort_by_name = False
+
+ super(ModuleInfoDict, self).__init__(
+ appname=appname, verbose=verbose, version=version,
+ base_dir=base_dir, initialized=False,
+ )
+
+ self.sort_by_name = sort_by_name
+
+ for arg in args:
+ self.append(arg)
+
+ # -----------------------------------------------------------
+ @property
+ def sort_by_name(self):
+ """Sorting modules by name and vendor, instead of the full name."""
+ return self._sort_by_name
+
+ @sort_by_name.setter
+ def sort_by_name(self, value):
+ self._sort_by_name = to_bool(value)
+
+ # -------------------------------------------------------------------------
+ def _set_item(self, key, module_info):
+
+ if not isinstance(module_info, BaseModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+
+ full_name = module_info.full_name
+ if full_name != key.lower():
+ raise KeyError(self.msg_key_not_name.format(k=key, n=full_name))
+
+ self._map[full_name] = module_info
+
+ # -------------------------------------------------------------------------
+ def append(self, module_info):
+
+ if not isinstance(module_info, BaseModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+ self._set_item(module_info.full_name, module_info)
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+
+ res = super(ModuleInfoDict, self).as_dict(short=short)
+
+ res['sort_by_name'] = self.sort_by_name
+ res['items'] = {}
+ res['keys'] = []
+ for full_name in self.keys():
+ res['items'][full_name] = self._map[full_name].as_dict(short)
+ res['keys'].append(str(full_name))
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def _get_item(self, key):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return self._map[full_name]
+
+ # -------------------------------------------------------------------------
+ def get(self, key):
+ return self._get_item(key)
+
+ # -------------------------------------------------------------------------
+ def _del_item(self, key, strict=True):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if not strict and full_name not in self._map:
+ return
+
+ del self._map[full_name]
+
+ # -------------------------------------------------------------------------
+ def merge(self, item):
+
+ if not isinstance(item, BaseModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(item.__class__.__name__))
+
+ full_name = item.full_name
+ if full_name in self._map.keys():
+ if self.verbose > 2:
+ LOG.debug("Merging module {!r}.".format(full_name))
+ self._map[full_name].merge_in(item)
+ else:
+ if self.verbose > 2:
+ LOG.debug("New module {!r}.".format(full_name))
+ self._set_item(full_name, item)
+
+ # -------------------------------------------------------------------------
+ # The next five methods are requirements of the ABC.
+ def __setitem__(self, key, value):
+ self._set_item(key, value)
+
+ # -------------------------------------------------------------------------
+ def __getitem__(self, key):
+ return self._get_item(key)
+
+ # -------------------------------------------------------------------------
+ def __delitem__(self, key):
+ self._del_item(key)
+
+ # -------------------------------------------------------------------------
+ def __iter__(self):
+
+ for full_name in self.keys():
+ yield full_name
+
+ # -------------------------------------------------------------------------
+ def __len__(self):
+ return len(self._map)
+
+ # -------------------------------------------------------------------------
+ # The next methods aren't required, but nice for different purposes:
+ def __str__(self):
+ '''returns simple dict representation of the mapping'''
+ return str(self._map)
+
+ # -------------------------------------------------------------------------
+ def __contains__(self, key):
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if self.verbose > 4:
+ LOG.debug("Searching for key {!r} ...".format(key))
+
+ return full_name in self._map
+
+ # -------------------------------------------------------------------------
+ def keys(self):
+
+ def compare_items(x, y):
+ if self.sort_by_name:
+ if self.verbose > 4:
+ LOG.debug("Comparing names {!r} > {!r}".format(x.name, y.name))
+ if x.name.lower() != y.name.lower():
+ if x.name.lower() > y.name.lower():
+ return 1
+ return -1
+ if x.name != y.name:
+ if x.name < y.name:
+ return 1
+ return -1
+ # From here the names are completely identic.
+ if self.verbose > 4:
+ LOG.debug("Comparing vendor {!r} > {!r}".format(x.vendor, y.vendor))
+ if x.vendor is None and y.vendor is None:
+ return 0
+ if x.vendor is None:
+ return -1
+ if y.vendor is None:
+ return -1
+ if x.vendor.lower() != y.vendor.lower():
+ if x.vendor.lower() > y.vendor.lower():
+ return 1
+ return -1
+ if x.vendor > y.vendor:
+ return 1
+ if x.vendor < y.vendor:
+ return -1
+ return 0
+ if self.verbose > 4:
+ LOG.debug("Comparing by full names {!r} > {!r}".format(x.full_name, y.full_name))
+ if x != y:
+ if x > y:
+ return 1
+ return -1
+ return 0
+
+ return sorted(
+ self._map.keys(),
+ key=lambda x: cmp_to_key(compare_items)(self._map[x]))
+
+ # -------------------------------------------------------------------------
+ def items(self):
+
+ item_list = []
+
+ for full_name in self.keys():
+ item_list.append((full_name, self._map[full_name]))
+
+ return item_list
+
+ # -------------------------------------------------------------------------
+ def values(self):
+
+ value_list = []
+ for full_name in self.keys():
+ value_list.append(self._map[full_name])
+ return value_list
+
+ # -------------------------------------------------------------------------
+ def __eq__(self, other):
+
+ if not isinstance(other, ModuleInfoDict):
+ raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+ return self._map == other._map
+
+ # -------------------------------------------------------------------------
+ def __ne__(self, other):
+
+ if not isinstance(other, ModuleInfoDict):
+ raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+ return self._map != other._map
+
+ # -------------------------------------------------------------------------
+ def pop(self, key, *args):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ return self._map.pop(full_name, *args)
+
+ # -------------------------------------------------------------------------
+ def popitem(self):
+
+ if not len(self._map):
+ return None
+
+ full_name = self.keys()[0]
+ zone = self._map[full_name]
+ del self._map[full_name]
+ return (full_name, zone)
+
+ # -------------------------------------------------------------------------
+ def clear(self):
+ self._map = dict()
+
+ # -------------------------------------------------------------------------
+ def setdefault(self, key, default):
+
+ if key is None:
+ raise TypeError(self.msg_none_type_error)
+
+ full_name = str(key).lower().strip()
+ if full_name == '':
+ raise ValueError(self.msg_empty_key_error.format(key))
+
+ if not isinstance(default, BaseModuleInfo):
+ raise TypeError(self.msg_invalid_modinfo_type.format(default.__class__.__name__))
+
+ if full_name in self._map:
+ return self._map[full_name]
+
+ self._set_item(full_name, default)
+ return default
+
+ # -------------------------------------------------------------------------
+ def update(self, other):
+
+ if isinstance(other, ModuleInfoDict) or isinstance(other, dict):
+ for full_name in other.keys():
+ self._set_item(full_name, other[full_name])
+ return
+
+ for tokens in other:
+ key = tokens[0]
+ value = tokens[1]
+ self._set_item(key, value)
+
+ # -------------------------------------------------------------------------
+ def as_list(self, short=True):
+
+ res = []
+ for full_name in self.keys():
+ res.append(self._map[full_name].as_dict(short))
+ return res
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+@author: Frank Brehm
+@contact: frank@brehm-online.com
+@copyright: © 2023 Frank Brehm, Digitas Pixelpark GmbH Berlin
+@license: GNU AGPL3
+@summary: test script (and module) for unit tests on module dicts
+'''
+
+import os
+import sys
+import logging
+
+from pathlib import Path
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+libdir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'lib'))
+sys.path.insert(0, libdir)
+
+from general import DpxPuppetToolsTestcase, get_arg_verbose, init_root_logger
+
+LOG = logging.getLogger('test_moddict')
+
+# =============================================================================
+class TestBaseModdict(DpxPuppetToolsTestcase):
+
+ # -------------------------------------------------------------------------
+ def setUp(self):
+ self.test_dir = Path(__file__).parent.resolve()
+ self.base_dir = self.test_dir.parent
+ self.test_cfg_dir = self.test_dir / 'test-config'
+ self._appname = 'test_moddict'
+
+ # -------------------------------------------------------------------------
+ def test_import(self):
+
+ LOG.info("Testing import of dpx_puppettools.module_list ...")
+ import dpx_puppettools.module_list
+ ver = dpx_puppettools.module_list.__version__
+ LOG.debug(
+ "Version of dpx_puppettools.module_list: " + ver)
+
+ # -------------------------------------------------------------------------
+ def test_init(self):
+
+ LOG.info("Testing init of a ModuleInfoDict object ...")
+ from dpx_puppettools.module_list import ModuleInfoDict
+ from dpx_puppettools.base_moduleinfo import BaseModuleInfo
+ from dpx_puppettools import pp
+
+ module = 'Pixelpark-infra'
+
+ modinfo = BaseModuleInfo(
+ appname=self.appname, verbose=self.verbose, full_name=module)
+
+ moddict = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose)
+ moddict.append(modinfo)
+
+ LOG.debug("ModuleInfoDict %%r: {!r}".format(moddict))
+ if self.verbose > 2:
+ LOG.debug("ModuleInfoDict %%s:\n{}".format(pp(moddict.as_dict())))
+
+ # -------------------------------------------------------------------------
+ def test_sorting(self):
+
+ LOG.info("Testing sorting of the keys of a ModuleInfoDict object ...")
+
+ from dpx_puppettools.module_list import ModuleInfoDict
+ from dpx_puppettools.base_moduleinfo import BaseModuleInfo
+ from dpx_puppettools import pp
+
+ moddict = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose)
+
+ mods = ('puppetlabs-apache', 'puppet-uhu', 'pixelpark-infra',
+ 'Puppetlabs-MySQL', 'Pixelpark-Infra', 'hieradata')
+
+ for mod in mods:
+ modinfo = BaseModuleInfo(
+ appname=self.appname, verbose=self.verbose, full_name=mod)
+ moddict.append(modinfo)
+
+ moddict.sort_by_name = False
+ keys = moddict.keys()
+ LOG.debug("Keys sorted by vendor + name:\n{}".format(pp(keys)))
+
+ moddict.sort_by_name = True
+ keys = moddict.keys()
+ LOG.debug("Keys sorted by name + vendor:\n{}".format(pp(keys)))
+
+
+# =============================================================================
+if __name__ == '__main__':
+
+ verbose = get_arg_verbose()
+ if verbose is None:
+ verbose = 0
+ init_root_logger(verbose)
+
+ LOG.info("Starting tests ...")
+
+ suite = unittest.TestSuite()
+
+ suite.addTest(TestBaseModdict('test_import', verbose))
+ suite.addTest(TestBaseModdict('test_init', verbose))
+ suite.addTest(TestBaseModdict('test_sorting', verbose))
+
+ runner = unittest.TextTestRunner(verbosity=verbose)
+
+ result = runner.run(suite)
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4