@author: Frank Brehm
@contact: frank.brehm@pixelpark.com
@copyright: © 2017 by Frank Brehm, Berlin
-@summary: The module for a AnyUmpressFile class
+@summary: The module for a AnyUncompressFile class
$Id: $
"""
from __future__ import absolute_import
# Third party modules
import six
-__version__ = '0.2.1'
+# Own modules
+from trace_maillog import magic
+
+__version__ = '0.3.0'
+
+LOG = logging.getLogger(__name__)
# =============================================================================
"""Base class for all exceptions defined in this module."""
pass
+
# =============================================================================
class InvalidCompressionError(BaseAnyUncompressError, NotImplementedError):
"""Special exception class for not implemented compression types."""
# =============================================================================
-class AnyUmpressFile(object):
+class AnyUncompressFile(object):
compression_types = {
'7z': {
},
'zip': {
'mime_types': ('application/zip', ),
- 'supported': False,
+ 'supported': True,
'method': None,
},
}
compression_types['xz']['supported'] = True
+# =============================================================================
+def open(
+ filename, archive_file=None, text=False, buffering=None, universal_newline=False,
+ encoding=None, errors=None):
+
+ if not os.path.exists(filename):
+ raise ValueError("File {!r} does not exists.".format(filename))
+ if not os.path.isfile(filename):
+ raise ValueError("File {!r} is not a regular file.".format(filename))
+ if not os.access(filename, os.R_OK):
+ raise ValueError("No read access to File {!r}.".format(filename))
+ fstat = os.stat(filename)
+ fsize = fstat.st_size
+
+ mime_type = None
+ compress_type = None
+
+ if fsize:
+ LOG.debug("Retreiving MIME type of file {!r}...".format(filename))
+ m = magic.Magic(mime=True)
+ mime_type = m.from_file(filename)
+ LOG.debug("Got MIME type {!r}.".format(mime_type))
+ supported = False
+ for ctype in AnyUncompressFile.compression_types.keys():
+ type_inf = AnyUncompressFile.compression_types[ctype]
+ if mime_type in type_inf['mime_types']:
+ compress_type = ctype
+ supported = type_inf.get('supported', False)
+ break
+ if compress_type:
+ LOG.debug("Got compress type {!r}.".format(compress_type))
+ if not supported:
+ raise InvalidCompressionError(compress_type, filename)
+ else:
+ LOG.debug("Did not found appropriate compress type.")
+ else:
+ LOG.debug("Could not assign a compress type to {!r}, because file is empty.".format(
+ filename))
+
+ return None
+
+
# =============================================================================
if __name__ == "__main__":
import six
+HAS_BZIP2 = False
+try:
+ import bz2
+ HAS_BZIP2 = True
+except ImportError:
+ pass
+
+HAS_LZMA = False
+try:
+ import lzma
+ HAS_LZMA = True
+except ImportError:
+ pass
+
from pb_base.common import pp
from pb_base.common import bytes2human
self.appname = APPNAME
self.testdata_dir = os.path.join(os.path.dirname(__file__), 'testfiles-uncompress')
+ self.empty_file = os.path.join(self.testdata_dir, 'empty.txt')
+ self.usable_files = []
+ for bname in ('lorem-ipsum.txt', 'lorem-ipsum.txt.gz', 'lorem-ipsum.txt.zip'):
+ self.usable_files.append(os.path.join(self.testdata_dir, bname))
+ if HAS_BZIP2:
+ self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.bz2'))
+ if HAS_LZMA:
+ self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.lzma'))
+ self.usable_files.append(os.path.join(self.testdata_dir, 'lorem-ipsum.txt.xz'))
+
+ self.unusable_files = []
+ for bname in (
+ 'lorem-ipsum.txt.7z', 'lorem-ipsum.txt.lz4',
+ 'lorem-ipsum.txt.rar', 'lorem-ipsum.txt.Z'):
+ self.unusable_files.append(os.path.join(self.testdata_dir, bname))
+
# -------------------------------------------------------------------------
def tearDown(self):
pass
def test_import(self):
LOG.info("Testing import of trace_maillog.any_uncompress_file ...")
- import trace_maillog.any_uncompress_file # noqa
+ import trace_maillog.any_uncompress_file # noqa
+
+ LOG.info("Testing import of AnyUncompressFile from trace_maillog.any_uncompress_file ...")
+ from trace_maillog.any_uncompress_file import AnyUncompressFile # noqa
+
+ # -------------------------------------------------------------------------
+ def test_open(self):
+
+ from trace_maillog import any_uncompress_file
+ from trace_maillog.any_uncompress_file import AnyUncompressFile
+ from trace_maillog.any_uncompress_file import InvalidCompressionError
+
+ LOG.info("Testing opening of usable files...")
+
+ for fname in self.usable_files:
+ LOG.debug("Testing {!r} ...".format(fname))
+ fh = any_uncompress_file.open(fname)
+
+ LOG.info("Test opening a non file path...")
+ with self.assertRaises(ValueError) as cm:
+ fh = any_uncompress_file.open(self.testdata_dir)
+ e = cm.exception
+ LOG.debug("%s raised: %s", e.__class__.__name__, e)
- LOG.info("Testing import of AnyUmpressFile from trace_maillog.any_uncompress_file ...")
- from trace_maillog.any_uncompress_file import AnyUmpressFile # noqa
+ LOG.info("Test opening unsupported compressed files ...")
+ for fname in self.unusable_files:
+ with self.assertRaises(InvalidCompressionError) as cm:
+ fh = any_uncompress_file.open(fname)
+ e = cm.exception
+ LOG.debug("%s raised: %s", e.__class__.__name__, e)
# =============================================================================
suite = unittest.TestSuite()
suite.addTest(TestAnyUncompress('test_import', verbose))
+ suite.addTest(TestAnyUncompress('test_open', verbose))
runner = unittest.TextTestRunner(verbosity=verbose)