]> Frank Brehm's Git Trees - pixelpark/trace-maillog.git/commitdiff
Adding some more opening functions
authorFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 May 2017 15:23:07 +0000 (17:23 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 May 2017 15:23:07 +0000 (17:23 +0200)
lib/trace_maillog/any_uncompress_file.py
test/test_any_uncompress.py

index 83b5403d9bf74d6ac434d38be2cb9674d1d89c41..0d079e675490b2d9d50ea513c8771713cd578cb6 100644 (file)
@@ -36,7 +36,7 @@ import six
 # Own modules
 from trace_maillog import magic
 
-__version__ = '0.3.1'
+__version__ = '0.3.2'
 
 LOG = logging.getLogger(__name__)
 
@@ -131,6 +131,7 @@ class AnyUncompressFile(object):
 
         self._fh = fh
         self.compress_type = compress_type
+        self.container = None
 
     # -------------------------------------------------------------------------
     def __enter__(self):
@@ -140,8 +141,7 @@ class AnyUncompressFile(object):
     def __exit__(self, exc_type, exc_value, traceback):
 
         if self._fh:
-            LOG.debug("Closing compressed file.")
-            self._fh.close()
+            self.close()
 
         if exc_type is None and exc_value is None and traceback is None:
             return True
@@ -161,6 +161,10 @@ class AnyUncompressFile(object):
             self._fh.close()
             self._fh = None
 
+        if self.container:
+            self.container.close()
+            self.container = None
+
     # -------------------------------------------------------------------------
     if HAS_BZIP2:
         compression_types['bz2']['supported'] = True
@@ -206,6 +210,14 @@ def open(
                 return _open_gzip(
                     filename, text=text, universal_newline=universal_newline,
                     encoding=encoding, errors=errors)
+            elif mime_type in AnyUncompressFile.compression_types['bz2']['mime_types']:
+                return _open_bzip2(
+                    filename, text=text, buffering=buffering, universal_newline=universal_newline,
+                    encoding=encoding, errors=errors)
+            elif mime_type in AnyUncompressFile.compression_types['zip']['mime_types']:
+                return _open_zip(
+                    filename, archive_file=archive_file, text=text,
+                    universal_newline=universal_newline)
         else:
             LOG.debug("Did not found appropriate compress type.")
             return _open_raw(
@@ -246,6 +258,65 @@ def _open_gzip(
     fh = gzip.GzipFile(filename, mode, **open_args)
     return AnyUncompressFile(fh, 'gzip')
 
+# =============================================================================
+def _open_bzip2(
+    filename, text=False, buffering=None, universal_newline=False,
+        encoding=None, errors=None):
+
+    open_args = {}
+    mode = 'r'
+    if six.PY2:
+        if text:
+            if universal_newline:
+                mode += 'U'
+        if buffering is not None:
+            open_args['buffering'] = buffering
+    else:
+        if text:
+            if universal_newline:
+                open_args['newline'] = os.linesep
+            if encoding is None:
+                open_args['encoding'] = 'utf-8'
+            else:
+                open_args['encoding'] = encoding
+            if errors is None:
+                open_args['errors'] = 'surrogatescape'
+            else:
+                open_args['errors'] = errors
+        else:
+            mode += 'b'
+
+    LOG.debug("Opening Bzip2 file {!r} with mod {!r}, other open arguments: {}".format(
+        filename, mode, pp(open_args)))
+
+    if six.PY2:
+        fh = bz2.BZ2File(filename, mode, **open_args)
+    else:
+        fh = bz2.open(filename, mode, **open_args)
+    return AnyUncompressFile(fh, 'bzip2')
+
+# =============================================================================
+def _open_zip(filename, archive_file=None, text=False, universal_newline=False):
+
+    if archive_file is None:
+        raise ValueError("A filename for the file inside the zipfile container is needed.")
+
+    zipfile_container = zipfile.ZipFile(filename, 'r')
+
+    mode = 'r'
+    if six.PY2:
+        if text and universal_newline:
+            mode += 'U'
+
+    LOG.debug("Opening file {!r} inside zip file {!r} with mode {!r}".format(
+        archive_file, filename, mode))
+
+    fh = zipfile_container.open(archive_file, mode)
+    ret = AnyUncompressFile(fh, 'zip')
+    ret.container = zipfile_container
+
+    return ret
+
 # =============================================================================
 def _open_raw(
     filename, text=False, buffering=None, universal_newline=False,
index 37aed46aa3c95f23cddf84e270b3349686677f94..0d97b349d2cf0c1bf6e104e3d2fc042bafedbed3 100755 (executable)
@@ -99,8 +99,11 @@ class TestAnyUncompress(LogtraceTestcase):
 
         for fname in self.usable_files:
             LOG.debug("Testing {!r} ...".format(fname))
-            fh = any_uncompress_file.open(fname)
-            fh.close()
+            afile = None
+            if fname.endswith('.zip'):
+                afile = os.path.basename(fname).replace('.zip', '')
+            with any_uncompress_file.open(fname, afile) as fh:
+                pass
 
         LOG.info("Test opening a non file path...")
         with self.assertRaises(ValueError) as cm: