# Own modules
from trace_maillog import magic
-__version__ = '0.3.1'
+__version__ = '0.3.2'
LOG = logging.getLogger(__name__)
self._fh = fh
self.compress_type = compress_type
+ self.container = None
# -------------------------------------------------------------------------
def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
if self._fh:
- LOG.debug("Closing compressed file.")
- self._fh.close()
+ self.close()
if exc_type is None and exc_value is None and traceback is None:
return True
self._fh.close()
self._fh = None
+ if self.container:
+ self.container.close()
+ self.container = None
+
# -------------------------------------------------------------------------
if HAS_BZIP2:
compression_types['bz2']['supported'] = True
return _open_gzip(
filename, text=text, universal_newline=universal_newline,
encoding=encoding, errors=errors)
+ elif mime_type in AnyUncompressFile.compression_types['bz2']['mime_types']:
+ return _open_bzip2(
+ filename, text=text, buffering=buffering, universal_newline=universal_newline,
+ encoding=encoding, errors=errors)
+ elif mime_type in AnyUncompressFile.compression_types['zip']['mime_types']:
+ return _open_zip(
+ filename, archive_file=archive_file, text=text,
+ universal_newline=universal_newline)
else:
LOG.debug("Did not found appropriate compress type.")
return _open_raw(
fh = gzip.GzipFile(filename, mode, **open_args)
return AnyUncompressFile(fh, 'gzip')
+# =============================================================================
+def _open_bzip2(
+ filename, text=False, buffering=None, universal_newline=False,
+ encoding=None, errors=None):
+
+ open_args = {}
+ mode = 'r'
+ if six.PY2:
+ if text:
+ if universal_newline:
+ mode += 'U'
+ if buffering is not None:
+ open_args['buffering'] = buffering
+ else:
+ if text:
+ if universal_newline:
+ open_args['newline'] = os.linesep
+ if encoding is None:
+ open_args['encoding'] = 'utf-8'
+ else:
+ open_args['encoding'] = encoding
+ if errors is None:
+ open_args['errors'] = 'surrogatescape'
+ else:
+ open_args['errors'] = errors
+ else:
+ mode += 'b'
+
+ LOG.debug("Opening Bzip2 file {!r} with mod {!r}, other open arguments: {}".format(
+ filename, mode, pp(open_args)))
+
+ if six.PY2:
+ fh = bz2.BZ2File(filename, mode, **open_args)
+ else:
+ fh = bz2.open(filename, mode, **open_args)
+ return AnyUncompressFile(fh, 'bzip2')
+
+# =============================================================================
+def _open_zip(filename, archive_file=None, text=False, universal_newline=False):
+
+ if archive_file is None:
+ raise ValueError("A filename for the file inside the zipfile container is needed.")
+
+ zipfile_container = zipfile.ZipFile(filename, 'r')
+
+ mode = 'r'
+ if six.PY2:
+ if text and universal_newline:
+ mode += 'U'
+
+ LOG.debug("Opening file {!r} inside zip file {!r} with mode {!r}".format(
+ archive_file, filename, mode))
+
+ fh = zipfile_container.open(archive_file, mode)
+ ret = AnyUncompressFile(fh, 'zip')
+ ret.container = zipfile_container
+
+ return ret
+
# =============================================================================
def _open_raw(
filename, text=False, buffering=None, universal_newline=False,