--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2023 by Frank Brehm, Digitas Pixelpark GmbH, Berlin
+@summary: A module for encapsulating all information about a Puppetfile
+ of a r10k environment
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import os
+import re
+import pwd
+import grp
+
+from pathlib import Path
+
+# Third party modules
+import six
+from fb_tools.common import pp, to_str, to_bool, is_sequence
+from fb_tools.obj import FbBaseObjectError, FbBaseObject
+
+# Own modules
+from .pfile_moduleinfo import PuppetfileModuleInfo
+
+from .errors import BaseModuleInfoError
+
+from .module_list import ModuleInfoDict
+
+from .xlate import XLATOR
+
+__version__ = '1.3.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class PuppetfileError(FbBaseObjectError):
+
+ pass
+
+
+# =============================================================================
+class Puppetfile(FbBaseObject):
+ """Class for encapsulating a Puppetfile of a r10k environment."""
+
+ default_environment = 'production'
+ default_env_root_dir = os.sep + os.path.join("etc", "puppetlabs", "code", "environments")
+ default_env_root_dir = Path(os.sep) / "etc" / "puppetlabs" / "code" / "environments"
+ default_moduledir = Path('modules')
+ default_forge_url = 'https://forge.puppetlabs.com'
+
+ re_comment = re.compile(r'^\s*#')
+ re_inline_comment = re.compile(r'\s+#.*')
+ re_comma_at_end = re.compile(r',\s*$')
+
+ re_forge = re.compile(r'^\s*forge\s+[\'"]([^\'"]+)[\'"]\s*$', re.IGNORECASE)
+ re_moduledir = re.compile(r'^\s*moduledir\s+[\'"]([^\'"]+)[\'"]\s*$', re.IGNORECASE)
+
+ open_args = {}
+ if six.PY3:
+ open_args = {
+ 'encoding': 'utf-8',
+ 'errors': 'surrogateescape',
+ }
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, env_root_dir=None, environment=None,
+ appname=None, verbose=0, version=__version__, base_dir=None, initialized=None):
+
+ self._env_root_dir = self.default_env_root_dir
+ self._environment = self.default_environment
+ self._moduledir = self.default_moduledir
+ self._forge_url = self.default_forge_url
+ self._stat = None
+ self.modules = None
+
+ super(Puppetfile, self).__init__(
+ appname=appname, verbose=verbose, version=version,
+ base_dir=base_dir, initialized=False,
+ )
+
+ self.modules = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+ if env_root_dir is not None:
+ self.env_root_dir = env_root_dir
+
+ if environment is not None:
+ self.environment = environment
+
+ if initialized is not None:
+ self.initialized = initialized
+
+ # -------------------------------------------------------------------------
+ @property
+ def env_root_dir(self):
+ """The root directory of all puppet environments."""
+ return self._env_root_dir
+
+ @env_root_dir.setter
+ def env_root_dir(self, value):
+ if value is None:
+ msg = _("The root directory of all puppet environments may not be None.")
+ raise TypeError(msg)
+
+ val = Path(value)
+ if not val.is_absolute():
+ msg = _(
+ "Path {!r} for the root directory of all puppet environments must "
+ "be an absolute path.").format(value)
+ raise ValueError(msg)
+ self._env_root_dir = val
+
+ # -------------------------------------------------------------------------
+ @property
+ def moduledir(self):
+ """Specifies where modules from the Puppetfile will be installed. This
+ is either an absolute path or it is relative to the directory of the Puppetfile."""
+ return self._moduledir
+
+ @moduledir.setter
+ def moduledir(self, value):
+ if value is None:
+ msg = _("The module directory must not be None.")
+ raise TypeError(msg)
+
+ self._moduledir = Path(value)
+
+ # -------------------------------------------------------------------------
+ @property
+ def forge_url(self):
+ """Specifies which server that Forge based modules are fetched from."""
+ return self._forge_url
+
+ @forge_url.setter
+ def forge_url(self, value):
+ if value is None:
+ msg = _("The forge URL must not be None.")
+ raise TypeError(msg)
+ self._forge_url = str(value).strip()
+
+ # -------------------------------------------------------------------------
+ @property
+ def environment(self):
+ """The r10k environment of this Puppetfile."""
+ return self._environment
+
+ @environment.setter
+ def environment(self, value):
+ if value is None:
+ msg = "The Puppet environment may not be None."
+ raise TypeError(msg)
+
+ val = str(value).strip()
+ if val == '':
+ msg = "The Puppet environment may not be empty."
+ raise ValueError(msg)
+
+ self._environment = val
+
+ # -------------------------------------------------------------------------
+ @property
+ def env_dir(self):
+ """The directory containing the r10k environmenmt."""
+ return self.env_root_dir.joinpath(self.environment).resolve()
+
+ # -------------------------------------------------------------------------
+ @property
+ def file_path(self):
+ """The Puppetfile as a Path object."""
+ return self.env_dir / 'Puppetfile'
+
+ # -------------------------------------------------------------------------
+ @property
+ def filename(self):
+ """The complete path of the Puppetfile as a str object."""
+ return str(self.file_path)
+
+ # -------------------------------------------------------------------------
+ def _get_file_stat(self, forced=False):
+
+ if self._stat is not None and not forced:
+ return
+
+ if self.verbose > 2:
+ LOG.debug("Getting file status of {!r}.".format(self.filename))
+ if not self.exists:
+ self._stat = None
+ return
+
+ self._stat = self.file_path.stat()
+
+ # -------------------------------------------------------------------------
+ @property
+ def exists(self):
+ """A flag, whether the Puppetfile exists."""
+ return self.file_path.exists()
+
+ # -------------------------------------------------------------------------
+ @property
+ def readable(self):
+ """A flag indicating, that the puppetfile is readable."""
+ if not self.exists:
+ return False
+
+ if not os.access(self.filename, os.R_OK):
+ return False
+
+ return True
+
+ # -------------------------------------------------------------------------
+ @property
+ def stat(self):
+ """The file status of the Puppetfile."""
+ self._get_file_stat()
+ return self._stat
+
+ # -------------------------------------------------------------------------
+ @property
+ def owner(self):
+ """The owner of the Puppetfile as a string."""
+ if self.stat is None:
+ return None
+
+ owner_name = None
+ try:
+ owner_name = pwd.getpwuid(self.stat.st_uid).pw_name
+ except KeyError:
+ owner_name = "{}".format(self.stat.st_uid)
+ return owner_name
+
+ # -------------------------------------------------------------------------
+ @property
+ def group(self):
+ """The owning group of the Puppetfile as a string."""
+ if self.stat is None:
+ return None
+
+ group_name = None
+ try:
+ group_name = grp.getgrgid(self.stat.st_gid).gr_name
+ except KeyError:
+ group_name = "{}".format(self.stat.st_gid)
+ return group_name
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(Puppetfile, self).as_dict(short=short)
+
+ res['default_environment'] = self.default_environment
+ res['default_env_root_dir'] = self.default_env_root_dir
+ res['default_forge_url'] = self.default_forge_url
+ res['default_moduledir'] = self.default_moduledir
+ res['environment'] = self.environment
+ res['env_root_dir'] = self.env_root_dir
+ res['env_dir'] = self.env_dir
+ res['exists'] = self.exists
+ res['filename'] = self.filename
+ res['file_path'] = self.file_path
+ res['forge_url'] = self.forge_url
+ res['group'] = self.group
+ res['moduledir'] = self.moduledir
+ res['open_args'] = self.open_args
+ res['owner'] = self.owner
+ res['readable'] = self.readable
+ res['stat'] = self.stat
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def read(self):
+ """Reads the current Puppetfile and update self.modules."""
+
+ if self.verbose > 1:
+ LOG.debug("Searching {!r} ...".format(self.filename))
+
+ if not self.exists:
+ msg = _("Puppetfile {!r} does not exists.").format(self.filename)
+ raise PuppetfileError(msg)
+
+ if not self.readable:
+ msg = _("Puppetfile {!r} is not readable.").format(self.filename)
+ raise PuppetfileError(msg)
+
+ self.modules = ModuleInfoDict(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
+
+ LOG.debug("Reading {!r} ...".format(self.filename))
+ line_nr = 0
+ prev_line = ''
+
+ with self.file_path.open('r', **self.open_args) as fh:
+
+ for line in fh.readlines():
+
+ line_nr += 1
+ line = line.strip()
+ if not line:
+ continue
+ if self.re_comment.match(line):
+ continue
+
+ if self.verbose > 3:
+ LOG.debug("Read line {!r}...".format(line))
+
+ line = self.re_inline_comment.sub('', line)
+
+ prev_line += line
+ if self.re_comma_at_end.search(line):
+ continue
+
+ if self.verbose > 3:
+ LOG.debug("Evaluating line {!r}...".format(prev_line))
+
+ match = self.re_forge(prev_line)
+ if match:
+ self.forge_url = match.group(1)
+ continue
+
+ match = self.re_moduledir(prev_line)
+ if match:
+ self.re_moduledir = match.group(1)
+ continue
+
+ try:
+ module_info = ModuleInfo.init_from_puppetfile_line(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ line=prev_line)
+ self.modules.append(module_info)
+ except BaseModuleInfoError as e:
+ msg = _("{ec} on reading Puppetfile {lnr!r} ({nr}): {e}").format(
+ ec=e.__class__.__name__, fn=str(self.file_path),
+ lnr=line_nr, e=e)
+ msg += _("Module definition was: {!r}").format(prev_line)
+
+ prev_line = ''
+
+ if self.verbose > 1:
+ LOG.debug("Closing {!r} ...".format(self.filename))
+
+ # after looping all lines there is something left
+ if prev_line:
+ if self.verbose > 3:
+ LOG.debug("Evaluating line {!r}...".format(prev_line))
+ try:
+ module_info = ModuleInfo.init_from_puppetfile_line(
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ line=prev_line)
+ self.modules.append(module_info)
+ except BaseModuleInfoError as e:
+ msg = _("{ec} on reading Puppetfile {lnr!r} ({nr}): {e}").format(
+ ec=e.__class__.__name__, fn=str(self.file_path),
+ lnr=line_nr, e=e)
+ msg += _("Module definition was: {!r}").format(prev_line)
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list