Rev 2485: Add basic interfaces for compressing and decompressing strings, in http://bzr.arbash-meinel.com/branches/bzr/0.17-dev/gzip_reader
John Arbash Meinel
john at arbash-meinel.com
Wed May 9 02:32:31 BST 2007
At http://bzr.arbash-meinel.com/branches/bzr/0.17-dev/gzip_reader
------------------------------------------------------------
revno: 2485
revision-id: john at arbash-meinel.com-20070509013152-eosdtgwm54158m0c
parent: pqm at pqm.ubuntu.com-20070508203256-wcxwdphd1y2psezh
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: gzip_reader
timestamp: Tue 2007-05-08 20:31:52 -0500
message:
Add basic interfaces for compressing and decompressing strings,
which can be optimized to not create as many temporary objects.
modified:
bzrlib/tests/test_tuned_gzip.py test_tuned_gzip.py-20060418042056-c576dfc708984968
bzrlib/tuned_gzip.py tuned_gzip.py-20060407014720-5aadc518e928e8d2
-------------- next part --------------
=== modified file 'bzrlib/tests/test_tuned_gzip.py'
--- a/bzrlib/tests/test_tuned_gzip.py 2006-10-11 23:08:27 +0000
+++ b/bzrlib/tests/test_tuned_gzip.py 2007-05-09 01:31:52 +0000
@@ -24,8 +24,10 @@
from unittest import TestCase
import zlib
-
-from bzrlib import tuned_gzip
+from bzrlib import (
+ tests,
+ tuned_gzip,
+ )
class FakeDecompress(object):
@@ -85,3 +87,42 @@
self.assertEqual('', stream.read())
# and it should be new member time in the stream.
self.failUnless(myfile._new_member)
+
+
+class TestCompressText(tests.TestCase):
+ """Tests for tuned_gzip.compress_text()."""
+
+ def assertCompressed(self, plain_text):
+ """Check that tuned_gzip.compress_text() returns valid gzipped text.
+
+ The returned text should be decompressible into the same raw text.
+
+ :param plain_text: The raw str to compress and then round-trip.
+ """
+ compressed = tuned_gzip.compress_text(plain_text)
+ # This is the gzip magic number and compression algorithm.
+ # And the flags field, which should not include a filename.
+ self.assertStartsWith(compressed,
+ '\037\213' # magic num
+ '\010' # compression method
+ '\0' # flags (empty)
+ '\0\0\0\0' # timestamp (empty)
+ '\002' # presumably XFL (extra flags), 2 = max compression -- confirm against RFC 1952
+ '\377' # presumably the OS byte, 255 = unknown -- confirm against RFC 1952
+ )
+ # Round-trip through the regular reader to prove the output is usable.
+ gz = tuned_gzip.GzipFile(mode='rb', fileobj=StringIO(compressed))
+ self.assertEqual(plain_text, gz.read())
+
+ # NOTE(review): the empty string is the only input exercised in this hunk,
+ # so nothing here checks that non-empty text survives the round trip.
+ def test_compress_empty_string(self):
+ self.assertCompressed('')
+
+
+
+class TestDecompressGzipHunk(tests.TestCase):
+ """Tests for tuned_gzip.decompress_gzip_hunk()."""
+
+ def assertDecompress(self, expected, gzip_hunk):
+ """Check that tuned_gzip.decompress_gzip_hunk() returns valid text.
+
+ :param expected: The plain str the hunk should decompress to.
+ :param gzip_hunk: A str holding the gzip-compressed bytes to decode.
+ """
+ plain_text = tuned_gzip.decompress_gzip_hunk(gzip_hunk)
+ self.assertEqual(expected, plain_text)
+
+ def test_decompress_empty_string(self):
+ # Hand-written gzip stream expected to decode to ''. The first literal
+ # is the 10-byte header; the second is presumably an empty deflate
+ # stream followed by zero CRC32 and zero length -- confirm against
+ # RFC 1952.
+ self.assertDecompress('', '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff'
+ '\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00')
=== modified file 'bzrlib/tuned_gzip.py'
--- a/bzrlib/tuned_gzip.py 2006-10-11 23:08:27 +0000
+++ b/bzrlib/tuned_gzip.py 2007-05-09 01:31:52 +0000
@@ -330,3 +330,39 @@
self.write(''.join(lines))
+def compress_text(text):
+ """Take a string and return the gzip compressed text.
+
+ This is the same as doing::
+
+ tmp = StringIO()
+ gz = GzipFile(mode='wb', fileobj=tmp)
+ gz.write(text)
+ gz.close()
+ out = tmp.getvalue()
+
+ The intent is for this to be optimized to create fewer intermediate
+ objects; for now it is implemented exactly as shown above.
+
+ :param text: A Python str containing text to be compressed. This should
+ not be a unicode string.
+ :return: A Python str containing the compressed text.
+ """
+ tmp = StringIO()
+ gz = GzipFile(mode='wb', fileobj=tmp)
+ # Feed the payload to the compressor. Without this call the result is
+ # always the gzip encoding of the empty string, whatever 'text' holds.
+ gz.write(text)
+ gz.flush()
+ # close() finishes the gzip member, so it has to run before the buffer
+ # is read back.
+ gz.close()
+ return tmp.getvalue()
+
+
+def decompress_gzip_hunk(hunk):
+ """Take a gzip compressed string, and return the decompressed string.
+
+ This is essentially the same as doing::
+
+ out = GzipFile(mode='rb', fileobj=StringIO(hunk)).read()
+
+ NOTE(review): the body currently is exactly that expression, so the
+ intermediate StringIO and GzipFile objects are still created. Avoiding
+ them is the intended optimization, not something done yet.
+
+ :param hunk: A Python str containing a gzipped section.
+ :return: A Python str containing the uncompressed text.
+ """
+ return GzipFile(mode='rb', fileobj=StringIO(hunk)).read()
More information about the bazaar-commits
mailing list