Rev 3383: get_record_stream stacking for delta access. in http://people.ubuntu.com/~robertc/baz2.0/shallow-branch

Robert Collins robertc at robertcollins.net
Mon Jun 23 01:19:33 BST 2008


At http://people.ubuntu.com/~robertc/baz2.0/shallow-branch

------------------------------------------------------------
revno: 3383
revision-id: robertc at robertcollins.net-20080623001926-8vos9qr0xgbm74wu
parent: robertc at robertcollins.net-20080619215406-rmavu3lj4q2d6ryf
committer: Robert Collins <robertc at robertcollins.net>
branch nick: stacking-knits
timestamp: Mon 2008-06-23 10:19:26 +1000
message:
  get_record_stream stacking for delta access.
modified:
  bzrlib/knit.py                 knit.py-20051212171256-f056ac8f0fbe1bd9
  bzrlib/tests/test_knit.py      test_knit.py-20051212171302-95d4c00dd5f11f2b
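The core mechanism in the patch below is a prioritised search over this knit's
own index plus its fallback versioned files. The following standalone sketch
mirrors the _get_parent_map helper introduced in knit.py; the names
search_parent_maps and sources are illustrative, not bzrlib API:

    # Illustrative sketch of the fallback search _get_parent_map performs
    # in the patch below. `search_parent_maps` is a hypothetical name; each
    # source only needs a dict-returning get_parent_map(keys) method.
    def search_parent_maps(keys, sources):
        """Ask each source in priority order for the parents of keys.

        Returns (combined, per_source): combined maps every key that was
        found to its parents; per_source keeps one map per source, so a
        caller can tell which source answered which key.
        """
        combined = {}
        per_source = []
        missing = set(keys)
        for source in sources:
            if not missing:
                # Every key is answered; skip the remaining fallbacks.
                break
            found = source.get_parent_map(missing)
            per_source.append(found)
            combined.update(found)
            missing.difference_update(found)
        return combined, per_source
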
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py	2008-06-19 21:54:06 +0000
+++ b/bzrlib/knit.py	2008-06-23 00:19:26 +0000
@@ -1025,16 +1025,30 @@
         :return: A mapping from keys to parents. Absent keys are absent from
             the mapping.
         """
+        return self._get_parent_map(keys)[0]
+
+    def _get_parent_map(self, keys):
+        """Get a map of the parents of keys.
+
+        :param keys: The keys to look up parents for.
+        :return: A tuple. The first element is a mapping from keys to parents.
+            Absent keys are absent from the mapping. The second element is a
+            list of parent maps, one per source searched: the first entry is
+            the in-this-knit parents, the second is from the first fallback
+            source, and so on.
+        """
         result = {}
         sources = [self._index] + self._fallback_vfs
+        source_results = []
         missing = set(keys)
         for source in sources:
             if not missing:
                 break
             new_result = source.get_parent_map(missing)
+            source_results.append(new_result)
             result.update(new_result)
             missing.difference_update(set(new_result))
-        return result
+        return result, source_results
 
     def _get_record_map(self, keys, allow_missing=False):
         """Produce a dictionary of knit records.
@@ -1079,6 +1093,8 @@
         """
         # keys might be a generator
         keys = set(keys)
+        if not keys:
+            return
         if not self._index.has_graph:
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
@@ -1092,6 +1108,7 @@
         # There may be more absent keys : if we're missing the basis component
         # and are trying to include the delta closure.
         if include_delta_closure:
+            needed_from_fallback = set()
             # key:True means key can be reconstructed
             checked_keys = {}
             for key in keys:
@@ -1099,7 +1116,7 @@
                 try:
                     chain = [key, positions[key][2]]
                 except KeyError:
-                    absent_keys.add(key)
+                    needed_from_fallback.add(key)
                     continue
                 result = True
                 while chain[-1] is not None:
@@ -1116,33 +1133,64 @@
                 for chain_key in chain[:-1]:
                     checked_keys[chain_key] = result
                 if not result:
-                    absent_keys.add(key)
+                    needed_from_fallback.add(key)
+        # Double index lookups here: need a unified API?
+        global_map, parent_maps = self._get_parent_map(keys)
+        if ordering == 'topological':
+            # Global topological sort
+            present_keys = topo_sort(global_map)
+            # Now group by source:
+            source_keys = []
+            current_source = None
+            for key in present_keys:
+                for parent_map in parent_maps:
+                    if key in parent_map:
+                        key_source = parent_map
+                        break
+                if current_source is not key_source:
+                    source_keys.append((key_source, []))
+                    current_source = key_source
+                source_keys[-1][1].append(key)
+        else:
+            # Just group by source; remote sources first.
+            present_keys = []
+            source_keys = []
+            for parent_map in reversed(parent_maps):
+                source_keys.append((parent_map, []))
+                for key in parent_map:
+                    present_keys.append(key)
+                    source_keys[-1][1].append(key)
+        absent_keys = keys - set(global_map)
         for key in absent_keys:
             yield AbsentContentFactory(key)
         # restrict our view to the keys we can answer.
-        keys = keys - absent_keys
-        # Double index lookups here : need a unified api ?
-        parent_map = self.get_parent_map(keys)
-        if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
-        else:
-            present_keys = keys
+        our_keys = parent_maps[0]
+        # keys - needed_from_fallback
+        # keys = keys - absent_keys
         # XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
-        # XXX: At that point we need to consider double reads by utilising
-        # components multiple times.
+        # XXX: At that point we need to consider the impact of double reads by
+        # utilising components multiple times.
         if include_delta_closure:
             # XXX: get_content_maps performs its own index queries; allow state
             # to be passed in.
             text_map, _ = self._get_content_maps(present_keys)
             for key in present_keys:
-                yield FulltextContentFactory(key, parent_map[key], None,
+                yield FulltextContentFactory(key, global_map[key], None,
                     ''.join(text_map[key]))
         else:
-            records = [(key, positions[key][1]) for key in present_keys]
-            for key, raw_data, sha1 in self._read_records_iter_raw(records):
-                (record_details, index_memo, _) = positions[key]
-                yield KnitContentFactory(key, parent_map[key],
-                    record_details, sha1, raw_data, self._factory.annotated, None)
+            for source, keys in source_keys:
+                if source is parent_maps[0]:
+                    # this KnitVersionedFiles
+                    records = [(key, positions[key][1]) for key in keys]
+                    for key, raw_data, sha1 in self._read_records_iter_raw(records):
+                        (record_details, index_memo, _) = positions[key]
+                        yield KnitContentFactory(key, global_map[key],
+                            record_details, sha1, raw_data, self._factory.annotated, None)
+                else:
+                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
+                    for record in vf.get_record_stream(keys, ordering,
+                        include_delta_closure):
+                        yield record
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""

=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py	2008-06-19 21:54:06 +0000
+++ b/bzrlib/tests/test_knit.py	2008-06-23 00:19:26 +0000
@@ -59,6 +59,7 @@
 from bzrlib.transport.memory import MemoryTransport
 from bzrlib.tuned_gzip import GzipFile
 from bzrlib.versionedfile import (
+    AbsentContentFactory,
     ConstantMapper,
     RecordingVersionedFilesDecorator,
     )
@@ -1423,8 +1424,87 @@
         self.assertEqual([("get_parent_map", set([key_basis, key_missing]))],
             basis.calls)
 
-    def test_get_record_stream(self):
-        pass
+    def test_get_record_stream_unordered_deltas(self):
+        # records from the test knit are answered without asking the basis:
+        basis, test = self.get_basis_and_test_knit()
+        key = ('foo',)
+        key_basis = ('bar',)
+        key_missing = ('missing',)
+        test.add_lines(key, (), ['foo\n'])
+        records = list(test.get_record_stream([key], 'unordered', False))
+        self.assertEqual(1, len(records))
+        self.assertEqual([], basis.calls)
+        # Missing (from test knit) objects are retrieved from the basis:
+        basis.add_lines(key_basis, (), ['foo\n', 'bar\n'])
+        basis.calls = []
+        records = list(test.get_record_stream([key_basis, key_missing],
+            'unordered', False))
+        self.assertEqual(2, len(records))
+        calls = list(basis.calls)
+        for record in records:
+            self.assertSubset([record.key], (key_basis, key_missing))
+            if record.key == key_missing:
+                self.assertIsInstance(record, AbsentContentFactory)
+            else:
+                reference = list(basis.get_record_stream([key_basis],
+                    'unordered', False))[0]
+                self.assertEqual(reference.key, record.key)
+                self.assertEqual(reference.sha1, record.sha1)
+                self.assertEqual(reference.storage_kind, record.storage_kind)
+                self.assertEqual(reference.get_bytes_as(reference.storage_kind),
+                    record.get_bytes_as(record.storage_kind))
+        # It's not strictly minimal, but it seems reasonable for now for it to
+        # ask which fallbacks have which parents.
+        self.assertEqual([
+            ("get_parent_map", set([key_basis, key_missing])),
+            ("get_record_stream", [key_basis], 'unordered', False)],
+            calls)
+
+    def test_get_record_stream_ordered_deltas(self):
+        # ordering is preserved down into the fallback store.
+        basis, test = self.get_basis_and_test_knit()
+        key = ('foo',)
+        key_basis = ('bar',)
+        key_basis_2 = ('quux',)
+        key_missing = ('missing',)
+        test.add_lines(key, (key_basis,), ['foo\n'])
+        # Missing (from test knit) objects are retrieved from the basis:
+        basis.add_lines(key_basis, (key_basis_2,), ['foo\n', 'bar\n'])
+        basis.add_lines(key_basis_2, (), ['quux\n'])
+        basis.calls = []
+        # ask for the keys in non-topological order
+        records = list(test.get_record_stream(
+            [key, key_basis, key_missing, key_basis_2], 'topological', False))
+        self.assertEqual(4, len(records))
+        results = []
+        for record in records:
+            self.assertSubset([record.key],
+                (key_basis, key_missing, key_basis_2, key))
+            if record.key == key_missing:
+                self.assertIsInstance(record, AbsentContentFactory)
+            else:
+                results.append((record.key, record.sha1, record.storage_kind,
+                    record.get_bytes_as(record.storage_kind)))
+        calls = list(basis.calls)
+        order = [record[0] for record in results]
+        self.assertEqual([key_basis_2, key_basis, key], order)
+        for result in results:
+            if result[0] == key:
+                source = test
+            else:
+                source = basis
+            record = source.get_record_stream([result[0]], 'unordered',
+                False).next()
+            self.assertEqual(record.key, result[0])
+            self.assertEqual(record.sha1, result[1])
+            self.assertEqual(record.storage_kind, result[2])
+            self.assertEqual(record.get_bytes_as(record.storage_kind), result[3])
+        # It's not strictly minimal, but it seems reasonable for now for it to
+        # ask which fallbacks have which parents.
+        self.assertEqual([
+            ("get_parent_map", set([key_basis, key_basis_2, key_missing])),
+            ("get_record_stream", [key_basis_2, key_basis], 'topological', False)],
+            calls)
 
     def test_get_sha1s(self):
         # sha1's in the test knit are answered without asking the basis
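The new tests pin down the consumer-visible contract: present keys arrive as
content factories in the requested order, absent keys surface as
AbsentContentFactory records (storage_kind 'absent'), and the fallback sees
one batched request. A hedged usage sketch, assuming a stacked test knit over
a basis knit as built by the fixture's get_basis_and_test_knit:

    # Usage sketch of the stacked stream the tests above exercise.
    # `test`, `key`, `key_basis` and `key_missing` are as set up in
    # test_get_record_stream_ordered_deltas.
    for record in test.get_record_stream(
            [key, key_basis, key_missing], 'topological', False):
        if record.storage_kind == 'absent':
            # key_missing: present in neither the test knit nor the basis.
            print 'absent:', record.key
        else:
            # key_basis streams from the fallback, key from this knit,
            # in a single topologically sorted sequence.
            print record.key, record.get_bytes_as(record.storage_kind)
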



