]> Frank Brehm's Git Trees - pixelpark/trace-maillog.git/commitdiff
Started wit open() in lib/trace_maillog/any_uncompress_file.py
authorFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 May 2017 13:36:32 +0000 (15:36 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Thu, 11 May 2017 13:36:32 +0000 (15:36 +0200)
lib/trace_maillog/any_uncompress_file.py
test/test_any_uncompress.py
test/testfiles-uncompress/empty.txt [new file with mode: 0644]
test/testfiles-uncompress/emty.txt [deleted file]

index 5cdd1bd6ef5385ce782db31f479236fcaed272f7..8d9f616054f4ad3539405838323866bdf35a15d3 100644 (file)
@@ -4,7 +4,7 @@
 @author: Frank Brehm
 @contact: frank.brehm@pixelpark.com
 @copyright: © 2017 by Frank Brehm, Berlin
-@summary: The module for a AnyUmpressFile class
+@summary: The module for a AnyUncompressFile class
 $Id: $
 """
 from __future__ import absolute_import
@@ -31,7 +31,12 @@ except ImportError:
 # Third party modules
 import six
 
-__version__ = '0.2.1'
+# Own modules
+from trace_maillog import magic
+
+__version__ = '0.3.0'
+
+LOG = logging.getLogger(__name__)
 
 
 # =============================================================================
@@ -39,6 +44,7 @@ class BaseAnyUncompressError(Exception):
     """Base class for all exceptions defined in this module."""
     pass
 
+
 # =============================================================================
 class InvalidCompressionError(BaseAnyUncompressError, NotImplementedError):
     """Special exception class for not implemented compression types."""
@@ -61,7 +67,7 @@ class InvalidCompressionError(BaseAnyUncompressError, NotImplementedError):
 
 
 # =============================================================================
-class AnyUmpressFile(object):
+class AnyUncompressFile(object):
 
     compression_types = {
         '7z': {
@@ -106,7 +112,7 @@ class AnyUmpressFile(object):
         },
         'zip': {
             'mime_types': ('application/zip', ),
-            'supported': False,
+            'supported': True,
             'method': None,
         },
     }
@@ -125,6 +131,48 @@ class AnyUmpressFile(object):
         compression_types['xz']['supported'] = True
 
 
+# =============================================================================
+def open(
+    filename, archive_file=None, text=False, buffering=None, universal_newline=False,
+        encoding=None, errors=None):
+
+    if not os.path.exists(filename):
+        raise ValueError("File {!r} does not exists.".format(filename))
+    if not os.path.isfile(filename):
+        raise ValueError("File {!r} is not a regular file.".format(filename))
+    if not os.access(filename, os.R_OK):
+        raise ValueError("No read access to File {!r}.".format(filename))
+    fstat = os.stat(filename)
+    fsize = fstat.st_size
+
+    mime_type = None
+    compress_type = None
+
+    if fsize:
+        LOG.debug("Retreiving MIME type of file {!r}...".format(filename))
+        m = magic.Magic(mime=True)
+        mime_type = m.from_file(filename)
+        LOG.debug("Got MIME type {!r}.".format(mime_type))
+        supported = False
+        for ctype in AnyUncompressFile.compression_types.keys():
+            type_inf = AnyUncompressFile.compression_types[ctype]
+            if mime_type in type_inf['mime_types']:
+                compress_type = ctype
+                supported = type_inf.get('supported', False)
+                break
+        if compress_type:
+            LOG.debug("Got compress type {!r}.".format(compress_type))
+            if not supported:
+                raise InvalidCompressionError(compress_type, filename)
+        else:
+            LOG.debug("Did not found appropriate compress type.")
+    else:
+        LOG.debug("Could not assign a compress type to {!r}, because file is empty.".format(
+            filename))
+
+    return None
+
+
 # =============================================================================
 
 if __name__ == "__main__":
index afa77f70deb1dbc272ef2c1c3f6304ae965502b8..ed9a76a4a9db50d94725e086b236445b904cf2f7 100755 (executable)
@@ -18,6 +18,20 @@ except ImportError:
 
 import six
 
+HAS_BZIP2 = False
+try:
+    import bz2
+    HAS_BZIP2 = True
+except ImportError:
+    pass
+
+HAS_LZMA = False
+try:
+    import lzma
+    HAS_LZMA = True
+except ImportError:
+    pass
+
 from pb_base.common import pp
 from pb_base.common import bytes2human
 
@@ -41,6 +55,22 @@ class TestAnyUncompress(LogtraceTestcase):
         self.appname = APPNAME
         self.testdata_dir = os.path.join(os.path.dirname(__file__), 'testfiles-uncompress')
 
+        self.empty_file = os.path.join(self.testdata_dir, 'empty.txt')
+        self.usable_files = []
+        for bname in ('lorem-ipsum.txt', 'lorem-ipsum.txt.gz', 'lorem-ipsum.txt.zip'):
+            self.usable_files.append(os.path.join(self.testdata_dir, bname))
+        if HAS_BZIP2:
+            self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.bz2'))
+        if HAS_LZMA:
+            self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.lzma'))
+            self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.xz'))
+
+        self.unusable_files = []
+        for bname in (
+                'lorem-ipsum.txt.7z', 'lorem-ipsum.txt.lz4',
+                'lorem-ipsum.txt.rar', 'lorem-ipsum.txt.Z'):
+            self.unusable_files.append(os.path.join(self.testdata_dir, bname))
+
     # -------------------------------------------------------------------------
     def tearDown(self):
         pass
@@ -49,10 +79,36 @@ class TestAnyUncompress(LogtraceTestcase):
     def test_import(self):
 
         LOG.info("Testing import of trace_maillog.any_uncompress_file ...")
-        import trace_maillog.any_uncompress_file                        # noqa
+        import trace_maillog.any_uncompress_file                            # noqa
+
+        LOG.info("Testing import of AnyUncompressFile from trace_maillog.any_uncompress_file ...")
+        from trace_maillog.any_uncompress_file import AnyUncompressFile     # noqa
+
+    # -------------------------------------------------------------------------
+    def test_open(self):
+
+        from trace_maillog import any_uncompress_file
+        from trace_maillog.any_uncompress_file import AnyUncompressFile
+        from trace_maillog.any_uncompress_file import InvalidCompressionError
+
+        LOG.info("Testing opening of usable files...")
+
+        for fname in self.usable_files:
+            LOG.debug("Testing {!r} ...".format(fname))
+            fh = any_uncompress_file.open(fname)
+
+        LOG.info("Test opening a non file path...")
+        with self.assertRaises(ValueError) as cm:
+            fh = any_uncompress_file.open(self.testdata_dir)
+        e = cm.exception
+        LOG.debug("%s raised: %s", e.__class__.__name__, e)
 
-        LOG.info("Testing import of AnyUmpressFile from trace_maillog.any_uncompress_file ...")
-        from trace_maillog.any_uncompress_file import AnyUmpressFile    # noqa
+        LOG.info("Test opening unsupported compressed files ...")
+        for fname in self.unusable_files:
+            with self.assertRaises(InvalidCompressionError) as cm:
+                fh = any_uncompress_file.open(fname)
+            e = cm.exception
+            LOG.debug("%s raised: %s", e.__class__.__name__, e)
 
 
 # =============================================================================
@@ -71,6 +127,7 @@ if __name__ == '__main__':
     suite = unittest.TestSuite()
 
     suite.addTest(TestAnyUncompress('test_import', verbose))
+    suite.addTest(TestAnyUncompress('test_open', verbose))
 
     runner = unittest.TextTestRunner(verbosity=verbose)
 
diff --git a/test/testfiles-uncompress/empty.txt b/test/testfiles-uncompress/empty.txt
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/test/testfiles-uncompress/emty.txt b/test/testfiles-uncompress/emty.txt
deleted file mode 100644 (file)
index e69de29..0000000