Rev 3918: Create a GC => GC fetch code path. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/pack_stream_source
John Arbash Meinel
john at arbash-meinel.com
Tue Mar 31 21:33:18 BST 2009
At http://bzr.arbash-meinel.com/branches/bzr/brisbane/pack_stream_source
------------------------------------------------------------
revno: 3918
revision-id: john at arbash-meinel.com-20090331203315-amx3mtucm00rzzbr
parent: john at arbash-meinel.com-20090331170410-sxpic3huylfctxu2
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_stream_source
timestamp: Tue 2009-03-31 15:33:15 -0500
message:
Create a GC => GC fetch code path.
I would have liked this to be a generic pack => pack path, but generalizing
it gets ugly, and this seems good enough for now.
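The interesting part of the change is how the stream source gets picked: the
GC-specific path is only taken when the source and target formats match
exactly, with the generic source as the fallback. A minimal sketch of the
dispatch idea (stand-in classes here, not the real bzrlib ones):

class StreamSource(object):
    """Generic fallback: works across formats, but must re-serialize."""

    def __init__(self, from_repository, to_format):
        self.from_repository = from_repository
        self.to_format = to_format


class GroupCHKStreamSource(StreamSource):
    """Fast path: both ends are GroupCHK, so groupcompress blocks and chk
    pages can be streamed without conversion."""


def get_source(repository, to_format):
    # Exact class match only: a different chk page layout (say, page size
    # or search-key function) would make raw pages invalid on the target.
    if to_format.__class__ is repository._format.__class__:
        return GroupCHKStreamSource(repository, to_format)
    return StreamSource(repository, to_format)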
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py 2009-03-31 16:20:47 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py 2009-03-31 20:33:15 +0000
@@ -222,7 +222,7 @@
             # don't need these sets anymore
             id_roots_set.clear()
             p_id_roots_set.clear()
-        return _filtered_inv_stream()
+        return 'inventories', _filtered_inv_stream()
 
     def _get_chk_streams(self, source_vf, keys, pb=None):
         # We want to stream the keys from 'id_roots', and things they
@@ -582,82 +582,6 @@
         self._obsolete_packs(packs)
 
 
-# XXX: This format is scheduled for termination
-#
-# class GCPackRepository(KnitPackRepository):
-#     """GC customisation of KnitPackRepository."""
-#
-#     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
-#         _serializer):
-#         """Overridden to change pack collection class."""
-#         KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
-#             _commit_builder_class, _serializer)
-#         # and now replace everything it did :)
-#         index_transport = self._transport.clone('indices')
-#         self._pack_collection = GCRepositoryPackCollection(self,
-#             self._transport, index_transport,
-#             self._transport.clone('upload'),
-#             self._transport.clone('packs'),
-#             _format.index_builder_class,
-#             _format.index_class,
-#             use_chk_index=self._format.supports_chks,
-#             )
-#         self.inventories = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
-#                 add_callback=self._pack_collection.inventory_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.inventory_index.data_access)
-#         self.revisions = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
-#                 add_callback=self._pack_collection.revision_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.revision_index.data_access,
-#             delta=False)
-#         self.signatures = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
-#                 add_callback=self._pack_collection.signature_index.add_callback,
-#                 parents=False, is_locked=self.is_locked),
-#             access=self._pack_collection.signature_index.data_access,
-#             delta=False)
-#         self.texts = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.text_index.combined_index,
-#                 add_callback=self._pack_collection.text_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.text_index.data_access)
-#         if _format.supports_chks:
-#             # No graph, no compression:- references from chks are between
-#             # different objects not temporal versions of the same; and without
-#             # some sort of temporal structure knit compression will just fail.
-#             self.chk_bytes = GroupCompressVersionedFiles(
-#                 _GCGraphIndex(self._pack_collection.chk_index.combined_index,
-#                     add_callback=self._pack_collection.chk_index.add_callback,
-#                     parents=False, is_locked=self.is_locked),
-#                 access=self._pack_collection.chk_index.data_access)
-#         else:
-#             self.chk_bytes = None
-#         # True when the repository object is 'write locked' (as opposed to the
-#         # physical lock only taken out around changes to the pack-names list.)
-#         # Another way to represent this would be a decorator around the control
-#         # files object that presents logical locks as physical ones - if this
-#         # gets ugly consider that alternative design. RBC 20071011
-#         self._write_lock_count = 0
-#         self._transaction = None
-#         # for tests
-#         self._reconcile_does_inventory_gc = True
-#         self._reconcile_fixes_text_parents = True
-#         self._reconcile_backsup_inventory = False
-#
-#     def suspend_write_group(self):
-#         raise errors.UnsuspendableWriteGroup(self)
-#
-#     def _resume_write_group(self, tokens):
-#         raise errors.UnsuspendableWriteGroup(self)
-#
-#     def _reconcile_pack(self, collection, packs, extension, revs, pb):
-#         bork
-#         return packer.pack(pb)
-
-
 class GCCHKPackRepository(CHKInventoryRepository):
     """GC customisation of CHKInventoryRepository."""
 
@@ -716,6 +640,14 @@
         self._reconcile_fixes_text_parents = True
         self._reconcile_backsup_inventory = False
 
+    def _get_source(self, to_format):
+        """Return a source for streaming from this repository."""
+        if to_format.__class__ is self._format.__class__:
+            # We must be exactly the same format, otherwise stuff like the chk
+            # page layout might be different
+            return GroupCHKStreamSource(self, to_format)
+        return StreamSource(self, to_format)
+
     def suspend_write_group(self):
         raise errors.UnsuspendableWriteGroup(self)
 
@@ -851,3 +783,107 @@
         if not getattr(target_format, 'supports_tree_reference', False):
             raise errors.BadConversionTarget(
                 'Does not support nested trees', target_format)
+
+
+class GroupCHKStreamSource(repository.StreamSource):
+    """Used when both the source and target repo are GroupCHK repos."""
+
+    def __init__(self, from_repository, to_format):
+        """Create a StreamSource streaming from from_repository."""
+        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
+        self._revision_keys = None
+        self._text_keys = None
+        # These two are the 'exclusion' set (chk roots we must not fetch)
+        self._chk_id_roots_no_fetch = None
+        self._chk_p_id_roots_no_fetch = None
+        self._chk_id_roots = None
+        self._chk_p_id_roots = None
+
+    def _get_filtered_inv_stream(self):
+        """Get a stream of inventory texts.
+
+        self._chk_id_roots and self._chk_p_id_roots are populated as the
+        returned stream is consumed.
+        """
+        self._chk_id_roots = []
+        self._chk_p_id_roots = []
+        def _filtered_inv_stream():
+            id_roots_set = set()
+            p_id_roots_set = set()
+            source_vf = self.from_repository.inventories
+            stream = source_vf.get_record_stream(self._revision_keys,
+                'groupcompress', True)
+            for record in stream:
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+                    record.key)
+                key = chk_inv.id_to_entry.key()
+                if key not in id_roots_set:
+                    self._chk_id_roots.append(key)
+                    id_roots_set.add(key)
+                p_id_map = chk_inv.parent_id_basename_to_file_id
+                if p_id_map is None:
+                    raise AssertionError('Parent id -> file_id map not set')
+                key = p_id_map.key()
+                if key not in p_id_roots_set:
+                    p_id_roots_set.add(key)
+                    self._chk_p_id_roots.append(key)
+                yield record
+            # We have finished processing all of the inventory records, we
+            # don't need these sets anymore
+            id_roots_set.clear()
+            p_id_roots_set.clear()
+        return ('inventories', _filtered_inv_stream())
+
+    def _get_filtered_chk_streams(self, excluded_keys):
+        """Generate chk_bytes streams, skipping pages the target has.
+
+        As the id_to_entry pages are streamed, the (file_id, revision_id)
+        text keys they reference are collected into self._text_keys.
+        """
+        self._text_keys = set()
+        excluded_keys.discard(_mod_revision.NULL_REVISION)
+        uninteresting_root_keys = set()
+        uninteresting_pid_root_keys = set()
+        if excluded_keys:
+            for inv in self.from_repository.iter_inventories(excluded_keys):
+                uninteresting_root_keys.add(inv.id_to_entry.key())
+                uninteresting_pid_root_keys.add(
+                    inv.parent_id_basename_to_file_id.key())
+        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
+        chk_bytes = self.from_repository.chk_bytes
+        def _filter_id_to_entry():
+            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+                    self._chk_id_roots, uninteresting_root_keys):
+                for name, bytes in items:
+                    # Note: we don't care about name_utf8, because this
+                    # repository format is always rich-root
+                    _, file_id, revision_id = bytes_to_info(bytes)
+                    self._text_keys.add((file_id, revision_id))
+                if record is not None:
+                    yield record
+        yield 'chk_bytes', _filter_id_to_entry()
+        def _get_parent_id_basename_to_file_id_pages():
+            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+                    self._chk_p_id_roots, uninteresting_pid_root_keys):
+                if record is not None:
+                    yield record
+        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
+
+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target are GCCHK, and those always support rich-roots
+        return ('texts', self.from_repository.texts.get_record_stream(
+            self._text_keys, 'groupcompress', False))
+
+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield stream_info
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream()
+        # Can we get this from the search?
+        _, _, exclude_keys, _ = search.get_recipe()
+        for stream_info in self._get_filtered_chk_streams(exclude_keys):
+            yield stream_info
+        yield self._get_text_stream()
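One subtlety in get_stream() above: the substreams are generators with side
effects. self._chk_id_roots is only filled in while the inventory stream is
being consumed, and self._text_keys only while the id_to_entry chk stream
is, so the sink must fully drain each substream before pulling the next. A
toy illustration of that pattern (not bzrlib code, just the shape of it):

def make_stream():
    roots = []

    def inventories():
        for inv in ['inv-A', 'inv-B']:
            roots.append('root-of-' + inv)  # side effect during iteration
            yield inv

    def chk_pages():
        # Only correct if the inventory substream was fully consumed first.
        for root in roots:
            yield 'pages-under-' + root

    yield 'inventories', inventories()
    yield 'chk_bytes', chk_pages()

for name, substream in make_stream():
    # Draining each substream in order keeps the side effects sequenced.
    print(name, list(substream))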
=== modified file 'bzrlib/tests/test_fetch.py'
--- a/bzrlib/tests/test_fetch.py 2009-03-23 14:59:43 +0000
+++ b/bzrlib/tests/test_fetch.py 2009-03-31 20:33:15 +0000
@@ -602,3 +602,30 @@
         self.make_two_commits(change_root=False, fetch_twice=True)
         self.assertEqual((('TREE_ROOT', 'first-id'),),
                          self.get_parents('TREE_ROOT', 'second-id'))
+
+
+class TestGCCHKToGCCHKFetch(TestCaseWithTransport):
+
+    def test_simple_fetch(self):
+        builder = self.make_branch_builder('source', format='gc-chk255-big')
+        builder.start_series()
+        try:
+            builder.build_snapshot('A', None, [
+                ('add', ('', 'TREE_ROOT', 'directory', None))])
+            builder.build_snapshot('B', ['A'], [
+                ('add', ('foo', 'foo-id', 'file', 'foo content\n'))])
+            builder.build_snapshot('C', ['B'], [
+                ('add', ('bar', 'bar-id', 'file', 'bar content\n'))])
+        finally:
+            builder.finish_series()
+        source_branch = builder.get_branch()
+
+        target_branch = self.make_branch('target', format='gc-chk255-big')
+        target_branch.lock_write()
+        self.addCleanup(target_branch.unlock)
+        target = target_branch.repository
+        target.fetch(source_branch.repository, 'B')
+        self.assertEqual({'B': ('A',)}, target.get_parent_map(['B', 'C']))
+        target.fetch(source_branch.repository, 'C')
+        self.assertEqual({'B': ('A',), 'C': ('B',)},
+                         target.get_parent_map(['B', 'C']))
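Fetching 'B' first and then 'C' is what makes this test interesting: the
second fetch should run _get_filtered_chk_streams() with a non-empty
exclusion set, so chk pages already reachable from B's inventory roots get
skipped. A toy model of that interesting/uninteresting filtering (the real
work happens in chk_map.iter_interesting_nodes; this only sketches the
semantics):

def iter_interesting(children, interesting_roots, uninteresting_roots):
    # Yield every page reachable from interesting_roots that is not also
    # reachable from uninteresting_roots.
    seen = set()

    def walk(key):
        if key in seen:
            return
        seen.add(key)
        yield key
        for child in children.get(key, ()):
            for sub in walk(child):
                yield sub

    # Mark everything under the uninteresting roots as already present...
    for root in uninteresting_roots:
        for _ in walk(root):
            pass
    # ...then stream only the pages not yet seen.
    for root in interesting_roots:
        for key in walk(root):
            yield key

# 'p1' is shared between the B and C trees, so the second fetch would only
# stream the new pages 'root-C' and 'p2'.
children = {'root-B': ['p1'], 'root-C': ['p1', 'p2']}
print(list(iter_interesting(children, ['root-C'], ['root-B'])))
# -> ['root-C', 'p2']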