Rev 3918: Create a GC => GC fetch code path. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/pack_stream_source

John Arbash Meinel john at arbash-meinel.com
Tue Mar 31 21:33:18 BST 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/pack_stream_source

------------------------------------------------------------
revno: 3918
revision-id: john at arbash-meinel.com-20090331203315-amx3mtucm00rzzbr
parent: john at arbash-meinel.com-20090331170410-sxpic3huylfctxu2
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: pack_stream_source
timestamp: Tue 2009-03-31 15:33:15 -0500
message:
  Create a GC => GC fetch code path.
  
  I would have liked this to be generic pack => pack, but generalizing it sucks,
  and this seems to be good for now.
-------------- next part --------------
=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-03-31 16:20:47 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-03-31 20:33:15 +0000
@@ -222,7 +222,7 @@
             # don't need these sets anymore
             id_roots_set.clear()
             p_id_roots_set.clear()
-        return _filtered_inv_stream()
+        return 'inventories', _filtered_inv_stream()
 
     def _get_chk_streams(self, source_vf, keys, pb=None):
         # We want to stream the keys from 'id_roots', and things they
@@ -582,82 +582,6 @@
             self._obsolete_packs(packs)
 
 
-# XXX: This format is scheduled for termination
-#
-# class GCPackRepository(KnitPackRepository):
-#     """GC customisation of KnitPackRepository."""
-#
-#     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
-#         _serializer):
-#         """Overridden to change pack collection class."""
-#         KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
-#             _commit_builder_class, _serializer)
-#         # and now replace everything it did :)
-#         index_transport = self._transport.clone('indices')
-#         self._pack_collection = GCRepositoryPackCollection(self,
-#             self._transport, index_transport,
-#             self._transport.clone('upload'),
-#             self._transport.clone('packs'),
-#             _format.index_builder_class,
-#             _format.index_class,
-#             use_chk_index=self._format.supports_chks,
-#             )
-#         self.inventories = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
-#                 add_callback=self._pack_collection.inventory_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.inventory_index.data_access)
-#         self.revisions = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
-#                 add_callback=self._pack_collection.revision_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.revision_index.data_access,
-#             delta=False)
-#         self.signatures = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
-#                 add_callback=self._pack_collection.signature_index.add_callback,
-#                 parents=False, is_locked=self.is_locked),
-#             access=self._pack_collection.signature_index.data_access,
-#             delta=False)
-#         self.texts = GroupCompressVersionedFiles(
-#             _GCGraphIndex(self._pack_collection.text_index.combined_index,
-#                 add_callback=self._pack_collection.text_index.add_callback,
-#                 parents=True, is_locked=self.is_locked),
-#             access=self._pack_collection.text_index.data_access)
-#         if _format.supports_chks:
-#             # No graph, no compression:- references from chks are between
-#             # different objects not temporal versions of the same; and without
-#             # some sort of temporal structure knit compression will just fail.
-#             self.chk_bytes = GroupCompressVersionedFiles(
-#                 _GCGraphIndex(self._pack_collection.chk_index.combined_index,
-#                     add_callback=self._pack_collection.chk_index.add_callback,
-#                     parents=False, is_locked=self.is_locked),
-#                 access=self._pack_collection.chk_index.data_access)
-#         else:
-#             self.chk_bytes = None
-#         # True when the repository object is 'write locked' (as opposed to the
-#         # physical lock only taken out around changes to the pack-names list.)
-#         # Another way to represent this would be a decorator around the control
-#         # files object that presents logical locks as physical ones - if this
-#         # gets ugly consider that alternative design. RBC 20071011
-#         self._write_lock_count = 0
-#         self._transaction = None
-#         # for tests
-#         self._reconcile_does_inventory_gc = True
-#         self._reconcile_fixes_text_parents = True
-#         self._reconcile_backsup_inventory = False
-#
-#     def suspend_write_group(self):
-#         raise errors.UnsuspendableWriteGroup(self)
-#
-#     def _resume_write_group(self, tokens):
-#         raise errors.UnsuspendableWriteGroup(self)
-#
-#     def _reconcile_pack(self, collection, packs, extension, revs, pb):
-#         bork
-#         return packer.pack(pb)
-
-
 class GCCHKPackRepository(CHKInventoryRepository):
     """GC customisation of CHKInventoryRepository."""
 
@@ -716,6 +640,14 @@
         self._reconcile_fixes_text_parents = True
         self._reconcile_backsup_inventory = False
 
+    def _get_source(self, to_format):
+        """Return a source for streaming from this repository."""
+        if to_format.__class__ is self._format.__class__:
+            # We must be exactly the same format, otherwise stuff like the chk
+            # page layout might be different
+            return GroupCHKStreamSource(self, to_format)
+        return StreamSource(self, to_format)
+
     def suspend_write_group(self):
         raise errors.UnsuspendableWriteGroup(self)
 
@@ -851,3 +783,106 @@
         if not getattr(target_format, 'supports_tree_reference', False):
             raise errors.BadConversionTarget(
                 'Does not support nested trees', target_format)
+
+
+class GroupCHKStreamSource(repository.StreamSource):
+    """Used when both the source and target repo are GroupCHK repos."""
+
+    def __init__(self, from_repository, to_format):
+        """Create a StreamSource streaming from from_repository."""
+        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
+        self._revision_keys = None
+        self._text_keys = None
+        # These are the 'exclusion' set
+        self._chk_id_roots_no_fetch = None
+        self._chk_p_id_roots_no_fetch = None
+        self._chk_id_roots = None
+        self._chk_p_id_roots = None
+
+    def _get_filtered_inv_stream(self):
+        """Get a stream of inventory texts.
+
+        When this function returns, self._chk_id_roots and self._chk_p_id_roots
+        should be populated.
+        """
+        self._chk_id_roots = []
+        self._chk_p_id_roots = []
+        def _filtered_inv_stream():
+            id_roots_set = set()
+            p_id_roots_set = set()
+            source_vf = self.from_repository.inventories
+            stream = source_vf.get_record_stream(self._revision_keys,
+                                                 'groupcompress', True)
+            for record in stream:
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+                                                             record.key)
+                key = chk_inv.id_to_entry.key()
+                if key not in id_roots_set:
+                    self._chk_id_roots.append(key)
+                    id_roots_set.add(key)
+                p_id_map = chk_inv.parent_id_basename_to_file_id
+                if p_id_map is None:
+                    raise AssertionError('Parent id -> file_id map not set')
+                key = p_id_map.key()
+                if key not in p_id_roots_set:
+                    p_id_roots_set.add(key)
+                    self._chk_p_id_roots.append(key)
+                yield record
+            # We have finished processing all of the inventory records, we
+            # don't need these sets anymore
+            id_roots_set.clear()
+            p_id_roots_set.clear()
+        return ('inventories', _filtered_inv_stream())
+
+    def _get_filtered_chk_streams(self, excluded_keys):
+        import pdb; pdb.set_trace()
+        self._text_keys = set()
+        excluded_keys.discard(_mod_revision.NULL_REVISION)
+        if not excluded_keys:
+            uninteresting_root_keys = set()
+            uninteresting_pid_root_keys = set()
+        else:
+            uninteresting_root_keys = set()
+            uninteresting_pid_root_keys = set()
+            for inv in self.from_repository.iter_inventories(excluded_keys):
+                uninteresting_root_keys.add(inv.id_to_entry.key())
+                uninteresting_pid_root_keys.add(
+                    inv.parent_id_basename_to_file_id.key())
+        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
+        chk_bytes = self.from_repository.chk_bytes
+        def _filter_id_to_entry():
+            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+                        self._chk_id_roots, uninteresting_root_keys):
+                for name, bytes in items:
+                    # Note: we don't care about name_utf8, because we are always
+                    # rich-root = True
+                    _, file_id, revision_id = bytes_to_info(bytes)
+                    self._text_keys.add((file_id, revision_id))
+                if record is not None:
+                    yield record
+        yield 'chk_bytes', _filter_id_to_entry()
+        def _get_parent_id_basename_to_file_id_pages():
+            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+                        self._chk_p_id_roots, uninteresting_pid_root_keys):
+                if record is not None:
+                    yield record
+        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
+
+    def _get_text_stream(self):
+        # Note: We know we don't have to handle adding root keys, because both
+        # the source and target are GCCHK, and those always support rich-roots
+        return ('texts', self.from_repository.texts.get_record_stream(
+                            self._text_keys, 'groupcompress', False))
+
+    def get_stream(self, search):
+        revision_ids = search.get_keys()
+        for stream_info in self._fetch_revision_texts(revision_ids):
+            yield stream_info
+        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+        yield self._get_filtered_inv_stream()
+        # Can we get this from the search?
+        _, _, exclude_keys, _ = search.get_recipe()
+        for stream_info in self._get_filtered_chk_streams(exclude_keys):
+            yield stream_info
+        yield self._get_text_stream()

=== modified file 'bzrlib/tests/test_fetch.py'
--- a/bzrlib/tests/test_fetch.py	2009-03-23 14:59:43 +0000
+++ b/bzrlib/tests/test_fetch.py	2009-03-31 20:33:15 +0000
@@ -602,3 +602,30 @@
         self.make_two_commits(change_root=False, fetch_twice=True)
         self.assertEqual((('TREE_ROOT', 'first-id'),),
             self.get_parents('TREE_ROOT', 'second-id'))
+
+
+class TestGCCHKToGCCHKFetch(TestCaseWithTransport):
+
+    def test_simple_fetch(self):
+        builder = self.make_branch_builder('source', format='gc-chk255-big')
+        builder.start_series()
+        try:
+            builder.build_snapshot('A', None, [
+                ('add', ('', 'TREE_ROOT', 'directory', None))])
+            builder.build_snapshot('B', ['A'], [
+                ('add', ('foo', 'foo-id', 'file', 'foo content\n'))])
+            builder.build_snapshot('C', ['B'], [
+                ('add', ('bar', 'bar-id', 'file', 'bar content\n'))])
+        finally:
+            builder.finish_series()
+        source_branch = builder.get_branch()
+
+        target_branch = self.make_branch('target', format='gc-chk255-big')
+        target_branch.lock_write()
+        self.addCleanup(target_branch.unlock)
+        target = target_branch.repository
+        target.fetch(source_branch.repository, 'B')
+        self.assertEqual({'B': ('A',)}, target.get_parent_map(['B', 'C']))
+        target.fetch(source_branch.repository, 'C')
+        self.assertEqual({'B': ('A',), 'C': ('B',)},
+                         target.get_parent_map(['B', 'C']))



More information about the bazaar-commits mailing list