Rev 5759: Expose the max_entries_per_source into GroupCompressVersionedFiles in http://bazaar.launchpad.net/~jameinel/bzr/2.4-max-entries-gc-602614

John Arbash Meinel john at arbash-meinel.com
Thu May 12 11:58:43 UTC 2011


At http://bazaar.launchpad.net/~jameinel/bzr/2.4-max-entries-gc-602614

------------------------------------------------------------
revno: 5759
revision-id: john at arbash-meinel.com-20110512115831-q8nzn6owcz3sy3ul
parent: john at arbash-meinel.com-20110512113839-reasdnirl4889ptb
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: 2.4-max-entries-gc-602614
timestamp: Thu 2011-05-12 13:58:31 +0200
message:
  Expose the max_entries_per_source into GroupCompressVersionedFiles
  
  At the moment, we set it via a global config value that gets looked up once
  per VF object, which isn't ideal, but is good enough for now.
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2011-05-09 02:33:16 +0000
+++ b/bzrlib/groupcompress.py	2011-05-12 11:58:31 +0000
@@ -27,6 +27,7 @@
 lazy_import(globals(), """
 from bzrlib import (
     annotate,
+    config,
     debug,
     errors,
     graph as _mod_graph,
@@ -910,7 +911,7 @@
 
 class PythonGroupCompressor(_CommonGroupCompressor):
 
-    def __init__(self):
+    def __init__(self, max_entries_per_source=None):
         """Create a GroupCompressor.
 
         Used only if the pyrex version is not available.
@@ -969,9 +970,10 @@
        left side.
     """
 
-    def __init__(self):
+    def __init__(self, max_entries_per_source=None):
         super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+        self._delta_index = DeltaIndex(
+            max_entries_per_source=max_entries_per_source)
 
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1177,8 +1179,18 @@
 class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
 
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_ENTRIES_PER_SOURCE = 1024 * 1024 / 16
+
     def __init__(self, index, access, delta=True, _unadded_refs=None,
-            _group_cache=None):
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
@@ -1197,6 +1209,7 @@
             _group_cache = LRUSizeCache(max_size=50*1024*1024)
         self._group_cache = _group_cache
         self._immediate_fallback_vfs = []
+        self._max_entries_per_source = None
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
@@ -1628,6 +1641,27 @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass
 
+    def _make_group_compressor(self):
+        if self._max_entries_per_source is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_entries_per_source')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_entries_per_source"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_ENTRIES_PER_SOURCE
+            self._max_entries_per_source = val
+        return GroupCompressor(self._max_entries_per_source)
+
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ -1656,12 +1690,12 @@
                 return adapter
         # This will go up to fulltexts for gc to gc fetching, which isn't
         # ideal.
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)

=== modified file 'bzrlib/tests/test_groupcompress.py'
--- a/bzrlib/tests/test_groupcompress.py	2011-01-10 22:20:12 +0000
+++ b/bzrlib/tests/test_groupcompress.py	2011-05-12 11:58:31 +0000
@@ -20,6 +20,7 @@
 
 from bzrlib import (
     btree_index,
+    config,
     groupcompress,
     errors,
     index as _mod_index,
@@ -770,6 +771,49 @@
         self.assertEqual(0, len(vf._group_cache))
 
 
+class TestGroupCompressConfig(tests.TestCaseWithTransport):
+
+    def make_test_vf(self):
+        t = self.get_transport('.')
+        t.ensure_base()
+        factory = groupcompress.make_pack_factory(graph=True,
+            delta=False, keylength=1, inconsistency_fatal=True)
+        vf = factory(t)
+        self.addCleanup(groupcompress.cleanup_pack_group, vf)
+        return vf
+
+    def test_max_entries_per_source_default(self):
+        vf = self.make_test_vf()
+        gc = vf._make_group_compressor()
+        self.assertEqual(vf._DEFAULT_MAX_ENTRIES_PER_SOURCE,
+                         vf._max_entries_per_source)
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(vf._DEFAULT_MAX_ENTRIES_PER_SOURCE,
+                             gc._delta_index._max_entries_per_source)
+
+    def test_max_entries_per_source_in_config(self):
+        c = config.GlobalConfig()
+        c.set_user_option('bzr.groupcompress.max_entries_per_source', '10000')
+        vf = self.make_test_vf()
+        gc = vf._make_group_compressor()
+        self.assertEqual(10000, vf._max_entries_per_source)
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(10000, gc._delta_index._max_entries_per_source)
+
+    def test_max_entries_per_source_bad_config(self):
+        c = config.GlobalConfig()
+        c.set_user_option('bzr.groupcompress.max_entries_per_source', 'boogah')
+        vf = self.make_test_vf()
+        # TODO: This is triggering a warning, we might want to trap and make
+        #       sure it is readable.
+        gc = vf._make_group_compressor()
+        self.assertEqual(vf._DEFAULT_MAX_ENTRIES_PER_SOURCE,
+                         vf._max_entries_per_source)
+        if isinstance(gc, groupcompress.PyrexGroupCompressor):
+            self.assertEqual(vf._DEFAULT_MAX_ENTRIES_PER_SOURCE,
+                             gc._delta_index._max_entries_per_source)
+
+
 
 class StubGCVF(object):
     def __init__(self, canned_get_blocks=None):



More information about the bazaar-commits mailing list