from . import BaseDPXPuppetApplication
from ..forge.mod_info import ForgeModuleInfo
+from ..forge.mod_info import ReadForgeModuleInfoError, WriteForgeModuleInfoError
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
ngettext = XLATOR.ngettext
-__version__ = '0.4.0'
+__version__ = '0.5.0'
# =============================================================================
show_assume_options = False
show_console_timeout_option = False
- # show_force_option = False
+ show_force_option = True
show_simulate_option = False
# -------------------------------------------------------------------------
self.module_info = None
self.var_dir = None
+ self._force_desc_msg = _(
+ "Write the cache file also, if the data was successful read from chache file.")
+
desc = _(
"This application retrieves information about the given module "
"from Puppet forge, writes the found information into its cache file and "
"Don't read the forge information from a existing cache file, "
"get it always straight from Puppet forge.")
read_cache_group.add_argument(
- '-R', '--no-read-cache', dest="read_cache", action="store_false", help=help_txt)
+ '-R', '--no-read-cache', dest="no_read_cache", action="store_true", help=help_txt)
write_cache_group = forge_group.add_mutually_exclusive_group()
help_txt += ' ' + _("(default)")
help_txt += '.'
write_cache_group.add_argument(
- '-W', '--no-write-cache', dest='write_cache', action="store_false", help=help_txt)
+ '-W', '--no-write-cache', dest='no_write_cache', action="store_true", help=help_txt)
help_txt = _(
"The directory containing variable data from DPX puppet tools, Default: {vd!r}. "
super(GetForgeModuleApplication, self).post_init()
- read_cache = getattr(self.args, 'read_cache', None)
- if read_cache is not None:
- if read_cache:
- self.read_cache = True
- else:
+ read_cache = getattr(self.args, 'read_cache', False)
+ if read_cache:
+ self.read_cache = True
+ else:
+ no_read_cache = getattr(self.args, 'no_read_cache', False)
+ if no_read_cache:
self.read_cache = False
write_cache = getattr(self.args, 'write_cache', None)
- if write_cache is not None:
- if write_cache:
- self.write_cache = True
- else:
+ if write_cache:
+ self.write_cache = True
+ else:
+ no_write_cache = getattr(self.args, 'no_write_cache', False)
+ if no_write_cache:
self.write_cache = False
var_dir = getattr(self.args, 'var_dir', None)
LOG.error(msg)
self.exit(1)
else:
- var_dir = DEFAULT_VAR_DIR
+ var_dir = self.cfg.var_dir
self.module_name = self.args.module_name[0]
def _run(self):
LOG.info("Run, baby run, baby run run run ...")
- self.module_info.retrieve_forge_data()
+
+ has_read = False
+ if self.read_cache:
+ try:
+ self.module_info.read()
+ has_read = True
+ except ReadForgeModuleInfoError as e:
+ LOG.debug(_("Could not read cache: {}").format(e))
+ if not has_read:
+ self.module_info.retrieve_forge_data()
+
+ self.show_modinfo()
+
+ do_write_cache = self.write_cache
+ if has_read and not self.force:
+ do_write_cache = False
+
+ if do_write_cache:
+ try:
+ self.module_info.write()
+ has_read = True
+ except WriteForgeModuleInfoError as e:
+ LOG.warn(_("Could not write cache: {}").format(e))
+
+ # -------------------------------------------------------------------------
+ def show_modinfo(self):
+
+ msg = _("Information about module {!r}:").format(self.module_name)
+ self.empty_line()
+ print(self.colored(msg, 'CYAN'))
+ self.line(width=len(msg), color='CYAN')
+ self.empty_line()
+
+ LOG.debug(_("Module-Info:") + '\n' + pp(self.module_info.as_dict()))
+
+
# =============================================================================
import datetime
import collections
import time
+import os
from pathlib import Path
from .owner_info import ForgeOwnerInfo
-__version__ = '0.4.1'
+__version__ = '0.5.0'
LOG = logging.getLogger(__name__)
self._updated_at = None
self._uri = None
+ self._exists_on_forge = None
self._forge_uri = self.default_forge_uri
self._http_timeout = self.default_http_timeout
self._response_code = None
res['deprecated_for'] = self.deprecated_for
res['downloads'] = self.downloads
res['endorsement'] = self.endorsement
+ res['exists_on_forge'] = self.exists_on_forge
res['feedback_score'] = self.feedback_score
+ res['forge_cache_dir'] = self.forge_cache_dir
+ res['forge_cache_file'] = self.forge_cache_file
res['forge_uri'] = self.forge_uri
res['homepage_url'] = self.homepage_url
res['http_timeout'] = self.http_timeout
raise ValueError(msg)
self._http_timeout = v
+ # -------------------------------------------------------------------------
+ @property
+ def exists_on_forge(self):
+ """The giben module exists in Puppet forge."""
+ return self._exists_on_forge
+
# -------------------------------------------------------------------------
@property
def response_code(self):
raise BaseModuleInfoError(msg)
self._var_dir = v
+ # -------------------------------------------------------------------------
+ @property
+ def forge_cache_dir(self):
+ """The directory containg the cache YAML file."""
+ if not self.var_dir:
+ return None
+ return self.var_dir / 'forge'
+
+ # -------------------------------------------------------------------------
+ @property
+ def forge_cache_file(self):
+ """The filename of the YAML-file, containing the cached forge data."""
+ if not self.forge_cache_dir:
+ return None
+ return self.forge_cache_dir / (self.full_name + '.yaml')
+
# -------------------------------------------------------------------------
def to_data(self):
"""Returning a dict, which can be used to re-instantiate this module info."""
res['forge_data']['updated_at'] = self.updated_at.strftime('%Y-%m-%d %H:%M:%S %z')
res['forge_data']['releases'] = []
- for release in self.releases:
- res['forge_data']['releases'].append(release.to_data())
+ if self.releases:
+ for release in self.releases:
+ res['forge_data']['releases'].append(release.to_data())
res['forge_data']['current_release'] = None
if self.current_release:
res['ts_checked'] = self.ts_checked
res['response_code'] = self.response_code
res['response_msg'] = self.response_msg
+ res['exists_on_forge'] = self.exists_on_forge
return res
self._updated_at = None
self._uri = None
+ self._exists_on_forge = None
self._response_code = None
self._response_msg = None
self._ts_checked = None
"""Reading the forge data from given .yaml-file."""
if not data_file:
- for ext in ('.yaml', 'yml'):
- fn = self.var_dir / 'forge' / (self.full_name + ext)
- if fn.exists():
- data_file = fn
- break
+ data_file = self.forge_cache_file
if not data_file:
- fn = str(self.var_dir / 'forge' / (self.full_name + '.y(a?)ml'))
- msg = _("Did not found the forge module info file {!r}.").format(fn)
+ msg = _("Could not evaluate name of the forge cache file.").format(fn)
raise ReadForgeModuleInfoError(msg)
+ if not data_file.exists():
+ msg = _("Forge cache file {!r} does not exists.").format(str(data_file))
+ raise ReadForgeModuleInfoError(msg)
if not data_file.is_file():
- msg = _("Frge module info file {!r} is not a regular file.").format(str(data_file))
+ msg = _("Forge cache file {!r} is not a regular file.").format(str(data_file))
raise ReadForgeModuleInfoError(msg)
if not os.access(data_file, os.R_OK):
- msg = _("Frge module info file {!r} is not readable.").format(str(data_file))
+ msg = _("Forge cache file {!r} is not readable.").format(str(data_file))
raise ReadForgeModuleInfoError(msg)
+ LOG.debug(_("Reading forge cache file {!r} ...").format(str(data_file)))
+
data = None
try:
with data_file.open('r', **self.open_args) as fh:
fd['owner'], appname=self.appname, verbose=self.verbose, base_dir=self.base_dir)
for prop_name in ('response_code', 'response_msg', 'ts_checked'):
- if prop_name in data and data[prop_name]:
- setattr(self, prop_name, data[prop_name])
+ pname = '_' + prop_name
+ if prop_name in data:
+ setattr(self, pname, data[prop_name])
+
+ if 'exists_on_forge' in data:
+ self._exists_on_forge = to_bool(data['exists_on_forge'])
# -------------------------------------------------------------------------
def write(self, data_file=None):
"""Writing the forge data into the given .yaml-file."""
if not data_file:
- data_file = self.var_dir / 'forge' / (self.full_name + '.yaml')
-
- forge_dir = data_file.parent
- if not forge_dir.is_dir():
- LOG.info(_("Creating directory {!r} ...").format(str(forge_dir)))
- os.mkdir(str(forge_dir), mode=0o755)
-
- else:
- forge_dir = data_file.parent
+ data_file = self.forge_cache_file
+ if not data_file:
+ msg = _("Could not evaluate name of the forge cache file.").format(fn)
+ raise WriteForgeModuleInfoError(msg)
+
+ forge_dir = data_file.parent
+ if not forge_dir.is_dir():
+ forge_dir_parent = forge_dir.parent
+ if not forge_dir_parent.exists():
+ msg = _("Directory {!r} does not exists.").format(str(forge_dir_parent))
+ raise WriteForgeModuleInfoError(msg)
+ if not forge_dir_parent.is_dir():
+ msg = _("Path {!r} is not a directory.").format(str(forge_dir_parent))
+ raise WriteForgeModuleInfoError(msg)
+ if not os.access(forge_dir_parent, os.W_OK):
+ msg = _("Directory {!r} is not writeable.").format(str(forge_dir_parent))
+ raise WriteForgeModuleInfoError(msg)
+
+ LOG.info(_("Creating directory {!r} ...").format(str(forge_dir)))
+ os.mkdir(str(forge_dir), mode=0o755)
+
+ if not os.access(forge_dir, os.W_OK):
+ msg = _("Directory {!r} is not writeable.").format(str(forge_dir))
+ raise WriteForgeModuleInfoError(msg)
data = self.to_data()
+ LOG.debug(_("Writing forge cache file {!r} ...").format(str(data_file)))
+
try:
with data_file.open('w', **self.open_args) as fh:
yaml.safe_dump(data, fh)
LOG.debug("Got status code: {}.".format(response.status_code))
self._response_code = response.status_code
if not response.ok:
+ self.set_ts_checked()
+ self._exists_on_forge = False
err = response.json()
err_msg = '\n'.join(err['errors'])
if self._response_msg:
self.set_ts_checked()
data = response.json()
+ self._exists_on_forge = True
if self.verbose > 3:
LOG.debug("Performing forge data:\n" + pp(data))
self.apply_data(data, from_cache=False)