Rev 42: Play around with detecting compression breaks. in http://bazaar.launchpad.net/%7Ebzr/bzr-groupcompress/internal_index

John Arbash Meinel john at arbash-meinel.com
Thu Mar 5 03:30:31 GMT 2009


At http://bazaar.launchpad.net/%7Ebzr/bzr-groupcompress/internal_index

------------------------------------------------------------
revno: 42
revision-id: john at arbash-meinel.com-20090305032949-ffww56phklv1vhbj
parent: john at arbash-meinel.com-20090304223243-xrg48jyhczvpkjxc
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: internal_index
timestamp: Wed 2009-03-04 21:29:49 -0600
message:
  Play around with detecting compression breaks.
  Trying to get tricky with whether the last insert was a fulltext or delta
  did not pay off well (yet).
  However, using similar logic actually shows some of the best results yet.
  The main difference is probably that we now detect overflow and roll back.
  In the past, if we got a big fulltext that pushed us over the line, we
  would leave it alone (poorly compressed in the last group) and start a
  new group, which would start off with a new fulltext.
  Now the overflowing text is popped from the old group and re-added as the
  first fulltext of the new group.
-------------- next part --------------
=== modified file 'groupcompress.py'
--- a/groupcompress.py	2009-03-04 22:32:43 +0000
+++ b/groupcompress.py	2009-03-05 03:29:49 +0000
@@ -254,6 +254,13 @@
         z_bytes = []
         z_bytes.append(c.compress(bytes))
         del bytes
+        # TODO: we may want to have the header compressed in the same chain
+        #       as the data, or we may not, evaluate it
+        #       having them compressed together is probably a win for
+        #       revisions and the 'inv' portion of chk inventories. As the
+        #       label in the header is duplicated in the text.
+        #       For chk pages and real bytes, I would guess this is not
+        #       true.
         z_bytes.append(c.flush(zlib.Z_SYNC_FLUSH))
         z_len = sum(map(len, z_bytes))
         c_len = len(content)
@@ -295,7 +302,9 @@
         self.lines = []
         self.endpoint = 0
         self.input_bytes = 0
+        self.num_keys = 0
         self.labels_deltas = {}
+        self._last = None
         self._delta_index = _groupcompress_pyx.DeltaIndex()
         self._block = GroupCompressBlock()
 
@@ -338,21 +347,22 @@
         max_delta_size = len(bytes) / 2
         delta = self._delta_index.make_delta(bytes, max_delta_size)
         if (delta is None):
-            # We can't delta (perhaps source_text is empty)
-            # so mark this as an insert
-            self._block.add_entry(key, type='fulltext', sha1=sha1,
-                                  start=self.endpoint, length=len(bytes))
+            type = 'fulltext'
+            length = len(bytes)
             self._delta_index.add_source(bytes, 0)
             new_chunks = [bytes]
         else:
-            self._block.add_entry(key, type='delta', sha1=sha1,
-                                  start=self.endpoint, length=len(delta))
+            type = 'delta'
+            length = len(delta)
             new_chunks = [delta]
             if _FAST:
                 self._delta_index._source_offset += len(delta)
             else:
                 self._delta_index.add_delta_source(delta, 0)
+        self._block.add_entry(key, type=type, sha1=sha1,
+                              start=self.endpoint, length=length)
         delta_start = (self.endpoint, len(self.lines))
+        self.num_keys += 1
         self.output_chunks(new_chunks)
         self.input_bytes += input_len
         delta_end = (self.endpoint, len(self.lines))
@@ -361,7 +371,7 @@
             raise AssertionError('the delta index is out of sync'
                 'with the output lines %s != %s'
                 % (self._delta_index._source_offset, self.endpoint))
-        return sha1, self.endpoint
+        return sha1, self.endpoint, type, length
 
     def extract(self, key):
         """Extract a key previously added to the compressor.
@@ -389,11 +399,23 @@
 
         :param new_chunks: The chunks to output.
         """
+        self._last = (len(self.lines), self.endpoint)
         endpoint = self.endpoint
         self.lines.extend(new_chunks)
         endpoint += sum(map(len, new_chunks))
         self.endpoint = endpoint
 
+    def pop_last(self):
+        """Call this if you want to 'revoke' the last compression.
+
+        After this, the data structures will be rolled back, but you cannot do
+        more compression.
+        """
+        self._delta_index = None
+        del self.lines[self._last[0]:]
+        self.endpoint = self._last[1]
+        self._last = None
+
     def ratio(self):
         """Return the overall compression ratio."""
         return float(self.input_bytes) / float(self.endpoint)
@@ -728,15 +750,7 @@
         self._unadded_refs = {}
         keys_to_add = []
         basis_end = 0
-        groups = 1
         def flush():
-            # TODO: we may want to have the header compressed in the same chain
-            #       as the data, or we may not, evaulate it
-            #       having them compressed together is probably a win for
-            #       revisions and the 'inv' portion of chk inventories. As the
-            #       label in the header is duplicated in the text.
-            #       For chk pages and real bytes, I would guess this is not
-            #       true.
             bytes = self._compressor._block.to_bytes(
                 ''.join(self._compressor.lines))
             index, start, length = self._access.add_raw_records(
@@ -745,7 +759,13 @@
             for key, reads, refs in keys_to_add:
                 nodes.append((key, "%d %d %s" % (start, length, reads), refs))
             self._index.add_records(nodes, random_id=random_id)
+            self._unadded_refs = {}
+            del keys_to_add[:]
+            self._compressor = GroupCompressor(self._delta)
+
         last_prefix = None
+        last_fulltext_len = None
+        max_fulltext_len = 0
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
@@ -756,21 +776,59 @@
                 adapter_key = record.storage_kind, 'fulltext'
                 adapter = get_adapter(adapter_key)
                 bytes = adapter.get_bytes(record)
-            soft = False
             if len(record.key) > 1:
                 prefix = record.key[0]
-                if (last_prefix is not None and prefix != last_prefix):
-                    soft = True
-                    if basis_end > 1024 * 1024 * 2:
-                        flush()
-                        self._compressor = GroupCompressor(self._delta)
-                        self._unadded_refs = {}
-                        keys_to_add = []
-                        basis_end = 0
-                        groups += 1
-                last_prefix = prefix
-            found_sha1, end_point = self._compressor.compress(record.key,
-                bytes, record.sha1, soft=soft)
+            else:
+                prefix = None
+            max_fulltext_len = max(max_fulltext_len, len(bytes))
+            (found_sha1, end_point, type,
+             length) = self._compressor.compress(record.key,
+                bytes, record.sha1)
+            # Check if we want to continue to include that text
+            start_new_block = False
+            if end_point > 2 * max_fulltext_len:
+                if end_point > 4*1024*1024:
+                    start_new_block = True
+                elif (prefix is not None and prefix != last_prefix
+                      and end_point > 2*1024*1024):
+                    start_new_block = True
+            # if type == 'fulltext':
+            #     # If this is the first text, we don't do anything
+            #     if self._compressor.num_keys > 1:
+            #         if prefix is not None and prefix != last_prefix:
+            #             # We just inserted a fulltext for a different prefix
+            #             # (aka file-id).
+            #             if end_point > 512 * 1024:
+            #                 start_new_block = True
+            #             # TODO: Consider packing several small texts together
+            #             #       maybe only flush if end_point > some threshold
+            #             # if end_point > 512 * 1024 or len(bytes) <
+            #             #     start_new_block = true
+            #         else:
+            #             # We just added a fulltext, part of the same file-id
+            #             if (end_point > 2*1024*1024
+            #                 and end_point > 5*max_fulltext_len):
+            #                 start_new_block = True
+            #     last_fulltext_len = len(bytes)
+            # else:
+            #     delta_ratio = float(len(bytes)) / length
+            #     if delta_ratio < 3: # Not much compression
+            #         if end_point > 1*1024*1024:
+            #             start_new_block = True
+            #     elif delta_ratio < 10: # 10:1 compression
+            #         if end_point > 4*1024*1024:
+            #             start_new_block = True
+            last_prefix = prefix
+            if start_new_block:
+                self._compressor.pop_last()
+                flush()
+                basis_end = 0
+                max_fulltext_len = len(bytes)
+                (found_sha1, end_point, type,
+                 length) = self._compressor.compress(record.key,
+                    bytes, record.sha1)
+                assert type == 'fulltext'
+                last_fulltext_len = length
             if record.key[-1] is None:
                 key = record.key[:-1] + ('sha1:' + found_sha1,)
             else:
@@ -780,27 +838,10 @@
             keys_to_add.append((key, '%d %d' % (basis_end, end_point),
                 (record.parents,)))
             basis_end = end_point
-            # Interestingly, the sweet spot is 4MB, at 8 and 2 MB the total
-            # size increases... we need a better way of deciding than just
-            # random testing against a given dataset.
-            #   2MB     10.3MB
-            #   3MB      8.4MB
-            #   4MB      8.6MB
-            #   8MB     10.0MB
-            # This effects file content more than other bits, because they
-            # don't get *large* enough to overflow here. (The total compressed
-            # inventory size is only 1.5MB on my test set.)
-            if basis_end > 1024 * 1024 * 4:
-                flush()
-                self._compressor = GroupCompressor(self._delta)
-                self._unadded_refs = {}
-                keys_to_add = []
-                basis_end = 0
-                groups += 1
         if len(keys_to_add):
             flush()
         self._compressor = None
-        self._unadded_refs = {}
+        assert self._unadded_refs == {}
 
     def iter_lines_added_or_present_in_keys(self, keys, pb=None):
         """Iterate over the lines in the versioned files from keys.

=== modified file 'tests/test_groupcompress.py'
--- a/tests/test_groupcompress.py	2009-03-04 21:42:11 +0000
+++ b/tests/test_groupcompress.py	2009-03-05 03:29:49 +0000
@@ -59,7 +59,7 @@
     def test_one_nosha_delta(self):
         # diff against NUKK
         compressor = groupcompress.GroupCompressor(True)
-        sha1, end_point = compressor.compress(('label',),
+        sha1, end_point, _, _ = compressor.compress(('label',),
             'strange\ncommon\n', None)
         self.assertEqual(sha_string('strange\ncommon\n'), sha1)
         expected_lines = [
@@ -85,10 +85,10 @@
 
     def test_two_nosha_delta(self):
         compressor = groupcompress.GroupCompressor(True)
-        sha1_1, _ = compressor.compress(('label',),
+        sha1_1, _, _, _ = compressor.compress(('label',),
             'strange\ncommon long line\nthat needs a 16 byte match\n', None)
         expected_lines = list(compressor.lines)
-        sha1_2, end_point = compressor.compress(('newlabel',),
+        sha1_2, end_point, _, _ = compressor.compress(('newlabel',),
             'common long line\nthat needs a 16 byte match\ndifferent\n', None)
         self.assertEqual(sha_string('common long line\n'
                                     'that needs a 16 byte match\n'
@@ -108,12 +108,12 @@
         # The first interesting test: make a change that should use lines from
         # both parents.
         compressor = groupcompress.GroupCompressor(True)
-        sha1_1, end_point = compressor.compress(('label',),
+        sha1_1, end_point, _, _ = compressor.compress(('label',),
             'strange\ncommon very very long line\nwith some extra text\n', None)
-        sha1_2, _ = compressor.compress(('newlabel',),
+        sha1_2, _, _, _ = compressor.compress(('newlabel',),
             'different\nmoredifferent\nand then some more\n', None)
         expected_lines = list(compressor.lines)
-        sha1_3, end_point = compressor.compress(('label3',),
+        sha1_3, end_point, _, _ = compressor.compress(('label3',),
             'new\ncommon very very long line\nwith some extra text\n'
             'different\nmoredifferent\nand then some more\n',
             None)
@@ -149,10 +149,10 @@
         # Knit fetching will try to reconstruct texts locally which results in
         # reading something that is in the compressor stream already.
         compressor = groupcompress.GroupCompressor(True)
-        sha1_1, _ = compressor.compress(('label',),
+        sha1_1, _, _, _ = compressor.compress(('label',),
             'strange\ncommon long line\nthat needs a 16 byte match\n', None)
         expected_lines = list(compressor.lines)
-        sha1_2, end_point = compressor.compress(('newlabel',),
+        sha1_2, end_point, _, _ = compressor.compress(('newlabel',),
             'common long line\nthat needs a 16 byte match\ndifferent\n', None)
         # get the first out
         self.assertEqual(('strange\ncommon long line\n'



More information about the bazaar-commits mailing list