Rev 3383: get_record_stream stacking for delta access. in http://people.ubuntu.com/~robertc/baz2.0/shallow-branch
Robert Collins
robertc at robertcollins.net
Mon Jun 23 01:19:33 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/shallow-branch
------------------------------------------------------------
revno: 3383
revision-id: robertc at robertcollins.net-20080623001926-8vos9qr0xgbm74wu
parent: robertc at robertcollins.net-20080619215406-rmavu3lj4q2d6ryf
committer: Robert Collins <robertc at robertcollins.net>
branch nick: stacking-knits
timestamp: Mon 2008-06-23 10:19:26 +1000
message:
get_record_stream stacking for delta access.
modified:
bzrlib/knit.py knit.py-20051212171256-f056ac8f0fbe1bd9
bzrlib/tests/test_knit.py test_knit.py-20051212171302-95d4c00dd5f11f2b
=== modified file 'bzrlib/knit.py'
--- a/bzrlib/knit.py 2008-06-19 21:54:06 +0000
+++ b/bzrlib/knit.py 2008-06-23 00:19:26 +0000
@@ -1025,16 +1025,30 @@
:return: A mapping from keys to parents. Absent keys are absent from
the mapping.
"""
+ return self._get_parent_map(keys)[0]
+
+ def _get_parent_map(self, keys):
+ """Get a map of the parents of keys.
+
+ :param keys: The keys to look up parents for.
+ :return: A tuple. The first element is a mapping from keys to parents.
+ Absent keys are absent from the mapping. The second element is a
+ list of parent maps, one per source searched, in order: the first
+ holds the in-this-knit parents, the second those found in the
+ first fallback source, and so on.
+ """
result = {}
sources = [self._index] + self._fallback_vfs
+ source_results = []
missing = set(keys)
for source in sources:
if not missing:
break
new_result = source.get_parent_map(missing)
+ source_results.append(new_result)
result.update(new_result)
missing.difference_update(set(new_result))
- return result
+ return result, source_results
def _get_record_map(self, keys, allow_missing=False):
"""Produce a dictionary of knit records.
@@ -1079,6 +1093,8 @@
"""
# keys might be a generator
keys = set(keys)
+ if not keys:
+ return
if not self._index.has_graph:
# Cannot topological order when no graph has been stored.
ordering = 'unordered'
@@ -1092,6 +1108,7 @@
# There may be more absent keys : if we're missing the basis component
# and are trying to include the delta closure.
if include_delta_closure:
+ needed_from_fallback = set()
# key:True means key can be reconstructed
checked_keys = {}
for key in keys:
@@ -1099,7 +1116,7 @@
try:
chain = [key, positions[key][2]]
except KeyError:
- absent_keys.add(key)
+ needed_from_fallback.add(key)
continue
result = True
while chain[-1] is not None:
@@ -1116,33 +1133,64 @@
for chain_key in chain[:-1]:
checked_keys[chain_key] = result
if not result:
- absent_keys.add(key)
+ needed_from_fallback.add(key)
+ # Double index lookups here: need a unified API?
+ global_map, parent_maps = self._get_parent_map(keys)
+ if ordering == 'topological':
+ # Global topological sort
+ present_keys = topo_sort(global_map)
+ # Now group by source:
+ source_keys = []
+ current_source = None
+ for key in present_keys:
+ for parent_map in parent_maps:
+ if key in parent_map:
+ key_source = parent_map
+ break
+ if current_source is not key_source:
+ source_keys.append((key_source, []))
+ current_source = key_source
+ source_keys[-1][1].append(key)
+ else:
+ # Just group by source; remote sources first.
+ present_keys = []
+ source_keys = []
+ for parent_map in reversed(parent_maps):
+ source_keys.append((parent_map, []))
+ for key in parent_map:
+ present_keys.append(key)
+ source_keys[-1][1].append(key)
+ absent_keys = keys - set(global_map)
for key in absent_keys:
yield AbsentContentFactory(key)
# restrict our view to the keys we can answer.
- keys = keys - absent_keys
- # Double index lookups here : need a unified api ?
- parent_map = self.get_parent_map(keys)
- if ordering == 'topological':
- present_keys = topo_sort(parent_map)
- else:
- present_keys = keys
+ our_keys = parent_maps[0]
+ # keys - needed_from_fallback
+ # keys = keys - absent_keys
# XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
- # XXX: At that point we need to consider double reads by utilising
- # components multiple times.
+ # XXX: At that point we need to consider the impact of double reads by
+ # utilising components multiple times.
if include_delta_closure:
# XXX: get_content_maps performs its own index queries; allow state
# to be passed in.
text_map, _ = self._get_content_maps(present_keys)
for key in present_keys:
- yield FulltextContentFactory(key, parent_map[key], None,
+ yield FulltextContentFactory(key, global_map[key], None,
''.join(text_map[key]))
else:
- records = [(key, positions[key][1]) for key in present_keys]
- for key, raw_data, sha1 in self._read_records_iter_raw(records):
- (record_details, index_memo, _) = positions[key]
- yield KnitContentFactory(key, parent_map[key],
- record_details, sha1, raw_data, self._factory.annotated, None)
+ for source, keys in source_keys:
+ if source is parent_maps[0]:
+ # this KnitVersionedFiles
+ records = [(key, positions[key][1]) for key in keys]
+ for key, raw_data, sha1 in self._read_records_iter_raw(records):
+ (record_details, index_memo, _) = positions[key]
+ yield KnitContentFactory(key, global_map[key],
+ record_details, sha1, raw_data, self._factory.annotated, None)
+ else:
+ vf = self._fallback_vfs[parent_maps.index(source) - 1]
+ for record in vf.get_record_stream(keys, ordering,
+ include_delta_closure):
+ yield record
def get_sha1s(self, keys):
"""See VersionedFiles.get_sha1s()."""
=== modified file 'bzrlib/tests/test_knit.py'
--- a/bzrlib/tests/test_knit.py 2008-06-19 21:54:06 +0000
+++ b/bzrlib/tests/test_knit.py 2008-06-23 00:19:26 +0000
@@ -59,6 +59,7 @@
from bzrlib.transport.memory import MemoryTransport
from bzrlib.tuned_gzip import GzipFile
from bzrlib.versionedfile import (
+ AbsentContentFactory,
ConstantMapper,
RecordingVersionedFilesDecorator,
)
@@ -1423,8 +1424,87 @@
self.assertEqual([("get_parent_map", set([key_basis, key_missing]))],
basis.calls)
- def test_get_record_stream(self):
- pass
+ def test_get_record_stream_unordered_deltas(self):
+ # records from the test knit are answered without asking the basis:
+ basis, test = self.get_basis_and_test_knit()
+ key = ('foo',)
+ key_basis = ('bar',)
+ key_missing = ('missing',)
+ test.add_lines(key, (), ['foo\n'])
+ records = list(test.get_record_stream([key], 'unordered', False))
+ self.assertEqual(1, len(records))
+ self.assertEqual([], basis.calls)
+ # Objects missing from the test knit are retrieved from the basis:
+ basis.add_lines(key_basis, (), ['foo\n', 'bar\n'])
+ basis.calls = []
+ records = list(test.get_record_stream([key_basis, key_missing],
+ 'unordered', False))
+ self.assertEqual(2, len(records))
+ calls = list(basis.calls)
+ for record in records:
+ self.assertSubset([record.key], (key_basis, key_missing))
+ if record.key == key_missing:
+ self.assertIsInstance(record, AbsentContentFactory)
+ else:
+ reference = list(basis.get_record_stream([key_basis],
+ 'unordered', False))[0]
+ self.assertEqual(reference.key, record.key)
+ self.assertEqual(reference.sha1, record.sha1)
+ self.assertEqual(reference.storage_kind, record.storage_kind)
+ self.assertEqual(reference.get_bytes_as(reference.storage_kind),
+ record.get_bytes_as(record.storage_kind))
+ # It's not strictly minimal, but it seems reasonable for now for it to
+ # ask which fallbacks have which parents.
+ self.assertEqual([
+ ("get_parent_map", set([key_basis, key_missing])),
+ ("get_record_stream", [key_basis], 'unordered', False)],
+ calls)
+
+ def test_get_record_stream_ordered_deltas(self):
+ # ordering is preserved down into the fallback store.
+ basis, test = self.get_basis_and_test_knit()
+ key = ('foo',)
+ key_basis = ('bar',)
+ key_basis_2 = ('quux',)
+ key_missing = ('missing',)
+ test.add_lines(key, (key_basis,), ['foo\n'])
+ # Objects missing from the test knit are retrieved from the basis:
+ basis.add_lines(key_basis, (key_basis_2,), ['foo\n', 'bar\n'])
+ basis.add_lines(key_basis_2, (), ['quux\n'])
+ basis.calls = []
+ # ask for the records in non-topological order
+ records = list(test.get_record_stream(
+ [key, key_basis, key_missing, key_basis_2], 'topological', False))
+ self.assertEqual(4, len(records))
+ results = []
+ for record in records:
+ self.assertSubset([record.key],
+ (key_basis, key_missing, key_basis_2, key))
+ if record.key == key_missing:
+ self.assertIsInstance(record, AbsentContentFactory)
+ else:
+ results.append((record.key, record.sha1, record.storage_kind,
+ record.get_bytes_as(record.storage_kind)))
+ calls = list(basis.calls)
+ order = [record[0] for record in results]
+ self.assertEqual([key_basis_2, key_basis, key], order)
+ for result in results:
+ if result[0] == key:
+ source = test
+ else:
+ source = basis
+ record = source.get_record_stream([result[0]], 'unordered',
+ False).next()
+ self.assertEqual(record.key, result[0])
+ self.assertEqual(record.sha1, result[1])
+ self.assertEqual(record.storage_kind, result[2])
+ self.assertEqual(record.get_bytes_as(record.storage_kind), result[3])
+ # It's not strictly minimal, but it seems reasonable for now for it to
+ # ask which fallbacks have which parents.
+ self.assertEqual([
+ ("get_parent_map", set([key_basis, key_basis_2, key_missing])),
+ ("get_record_stream", [key_basis_2, key_basis], 'topological', False)],
+ calls)
def test_get_sha1s(self):
# sha1's in the test knit are answered without asking the basis
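
For the topological case, the change sorts globally and then batches
consecutive keys that live in the same source, so each source is asked
for records in as few get_record_stream calls as possible (the test
above expects exactly one batched call to the basis). A sketch of just
that grouping step, under the same dict-as-source assumption as the
earlier sketch; every key in present_keys is assumed to appear in some
parent map, since absent keys were already yielded:

def group_by_source(present_keys, parent_maps):
    source_keys = []
    current_source = None
    for key in present_keys:
        # Find the first (highest-priority) source holding this key.
        for parent_map in parent_maps:
            if key in parent_map:
                key_source = parent_map
                break
        if current_source is not key_source:
            source_keys.append((key_source, []))
            current_source = key_source
        source_keys[-1][1].append(key)
    return source_keys

local = {('foo',): (('bar',),)}
fallback = {('bar',): (('quux',),), ('quux',): ()}
# A global topological sort puts the basis texts first:
order = [('quux',), ('bar',), ('foo',)]
groups = group_by_source(order, [local, fallback])
# -> [(fallback, [('quux',), ('bar',)]), (local, [('foo',)])]
# i.e. one batched request to the fallback, then one to the local knit.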