Rev 3878: Some hacky code to check for empty records and duplicate entries, in http://bzr.arbash-meinel.com/branches/bzr/brisbane/hack3

John Arbash Meinel john at arbash-meinel.com
Thu Mar 19 22:35:56 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/hack3

------------------------------------------------------------
revno: 3878
revision-id: john at arbash-meinel.com-20090319223546-odxel2nktm700d7e
parent: john at arbash-meinel.com-20090319194720-4esxj7gnrmfaykww
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: hack3
timestamp: Thu 2009-03-19 17:35:46 -0500
message:
  Some hacky code to check for:
  a) empty records, like directories and symlinks. This might decrease index pressure
  a tiny bit, as they can be recorded as (0, 0, 0, 0).
  b) duplicate entries in the same gc block, so that we can reuse the existing entry
  rather than inserting another copy of the same delta (or a delta to the identical text).
  
  We seem to get quite a few hits for both (b) and (a), so it is probably worth looking
  closer at them.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-19 18:38:49 +0000
+++ b/bzrlib/groupcompress.py	2009-03-19 22:35:46 +0000
@@ -339,6 +339,8 @@
         :param sha1: TODO (should we validate only when sha1 is supplied?)
         :return: The bytes for the content
         """
+        if start == end == 0:
+            return ''
         # Make sure we have enough bytes for this record
         # TODO: if we didn't want to track the end of this entry, we could
         #       _ensure_content(start+enough_bytes_for_type_and_length), and
@@ -747,12 +749,13 @@
        left side.
     """
 
-    def __init__(self):
+    def __init__(self, check_for_dupes=False):
         """Create a GroupCompressor."""
         # Consider seeding the lines with some sort of GC Start flag, or
         # putting it as part of the output stream, rather than in the
         # compressed bytes.
         self.lines = []
+        self._check_for_dupes = check_for_dupes
         self.endpoint = 0
         self.input_bytes = 0
         self.num_keys = 0
@@ -760,6 +763,8 @@
         self._last = None
         self._delta_index = _groupcompress_pyx.DeltaIndex()
         self._block = GroupCompressBlock()
+        self._entries_by_sha1 = {}
+        self._deduped_entries = 0
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -780,10 +785,14 @@
             the group output so far.
         :seealso VersionedFiles.add_lines:
         """
+        if not bytes: # empty, like a dir entry, etc
+            return None, 0, 'fulltext', 0
         sha1 = None
         # we assume someone knew what they were doing when they passed it in
         if expected_sha is not None:
             sha1 = expected_sha
+        if self._check_for_dupes and sha1 is None:
+            sha1 = osutils.sha_string(bytes)
         if nostore_sha is not None:
             if sha1 is None:
                 sha1 = osutils.sha_string(bytes)
@@ -807,6 +816,13 @@
             raise AssertionError('_source_offset != endpoint'
                 ' somehow the DeltaIndex got out of sync with'
                 ' the output lines')
+        if self._check_for_dupes and sha1 in self._entries_by_sha1:
+            self._deduped_entries += 1
+            (type, start, length) = self._entries_by_sha1[sha1]
+            self._block.add_entry(key, type=type,
+                                  sha1=sha1, start=start,
+                                  length=length)
+            return sha1, self.endpoint, 'sha1_dupe', 0
         max_delta_size = len(bytes) / 2
         delta = self._delta_index.make_delta(bytes, max_delta_size)
         if (delta is None):
@@ -828,6 +844,8 @@
                 self._delta_index.add_delta_source(delta, len_mini_header)
         self._block.add_entry(key, type=type, sha1=sha1,
                               start=self.endpoint, length=length)
+        if self._check_for_dupes:
+            self._entries_by_sha1[sha1] = (type, self.endpoint, length)
         delta_start = (self.endpoint, len(self.lines))
         self.num_keys += 1
         self.output_chunks(new_chunks)
@@ -957,7 +975,8 @@
 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
 
-    def __init__(self, index, access, delta=True):
+    def __init__(self, index, access, delta=True,
+                 check_for_dupes=False):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
@@ -968,6 +987,7 @@
         self._access = access
         self._delta = delta
         self._unadded_refs = {}
+        self._check_for_dupes = check_for_dupes
         self._group_cache = LRUSizeCache(max_size=50*1024*1024)
         self._fallback_vfs = []
 
@@ -1401,11 +1421,17 @@
                 return adapter
         # This will go up to fulltexts for gc to gc fetching, which isn't
         # ideal.
-        self._compressor = GroupCompressor()
+        self._compressor = GroupCompressor(self._check_for_dupes)
         self._unadded_refs = {}
         keys_to_add = []
         basis_end = 0
+        total_deduped = [0]
         def flush():
+            if self._compressor._deduped_entries > 0:
+                total_deduped[0] += self._compressor._deduped_entries
+                trace.note('Dedupped %d out of %d entries',
+                           self._compressor._deduped_entries,
+                           len(self._compressor._block._entries))
             bytes = self._compressor.flush().to_bytes()
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
@@ -1415,7 +1441,7 @@
             self._index.add_records(nodes, random_id=random_id)
             self._unadded_refs = {}
             del keys_to_add[:]
-            self._compressor = GroupCompressor()
+            self._compressor = GroupCompressor(self._check_for_dupes)
 
         last_prefix = None
         last_fulltext_len = None
@@ -1525,12 +1551,17 @@
                 key = record.key
             self._unadded_refs[key] = record.parents
             yield found_sha1
-            keys_to_add.append((key, '%d %d' % (basis_end, end_point),
-                (record.parents,)))
+            if length == 0:
+                keys_to_add.append((key, '0 0', (record.parents,)))
+            else:
+                keys_to_add.append((key, '%d %d' % (basis_end, end_point),
+                    (record.parents,)))
             basis_end = end_point
         if len(keys_to_add):
             flush()
         self._compressor = None
+        if total_deduped[0] > 0:
+            trace.note('Total deduped = %d\n', total_deduped[0])
 
     def iter_lines_added_or_present_in_keys(self, keys, pb=None):
         """Iterate over the lines in the versioned files from keys.

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-03-19 19:47:20 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-03-19 22:35:46 +0000
@@ -358,13 +358,17 @@
                 indices.append(sub_index)
             index = _mod_index.CombinedGraphIndex(indices)
             add_callback = None
+        check_for_dupes = False
+        if index_name == 'text_index':
+            check_for_dupes = True
         vf = GroupCompressVersionedFiles(
             _GCGraphIndex(index,
                           add_callback=add_callback,
                           parents=parents,
                           is_locked=self._pack_collection.repo.is_locked),
             access=access,
-            delta=delta)
+            delta=delta,
+            check_for_dupes=check_for_dupes)
         return vf
 
     def _build_vfs(self, index_name, parents, delta):



More information about the bazaar-commits mailing list