]> Frank Brehm's Git Trees - pixelpark/puppet-tools.git/commitdiff
Adding module dpx_puppettools.module_list for class ModuleInfoDict
authorFrank Brehm <frank.brehm@pixelpark.com>
Wed, 8 Feb 2023 15:45:11 +0000 (16:45 +0100)
committerFrank Brehm <frank.brehm@pixelpark.com>
Wed, 8 Feb 2023 15:45:11 +0000 (16:45 +0100)
lib/dpx_puppettools/module_list.py [new file with mode: 0644]
test/test_23_module_dict.py [new file with mode: 0755]

diff --git a/lib/dpx_puppettools/module_list.py b/lib/dpx_puppettools/module_list.py
new file mode 100644 (file)
index 0000000..28c9205
--- /dev/null
@@ -0,0 +1,359 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2023 by Frank Brehm, Digitas Pixelpark GmbH, Berlin
+@summary: A module for a dictionary of BaseModuleInfo objects.
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+
+from collections.abc import MutableMapping
+
+from functools import cmp_to_key
+
+# Third party modules
+from fb_tools.common import to_bool
+from fb_tools.obj import FbBaseObject
+
+# Own modules
+
+from .base_moduleinfo import BaseModuleInfo
+
+from .xlate import XLATOR
+
+__version__ = '1.4.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class ModuleInfoDict(MutableMapping, FbBaseObject):
+    """
+    A dictionary containing BaseModuleInfo objects.
+    It works like a dict.
+    i.e.:
+    modules = ModuleInfoDict(BaseModuleInfo(full_name='puppet-mongodb', ...))
+    and
+    modules['puppet-mongodb'] returns a BaseModuleInfo object for puppet module 'puppet-mongodb'
+    """
+
+    msg_invalid_modinfo_type = _("Invalid value type {{!r}} to set, only {} allowed.").format(
+        'BaseModuleInfo')
+    msg_key_not_name = _("The key {k!r} must be equal to the full name {n!r} of the module.")
+    msg_none_type_error = _("None type as key is not allowed.")
+    msg_empty_key_error = _("Empty key {!r} is not allowed.")
+    msg_no_modinfo_dict = _("Object {{!r}} is not a {} object.").format('ModuleInfoDict')
+
+    # -------------------------------------------------------------------------
+    def __init__(
+        self, appname=None, verbose=0, version=__version__, base_dir=None,
+            sort_by_name=False, *args, **kwargs):
+
+        self._map = dict()
+        self._sort_by_name = False
+
+        super(ModuleInfoDict, self).__init__(
+            appname=appname, verbose=verbose, version=version,
+            base_dir=base_dir, initialized=False,
+        )
+
+        self.sort_by_name = sort_by_name
+
+        for arg in args:
+            self.append(arg)
+
+    # -----------------------------------------------------------
+    @property
+    def sort_by_name(self):
+        """Sorting modules by name and vendor, instead of the full name."""
+        return self._sort_by_name
+
+    @sort_by_name.setter
+    def sort_by_name(self, value):
+        self._sort_by_name = to_bool(value)
+
+    # -------------------------------------------------------------------------
+    def _set_item(self, key, module_info):
+
+        if not isinstance(module_info, BaseModuleInfo):
+            raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+
+        full_name = module_info.full_name
+        if full_name != key.lower():
+            raise KeyError(self.msg_key_not_name.format(k=key, n=full_name))
+
+        self._map[full_name] = module_info
+
+    # -------------------------------------------------------------------------
+    def append(self, module_info):
+
+        if not isinstance(module_info, BaseModuleInfo):
+            raise TypeError(self.msg_invalid_modinfo_type.format(module_info.__class__.__name__))
+        self._set_item(module_info.full_name, module_info)
+
+    # -------------------------------------------------------------------------
+    def as_dict(self, short=True):
+
+        res = super(ModuleInfoDict, self).as_dict(short=short)
+
+        res['sort_by_name'] = self.sort_by_name
+        res['items'] = {}
+        res['keys'] = []
+        for full_name in self.keys():
+            res['items'][full_name] = self._map[full_name].as_dict(short)
+            res['keys'].append(str(full_name))
+
+        return res
+
+    # -------------------------------------------------------------------------
+    def _get_item(self, key):
+
+        if key is None:
+            raise TypeError(self.msg_none_type_error)
+
+        full_name = str(key).lower().strip()
+        if full_name == '':
+            raise ValueError(self.msg_empty_key_error.format(key))
+
+        return self._map[full_name]
+
+    # -------------------------------------------------------------------------
+    def get(self, key):
+        return self._get_item(key)
+
+    # -------------------------------------------------------------------------
+    def _del_item(self, key, strict=True):
+
+        if key is None:
+            raise TypeError(self.msg_none_type_error)
+
+        full_name = str(key).lower().strip()
+        if full_name == '':
+            raise ValueError(self.msg_empty_key_error.format(key))
+
+        if not strict and full_name not in self._map:
+            return
+
+        del self._map[full_name]
+
+    # -------------------------------------------------------------------------
+    def merge(self, item):
+
+        if not isinstance(item, BaseModuleInfo):
+            raise TypeError(self.msg_invalid_modinfo_type.format(item.__class__.__name__))
+
+        full_name = item.full_name
+        if full_name in self._map.keys():
+            if self.verbose > 2:
+                LOG.debug("Merging module {!r}.".format(full_name))
+            self._map[full_name].merge_in(item)
+        else:
+            if self.verbose > 2:
+                LOG.debug("New module {!r}.".format(full_name))
+            self._set_item(full_name, item)
+
+    # -------------------------------------------------------------------------
+    # The next five methods are requirements of the ABC.
+    def __setitem__(self, key, value):
+        self._set_item(key, value)
+
+    # -------------------------------------------------------------------------
+    def __getitem__(self, key):
+        return self._get_item(key)
+
+    # -------------------------------------------------------------------------
+    def __delitem__(self, key):
+        self._del_item(key)
+
+    # -------------------------------------------------------------------------
+    def __iter__(self):
+
+        for full_name in self.keys():
+            yield full_name
+
+    # -------------------------------------------------------------------------
+    def __len__(self):
+        return len(self._map)
+
+    # -------------------------------------------------------------------------
+    # The next methods aren't required, but nice for different purposes:
+    def __str__(self):
+        '''returns simple dict representation of the mapping'''
+        return str(self._map)
+
+    # -------------------------------------------------------------------------
+    def __contains__(self, key):
+        if key is None:
+            raise TypeError(self.msg_none_type_error)
+
+        full_name = str(key).lower().strip()
+        if full_name == '':
+            raise ValueError(self.msg_empty_key_error.format(key))
+
+        if self.verbose > 4:
+            LOG.debug("Searching for key {!r} ...".format(key))
+
+        return full_name in self._map
+
+    # -------------------------------------------------------------------------
+    def keys(self):
+
+        def compare_items(x, y):
+            if self.sort_by_name:
+                if self.verbose > 4:
+                    LOG.debug("Comparing names {!r} > {!r}".format(x.name, y.name))
+                if x.name.lower() != y.name.lower():
+                    if x.name.lower() > y.name.lower():
+                        return 1
+                    return -1
+                if x.name != y.name:
+                    if x.name < y.name:
+                        return 1
+                    return -1
+                # From here the names are completely identic.
+                if self.verbose > 4:
+                    LOG.debug("Comparing vendor {!r} > {!r}".format(x.vendor, y.vendor))
+                if x.vendor is None and y.vendor is None:
+                    return 0
+                if x.vendor is None:
+                    return -1
+                if y.vendor is None:
+                    return -1
+                if x.vendor.lower() != y.vendor.lower():
+                    if x.vendor.lower() > y.vendor.lower():
+                        return 1
+                    return -1
+                if x.vendor > y.vendor:
+                    return 1
+                if x.vendor < y.vendor:
+                    return -1
+                return 0
+            if self.verbose > 4:
+                LOG.debug("Comparing by full names {!r} > {!r}".format(x.full_name, y.full_name))
+            if x != y:
+                if x > y:
+                    return 1
+                return -1
+            return 0
+
+        return sorted(
+            self._map.keys(),
+            key=lambda x: cmp_to_key(compare_items)(self._map[x]))
+
+    # -------------------------------------------------------------------------
+    def items(self):
+
+        item_list = []
+
+        for full_name in self.keys():
+            item_list.append((full_name, self._map[full_name]))
+
+        return item_list
+
+    # -------------------------------------------------------------------------
+    def values(self):
+
+        value_list = []
+        for full_name in self.keys():
+            value_list.append(self._map[full_name])
+        return value_list
+
+    # -------------------------------------------------------------------------
+    def __eq__(self, other):
+
+        if not isinstance(other, ModuleInfoDict):
+            raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+        return self._map == other._map
+
+    # -------------------------------------------------------------------------
+    def __ne__(self, other):
+
+        if not isinstance(other, ModuleInfoDict):
+            raise TypeError(self.msg_no_modinfo_dict.format(other))
+
+        return self._map != other._map
+
+    # -------------------------------------------------------------------------
+    def pop(self, key, *args):
+
+        if key is None:
+            raise TypeError(self.msg_none_type_error)
+
+        full_name = str(key).lower().strip()
+        if full_name == '':
+            raise ValueError(self.msg_empty_key_error.format(key))
+
+        return self._map.pop(full_name, *args)
+
+    # -------------------------------------------------------------------------
+    def popitem(self):
+
+        if not len(self._map):
+            return None
+
+        full_name = self.keys()[0]
+        zone = self._map[full_name]
+        del self._map[full_name]
+        return (full_name, zone)
+
+    # -------------------------------------------------------------------------
+    def clear(self):
+        self._map = dict()
+
+    # -------------------------------------------------------------------------
+    def setdefault(self, key, default):
+
+        if key is None:
+            raise TypeError(self.msg_none_type_error)
+
+        full_name = str(key).lower().strip()
+        if full_name == '':
+            raise ValueError(self.msg_empty_key_error.format(key))
+
+        if not isinstance(default, BaseModuleInfo):
+            raise TypeError(self.msg_invalid_modinfo_type.format(default.__class__.__name__))
+
+        if full_name in self._map:
+            return self._map[full_name]
+
+        self._set_item(full_name, default)
+        return default
+
+    # -------------------------------------------------------------------------
+    def update(self, other):
+
+        if isinstance(other, ModuleInfoDict) or isinstance(other, dict):
+            for full_name in other.keys():
+                self._set_item(full_name, other[full_name])
+            return
+
+        for tokens in other:
+            key = tokens[0]
+            value = tokens[1]
+            self._set_item(key, value)
+
+    # -------------------------------------------------------------------------
+    def as_list(self, short=True):
+
+        res = []
+        for full_name in self.keys():
+            res.append(self._map[full_name].as_dict(short))
+        return res
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+    pass
+
+# =============================================================================
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
diff --git a/test/test_23_module_dict.py b/test/test_23_module_dict.py
new file mode 100755 (executable)
index 0000000..25963a8
--- /dev/null
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+@author: Frank Brehm
+@contact: frank@brehm-online.com
+@copyright: © 2023 Frank Brehm, Digitas Pixelpark GmbH Berlin
+@license: GNU AGPL3
+@summary: test script (and module) for unit tests on module dicts
+'''
+
+import os
+import sys
+import logging
+
+from pathlib import Path
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+libdir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'lib'))
+sys.path.insert(0, libdir)
+
+from general import DpxPuppetToolsTestcase, get_arg_verbose, init_root_logger
+
+LOG = logging.getLogger('test_moddict')
+
+# =============================================================================
+class TestBaseModdict(DpxPuppetToolsTestcase):
+
+    # -------------------------------------------------------------------------
+    def setUp(self):
+        self.test_dir = Path(__file__).parent.resolve()
+        self.base_dir = self.test_dir.parent
+        self.test_cfg_dir = self.test_dir / 'test-config'
+        self._appname = 'test_moddict'
+
+    # -------------------------------------------------------------------------
+    def test_import(self):
+
+        LOG.info("Testing import of dpx_puppettools.module_list ...")
+        import dpx_puppettools.module_list
+        ver = dpx_puppettools.module_list.__version__
+        LOG.debug(
+            "Version of dpx_puppettools.module_list: " + ver)
+
+    # -------------------------------------------------------------------------
+    def test_init(self):
+
+        LOG.info("Testing init of a ModuleInfoDict object ...")
+        from dpx_puppettools.module_list import ModuleInfoDict
+        from dpx_puppettools.base_moduleinfo import BaseModuleInfo
+        from dpx_puppettools import pp
+
+        module = 'Pixelpark-infra'
+
+        modinfo = BaseModuleInfo(
+            appname=self.appname, verbose=self.verbose, full_name=module)
+
+        moddict = ModuleInfoDict(
+            appname=self.appname, verbose=self.verbose)
+        moddict.append(modinfo)
+
+        LOG.debug("ModuleInfoDict %%r: {!r}".format(moddict))
+        if self.verbose > 2:
+            LOG.debug("ModuleInfoDict %%s:\n{}".format(pp(moddict.as_dict())))
+
+    # -------------------------------------------------------------------------
+    def test_sorting(self):
+
+        LOG.info("Testing sorting of the keys of a ModuleInfoDict object ...")
+
+        from dpx_puppettools.module_list import ModuleInfoDict
+        from dpx_puppettools.base_moduleinfo import BaseModuleInfo
+        from dpx_puppettools import pp
+
+        moddict = ModuleInfoDict(
+            appname=self.appname, verbose=self.verbose)
+
+        mods = ('puppetlabs-apache', 'puppet-uhu', 'pixelpark-infra',
+            'Puppetlabs-MySQL', 'Pixelpark-Infra', 'hieradata')
+
+        for mod in mods:
+            modinfo = BaseModuleInfo(
+                appname=self.appname, verbose=self.verbose, full_name=mod)
+            moddict.append(modinfo)
+
+        moddict.sort_by_name = False
+        keys = moddict.keys()
+        LOG.debug("Keys sorted by vendor + name:\n{}".format(pp(keys)))
+
+        moddict.sort_by_name = True
+        keys = moddict.keys()
+        LOG.debug("Keys sorted by name + vendor:\n{}".format(pp(keys)))
+
+
+# =============================================================================
+if __name__ == '__main__':
+
+    verbose = get_arg_verbose()
+    if verbose is None:
+        verbose = 0
+    init_root_logger(verbose)
+
+    LOG.info("Starting tests ...")
+
+    suite = unittest.TestSuite()
+
+    suite.addTest(TestBaseModdict('test_import', verbose))
+    suite.addTest(TestBaseModdict('test_init', verbose))
+    suite.addTest(TestBaseModdict('test_sorting', verbose))
+
+    runner = unittest.TextTestRunner(verbosity=verbose)
+
+    result = runner.run(suite)
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4