# Standard modules
import os
import logging
+import pprint
+import zipfile
import gzip
HAS_BZIP2 = False
# Own modules
from trace_maillog import magic
-__version__ = '0.3.0'
+__version__ = '0.3.1'
LOG = logging.getLogger(__name__)
return msg
+# =============================================================================
+def pp(value, indent=4, width=99, depth=None):
+ pretty_printer = pprint.PrettyPrinter(
+ indent=indent, width=width, depth=depth)
+ return pretty_printer.pformat(value)
+
+
# =============================================================================
class AnyUncompressFile(object):
# -------------------------------------------------------------------------
def __init__(self, fh, compress_type=None):
- self.fh = fh
+ self._fh = fh
self.compress_type = compress_type
+ # -------------------------------------------------------------------------
+ def __enter__(self):
+ return self._fh
+
+ # -------------------------------------------------------------------------
+ def __exit__(self, exc_type, exc_value, traceback):
+
+ if self._fh:
+ LOG.debug("Closing compressed file.")
+ self._fh.close()
+
+ if exc_type is None and exc_value is None and traceback is None:
+ return True
+
+ return False
+
+ # -------------------------------------------------------------------------
+ def __del__(self):
+
+ self.close()
+
+ # -------------------------------------------------------------------------
+ def close(self):
+
+ if self._fh:
+ LOG.debug("Closing compressed file.")
+ self._fh.close()
+ self._fh = None
+
# -------------------------------------------------------------------------
if HAS_BZIP2:
compression_types['bz2']['supported'] = True
LOG.debug("Got compress type {!r}.".format(compress_type))
if not supported:
raise InvalidCompressionError(compress_type, filename)
+ if mime_type in AnyUncompressFile.compression_types['gzip']['mime_types']:
+ return _open_gzip(
+ filename, text=text, universal_newline=universal_newline,
+ encoding=encoding, errors=errors)
else:
LOG.debug("Did not found appropriate compress type.")
+ return _open_raw(
+ filename, text=text, buffering=buffering, universal_newline=universal_newline,
+ encoding=encoding, errors=errors)
else:
- LOG.debug("Could not assign a compress type to {!r}, because file is empty.".format(
+ LOG.debug("Could not assign a compress type to {!r}, because file it is empty.".format(
filename))
+ return _open_raw(
+ filename, text=text, buffering=buffering, universal_newline=universal_newline,
+ encoding=encoding, errors=errors)
return None
+# =============================================================================
+def _open_gzip(
+ filename, text=False, universal_newline=False,
+ encoding=None, errors=None):
+
+ open_args = {}
+ mode = 'rb'
+ if text and six.PY3:
+ mode = 'r'
+ if universal_newline:
+ open_args['newline'] = os.linesep
+ if encoding is None:
+ open_args['encoding'] = 'utf-8'
+ else:
+ open_args['encoding'] = encoding
+ if errors is None:
+ open_args['errors'] = 'surrogatescape'
+ else:
+ open_args['errors'] = errors
+
+ LOG.debug("Opening gzip file {!r} with mod {!r}, other open arguments: {}".format(
+ filename, mode, pp(open_args)))
+ fh = gzip.GzipFile(filename, mode, **open_args)
+ return AnyUncompressFile(fh, 'gzip')
+
+# =============================================================================
+def _open_raw(
+ filename, text=False, buffering=None, universal_newline=False,
+ encoding=None, errors=None):
+
+ open_args = {}
+ if buffering is not None:
+ open_args['buffering'] = buffering
+ mode = 'r'
+ if six.PY2:
+ if text:
+ if universal_newline:
+ mode += 'U'
+ else:
+ mode += 'b'
+ import __builtin__
+ builtin_open = __builtin__.open
+ else:
+ if text:
+ if universal_newline:
+ open_args['newline'] = os.linesep
+ if encoding is None:
+ open_args['encoding'] = 'utf-8'
+ else:
+ open_args['encoding'] = encoding
+ if errors is None:
+ open_args['errors'] = 'surrogatescape'
+ else:
+ open_args['errors'] = errors
+ else:
+ mode += 'b'
+ if buffering is None:
+ open_args['buffering'] = -1
+ import builtins
+ builtin_open = builtins.open
+
+ LOG.debug("Opening raw file {!r} with mod {!r}, other open arguments: {}".format(
+ filename, mode, pp(open_args)))
+
+ fh = builtin_open(filename, mode, **open_args)
+ return AnyUncompressFile(fh)
+
+
# =============================================================================
if __name__ == "__main__":