]> Frank Brehm's Git Trees - pixelpark/trace-maillog.git/commitdiff
Defining first opening functions
authorFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 May 2017 14:43:41 +0000 (16:43 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 May 2017 14:43:41 +0000 (16:43 +0200)
lib/trace_maillog/any_uncompress_file.py
test/test_any_uncompress.py

index 8d9f616054f4ad3539405838323866bdf35a15d3..83b5403d9bf74d6ac434d38be2cb9674d1d89c41 100644 (file)
@@ -12,6 +12,8 @@ from __future__ import absolute_import
 # Standard modules
 import os
 import logging
+import pprint
+import zipfile
 import gzip
 
 HAS_BZIP2 = False
@@ -34,7 +36,7 @@ import six
 # Own modules
 from trace_maillog import magic
 
-__version__ = '0.3.0'
+__version__ = '0.3.1'
 
 LOG = logging.getLogger(__name__)
 
@@ -66,6 +68,13 @@ class InvalidCompressionError(BaseAnyUncompressError, NotImplementedError):
         return msg
 
 
+# =============================================================================
+def pp(value, indent=4, width=99, depth=None):
+    pretty_printer = pprint.PrettyPrinter(
+        indent=indent, width=width, depth=depth)
+    return pretty_printer.pformat(value)
+
+
 # =============================================================================
 class AnyUncompressFile(object):
 
@@ -120,9 +129,38 @@ class AnyUncompressFile(object):
     # -------------------------------------------------------------------------
     def __init__(self, fh, compress_type=None):
 
-        self.fh = fh
+        self._fh = fh
         self.compress_type = compress_type
 
+    # -------------------------------------------------------------------------
+    def __enter__(self):
+        return self._fh
+
+    # -------------------------------------------------------------------------
+    def __exit__(self, exc_type, exc_value, traceback):
+
+        if self._fh:
+            LOG.debug("Closing compressed file.")
+            self._fh.close()
+
+        if exc_type is None and exc_value is None and traceback is None:
+            return True
+
+        return False
+
+    # -------------------------------------------------------------------------
+    def __del__(self):
+
+        self.close()
+
+    # -------------------------------------------------------------------------
+    def close(self):
+
+        if self._fh:
+            LOG.debug("Closing compressed file.")
+            self._fh.close()
+            self._fh = None
+
     # -------------------------------------------------------------------------
     if HAS_BZIP2:
         compression_types['bz2']['supported'] = True
@@ -164,15 +202,93 @@ def open(
             LOG.debug("Got compress type {!r}.".format(compress_type))
             if not supported:
                 raise InvalidCompressionError(compress_type, filename)
+            if mime_type in AnyUncompressFile.compression_types['gzip']['mime_types']:
+                return _open_gzip(
+                    filename, text=text, universal_newline=universal_newline,
+                    encoding=encoding, errors=errors)
         else:
             LOG.debug("Did not found appropriate compress type.")
+            return _open_raw(
+                filename, text=text, buffering=buffering, universal_newline=universal_newline,
+                encoding=encoding, errors=errors)
     else:
-        LOG.debug("Could not assign a compress type to {!r}, because file is empty.".format(
+        LOG.debug("Could not assign a compress type to {!r}, because file it is empty.".format(
             filename))
+        return _open_raw(
+            filename, text=text, buffering=buffering, universal_newline=universal_newline,
+            encoding=encoding, errors=errors)
 
     return None
 
 
+# =============================================================================
+def _open_gzip(
+    filename, text=False, universal_newline=False,
+        encoding=None, errors=None):
+
+    open_args = {}
+    mode = 'rb'
+    if text and six.PY3:
+        mode = 'r'
+        if universal_newline:
+            open_args['newline'] = os.linesep
+        if encoding is None:
+            open_args['encoding'] = 'utf-8'
+        else:
+            open_args['encoding'] = encoding
+        if errors is None:
+            open_args['errors'] = 'surrogatescape'
+        else:
+            open_args['errors'] = errors
+
+    LOG.debug("Opening gzip file {!r} with mod {!r}, other open arguments: {}".format(
+        filename, mode, pp(open_args)))
+    fh = gzip.GzipFile(filename, mode, **open_args)
+    return AnyUncompressFile(fh, 'gzip')
+
+# =============================================================================
+def _open_raw(
+    filename, text=False, buffering=None, universal_newline=False,
+        encoding=None, errors=None):
+
+    open_args = {}
+    if buffering is not None:
+        open_args['buffering'] = buffering
+    mode = 'r'
+    if six.PY2:
+        if text:
+            if universal_newline:
+                mode += 'U'
+        else:
+            mode += 'b'
+        import __builtin__
+        builtin_open = __builtin__.open
+    else:
+        if text:
+            if universal_newline:
+                open_args['newline'] = os.linesep
+            if encoding is None:
+                open_args['encoding'] = 'utf-8'
+            else:
+                open_args['encoding'] = encoding
+            if errors is None:
+                open_args['errors'] = 'surrogatescape'
+            else:
+                open_args['errors'] = errors
+        else:
+            mode += 'b'
+        if buffering is None:
+            open_args['buffering'] = -1
+        import builtins
+        builtin_open = builtins.open
+
+    LOG.debug("Opening raw file {!r} with mod {!r}, other open arguments: {}".format(
+        filename, mode, pp(open_args)))
+
+    fh = builtin_open(filename, mode, **open_args)
+    return AnyUncompressFile(fh)
+
+
 # =============================================================================
 
 if __name__ == "__main__":
index ed9a76a4a9db50d94725e086b236445b904cf2f7..37aed46aa3c95f23cddf84e270b3349686677f94 100755 (executable)
@@ -93,9 +93,14 @@ class TestAnyUncompress(LogtraceTestcase):
 
         LOG.info("Testing opening of usable files...")
 
+        LOG.debug("Testing {!r} ...".format(self.empty_file))
+        with any_uncompress_file.open(self.empty_file) as fh:
+            pass
+
         for fname in self.usable_files:
             LOG.debug("Testing {!r} ...".format(fname))
             fh = any_uncompress_file.open(fname)
+            fh.close()
 
         LOG.info("Test opening a non file path...")
         with self.assertRaises(ValueError) as cm: