]> Frank Brehm's Git Trees - pixelpark/puppet-tools.git/commitdiff
Adding module dpx_puppettools.puppetfile for class Puppetfile
authorFrank Brehm <frank.brehm@pixelpark.com>
Wed, 8 Feb 2023 16:24:53 +0000 (17:24 +0100)
committerFrank Brehm <frank.brehm@pixelpark.com>
Wed, 8 Feb 2023 16:24:53 +0000 (17:24 +0100)
lib/dpx_puppettools/puppetfile.py [new file with mode: 0644]

diff --git a/lib/dpx_puppettools/puppetfile.py b/lib/dpx_puppettools/puppetfile.py
new file mode 100644 (file)
index 0000000..64dd9cb
--- /dev/null
@@ -0,0 +1,380 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2023 by Frank Brehm, Digitas Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppetfile
+          of a r10k environment
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import os
+import re
+import pwd
+import grp
+
+from pathlib import Path
+
+# Third party modules
+import six
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+from fb_tools.obj import FbBaseObjectError, FbBaseObject
+
+# Own modules
+from .pfile_moduleinfo import PuppetfileModuleInfo
+
+from .errors import BaseModuleInfoError
+
+from .module_list import ModuleInfoDict
+
+from .xlate import XLATOR
+
+__version__ = '1.3.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class PuppetfileError(FbBaseObjectError):
+
+    pass
+
+
+# =============================================================================
+class Puppetfile(FbBaseObject):
+    """Class for encapsulating a Puppetfile of a r10k environment."""
+
+    default_environment = 'production'
+    default_env_root_dir = os.sep + os.path.join("etc", "puppetlabs", "code", "environments")
+    default_env_root_dir = Path(os.sep) / "etc" / "puppetlabs" / "code" / "environments"
+    default_moduledir = Path('modules')
+    default_forge_url = 'https://forge.puppetlabs.com'
+
+    re_comment = re.compile(r'^\s*#')
+    re_inline_comment = re.compile(r'\s+#.*')
+    re_comma_at_end = re.compile(r',\s*$')
+
+    re_forge = re.compile(r'^\s*forge\s+[\'"]([^\'"]+)[\'"]\s*$', re.IGNORECASE)
+    re_moduledir = re.compile(r'^\s*moduledir\s+[\'"]([^\'"]+)[\'"]\s*$', re.IGNORECASE)
+
+    open_args = {}
+    if six.PY3:
+        open_args = {
+            'encoding': 'utf-8',
+            'errors': 'surrogateescape',
+        }
+
+    # -------------------------------------------------------------------------
+    def __init__(
+        self, env_root_dir=None, environment=None,
+            appname=None, verbose=0, version=__version__, base_dir=None, initialized=None):
+
+        self._env_root_dir = self.default_env_root_dir
+        self._environment = self.default_environment
+        self._moduledir = self.default_moduledir
+        self._forge_url = self.default_forge_url
+        self._stat = None
+        self.modules = None
+
+        super(Puppetfile, self).__init__(
+            appname=appname, verbose=verbose, version=version,
+            base_dir=base_dir, initialized=False,
+        )
+
+        self.modules = ModuleInfoDict(
+            appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+        if env_root_dir is not None:
+            self.env_root_dir = env_root_dir
+
+        if environment is not None:
+            self.environment = environment
+
+        if initialized is not None:
+            self.initialized = initialized
+
+    # -------------------------------------------------------------------------
+    @property
+    def env_root_dir(self):
+        """The root directory of all puppet environments."""
+        return self._env_root_dir
+
+    @env_root_dir.setter
+    def env_root_dir(self, value):
+        if value is None:
+            msg = _("The root directory of all puppet environments may not be None.")
+            raise TypeError(msg)
+
+        val = Path(value)
+        if not val.is_absolute():
+            msg = _(
+                "Path {!r} for the root directory of all puppet environments must "
+                "be an absolute path.").format(value)
+            raise ValueError(msg)
+        self._env_root_dir = val
+
+    # -------------------------------------------------------------------------
+    @property
+    def moduledir(self):
+        """Specifies where modules from the Puppetfile will be installed. This
+        is either an absolute path or it is relative to the directory of the Puppetfile."""
+        return self._moduledir
+
+    @moduledir.setter
+    def moduledir(self, value):
+        if value is None:
+            msg = _("The module directory must not be None.")
+            raise TypeError(msg)
+
+        self._moduledir = Path(value)
+
+    # -------------------------------------------------------------------------
+    @property
+    def forge_url(self):
+        """Specifies which server that Forge based modules are fetched from."""
+        return self._forge_url
+
+    @forge_url.setter
+    def forge_url(self, value):
+        if value is None:
+            msg = _("The forge URL must not be None.")
+            raise TypeError(msg)
+        self._forge_url = str(value).strip()
+
+    # -------------------------------------------------------------------------
+    @property
+    def environment(self):
+        """The r10k environment of this Puppetfile."""
+        return self._environment
+
+    @environment.setter
+    def environment(self, value):
+        if value is None:
+            msg = "The Puppet environment may not be None."
+            raise TypeError(msg)
+
+        val = str(value).strip()
+        if val == '':
+            msg = "The Puppet environment may not be empty."
+            raise ValueError(msg)
+
+        self._environment = val
+
+    # -------------------------------------------------------------------------
+    @property
+    def env_dir(self):
+        """The directory containing the r10k environmenmt."""
+        return self.env_root_dir.joinpath(self.environment).resolve()
+
+    # -------------------------------------------------------------------------
+    @property
+    def file_path(self):
+        """The Puppetfile as a Path object."""
+        return self.env_dir / 'Puppetfile'
+
+    # -------------------------------------------------------------------------
+    @property
+    def filename(self):
+        """The complete path of the Puppetfile as a str object."""
+        return str(self.file_path)
+
+    # -------------------------------------------------------------------------
+    def _get_file_stat(self, forced=False):
+
+        if self._stat is not None and not forced:
+            return
+
+        if self.verbose > 2:
+            LOG.debug("Getting file status of {!r}.".format(self.filename))
+        if not self.exists:
+            self._stat = None
+            return
+
+        self._stat = self.file_path.stat()
+
+    # -------------------------------------------------------------------------
+    @property
+    def exists(self):
+        """A flag, whether the Puppetfile exists."""
+        return self.file_path.exists()
+
+    # -------------------------------------------------------------------------
+    @property
+    def readable(self):
+        """A flag indicating, that the puppetfile is readable."""
+        if not self.exists:
+            return False
+
+        if not os.access(self.filename, os.R_OK):
+            return False
+
+        return True
+
+    # -------------------------------------------------------------------------
+    @property
+    def stat(self):
+        """The file status of the Puppetfile."""
+        self._get_file_stat()
+        return self._stat
+
+    # -------------------------------------------------------------------------
+    @property
+    def owner(self):
+        """The owner of the Puppetfile as a string."""
+        if self.stat is None:
+            return None
+
+        owner_name = None
+        try:
+            owner_name = pwd.getpwuid(self.stat.st_uid).pw_name
+        except KeyError:
+            owner_name = "{}".format(self.stat.st_uid)
+        return owner_name
+
+    # -------------------------------------------------------------------------
+    @property
+    def group(self):
+        """The owning group of the Puppetfile as a string."""
+        if self.stat is None:
+            return None
+
+        group_name = None
+        try:
+            group_name = grp.getgrgid(self.stat.st_gid).gr_name
+        except KeyError:
+            group_name = "{}".format(self.stat.st_gid)
+        return group_name
+
+    # -------------------------------------------------------------------------
+    def as_dict(self, short=True):
+        """
+        Transforms the elements of the object into a dict
+
+        @return: structure as dict
+        @rtype:  dict
+        """
+
+        res = super(Puppetfile, self).as_dict(short=short)
+
+        res['default_environment'] = self.default_environment
+        res['default_env_root_dir'] = self.default_env_root_dir
+        res['default_forge_url'] = self.default_forge_url
+        res['default_moduledir'] = self.default_moduledir
+        res['environment'] = self.environment
+        res['env_root_dir'] = self.env_root_dir
+        res['env_dir'] = self.env_dir
+        res['exists'] = self.exists
+        res['filename'] = self.filename
+        res['file_path'] = self.file_path
+        res['forge_url'] = self.forge_url
+        res['group'] = self.group
+        res['moduledir'] = self.moduledir
+        res['open_args'] = self.open_args
+        res['owner'] = self.owner
+        res['readable'] = self.readable
+        res['stat'] = self.stat
+
+        return res
+
+    # -------------------------------------------------------------------------
+    def read(self):
+        """Reads the current Puppetfile and update self.modules."""
+
+        if self.verbose > 1:
+            LOG.debug("Searching {!r} ...".format(self.filename))
+
+        if not self.exists:
+            msg = _("Puppetfile {!r} does not exists.").format(self.filename)
+            raise PuppetfileError(msg)
+
+        if not self.readable:
+            msg = _("Puppetfile {!r} is not readable.").format(self.filename)
+            raise PuppetfileError(msg)
+
+        self.modules = ModuleInfoDict(
+            appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+        LOG.debug("Reading {!r} ...".format(self.filename))
+        line_nr = 0
+        prev_line = ''
+
+        with self.file_path.open('r', **self.open_args) as fh:
+
+            for line in fh.readlines():
+
+                line_nr += 1
+                line = line.strip()
+                if not line:
+                    continue
+                if self.re_comment.match(line):
+                    continue
+
+                if self.verbose > 3:
+                    LOG.debug("Read line {!r}...".format(line))
+
+                line = self.re_inline_comment.sub('', line)
+
+                prev_line += line
+                if self.re_comma_at_end.search(line):
+                    continue
+
+                if self.verbose > 3:
+                    LOG.debug("Evaluating line {!r}...".format(prev_line))
+
+                match = self.re_forge(prev_line)
+                if match:
+                    self.forge_url = match.group(1)
+                    continue
+
+                match = self.re_moduledir(prev_line)
+                if match:
+                    self.re_moduledir = match.group(1)
+                    continue
+
+                try:
+                    module_info = ModuleInfo.init_from_puppetfile_line(
+                        appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+                        line=prev_line)
+                    self.modules.append(module_info)
+                except BaseModuleInfoError as e:
+                    msg = _("{ec} on reading Puppetfile {lnr!r} ({nr}): {e}").format(
+                        ec=e.__class__.__name__, fn=str(self.file_path),
+                        lnr=line_nr, e=e)
+                    msg += _("Module definition was: {!r}").format(prev_line)
+
+                prev_line = ''
+
+        if self.verbose > 1:
+            LOG.debug("Closing {!r} ...".format(self.filename))
+
+        # after looping all lines there is something left
+        if prev_line:
+            if self.verbose > 3:
+                LOG.debug("Evaluating line {!r}...".format(prev_line))
+            try:
+                module_info = ModuleInfo.init_from_puppetfile_line(
+                    appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+                    line=prev_line)
+                self.modules.append(module_info)
+            except BaseModuleInfoError as e:
+                msg = _("{ec} on reading Puppetfile {lnr!r} ({nr}): {e}").format(
+                    ec=e.__class__.__name__, fn=str(self.file_path),
+                    lnr=line_nr, e=e)
+                msg += _("Module definition was: {!r}").format(prev_line)
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+    pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list