Rev 3887: Bring in the CHK inter-differing-serializer fetch code. in http://bzr.arbash-meinel.com/branches/bzr/1.11/differ_serializer

John Arbash Meinel john at arbash-meinel.com
Fri Dec 5 22:19:29 GMT 2008


At http://bzr.arbash-meinel.com/branches/bzr/1.11/differ_serializer

------------------------------------------------------------
revno: 3887
revision-id: john at arbash-meinel.com-20081205221928-kzstz04ngqrxpb12
parent: john at arbash-meinel.com-20081205221847-hs9mh3yuinxq7w29
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: differ_serializer
timestamp: Fri 2008-12-05 16:19:28 -0600
message:
  Bring in the CHK inter-differing-serializer fetch code.
  
  Refactor it into several helper functions, which makes the flow a bit
  clearer and reduces the indentation.
-------------- next part --------------
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py	2008-12-05 17:25:01 +0000
+++ b/bzrlib/repository.py	2008-12-05 22:19:28 +0000
@@ -3179,40 +3179,129 @@
             return False
         return True
 
+    def _fetch_batch(self, revision_ids, basis_id, basis_tree):
+        """Copy one batch of revisions from source to target.
+
+        :param revision_ids: The revisions to copy.
+        :param basis_id: The revision_id of basis_tree.
+        :param basis_tree: A tree that is not in revision_ids which should
+            already exist in the target; the first delta is made against it.
+        :return: (basis_id, basis_tree) of the last revision copied (the inputs
+            unchanged if revision_ids is empty), to seed the next batch.
+        """
+        # Walk through all revisions; get inventory deltas and collect the
+        # text keys they reference. Then copy the texts and insert the
+        # deltas, revisions and signatures.
+        text_keys = set()
+        pending_deltas = []
+        pending_revisions = []
+        for tree in self.source.revision_trees(revision_ids):
+            current_revision_id = tree.get_revision_id()
+            delta = tree.inventory._make_delta(basis_tree.inventory)
+            for old_path, new_path, file_id, entry in delta:
+                if new_path is not None:
+                    if not (new_path or self.target.supports_rich_root()):
+                        # Root entry ('' path) on a non-rich-root target:
+                        # copy no text for it. The entry itself stays in the
+                        # inventory delta, which carries the root pointer.
+                        continue
+                    # TODO: Do we need:
+                    #       "if entry.revision == current_revision_id" ?
+                    text_keys.add((file_id, entry.revision))
+            revision = self.source.get_revision(current_revision_id)
+            pending_deltas.append((basis_id, delta,
+                current_revision_id, revision.parent_ids))
+            pending_revisions.append(revision)
+            basis_id = current_revision_id
+            basis_tree = tree  # the next delta is made against this tree
+        # Copy all referenced file texts in a single record stream
+        from_texts = self.source.texts
+        to_texts = self.target.texts
+        to_texts.insert_record_stream(from_texts.get_record_stream(
+            text_keys, self.target._fetch_order,
+            not self.target._fetch_uses_deltas))
+        # Insert the inventory deltas in the order they were built
+        for delta in pending_deltas:
+            self.target.add_inventory_by_delta(*delta)
+        # Insert signatures (a NoSuchRevision here is ignored) and revisions
+        for revision in pending_revisions:
+            try:
+                signature = self.source.get_signature_text(
+                    revision.revision_id)
+                self.target.add_signature_text(revision.revision_id,
+                    signature)
+            except errors.NoSuchRevision:
+                pass
+            self.target.add_revision(revision.revision_id, revision)
+        return basis_id, basis_tree
+
+    def _fetch_all_revisions(self, revision_ids, pb):
+        """Fetch everything for the list of revisions, in fixed-size batches.
+
+        :param revision_ids: The non-empty list of revisions to fetch. Must
+            be in topological order.
+        :param pb: A progress bar, updated before each batch is copied.
+        :return: None. Each batch is copied inside its own write group.
+        """
+        basis_id, basis_tree = self._get_basis(revision_ids[0])
+        batch_size = 100  # revisions copied per write group
+        for offset in range(0, len(revision_ids), batch_size):
+            self.target.start_write_group()
+            try:
+                pb.update('Transferring revisions', offset,
+                    len(revision_ids))
+                batch = revision_ids[offset:offset+batch_size]
+                basis_id, basis_tree = self._fetch_batch(batch,
+                    basis_id, basis_tree)
+            except:  # abort only this batch's write group, then re-raise
+                self.target.abort_write_group()
+                raise
+            else:
+                self.target.commit_write_group()
+
     @needs_write_lock
     def fetch(self, revision_id=None, pb=None, find_ghosts=False):
         """See InterRepository.fetch()."""
         revision_ids = self.target.search_missing_revision_ids(self.source,
             revision_id, find_ghosts=find_ghosts).get_keys()
+        if not revision_ids:  # nothing missing; later code reads revision_ids[0]
+            return 0, 0
         revision_ids = tsort.topo_sort(
             self.source.get_graph().get_parent_map(revision_ids))
-        def revisions_iterator():
-            rev_ids = list(revision_ids)
-            for offset in xrange(0, len(rev_ids), 100):
-                current_revids = rev_ids[offset:offset+100]
-                revisions = self.source.get_revisions(current_revids)
-                trees = self.source.revision_trees(current_revids)
-                keys = [(r,) for r in current_revids]
-                sig_stream = self.source.signatures.get_record_stream(
-                    keys, 'unordered', True)
-                sigs = {}
-                for record in versionedfile.filter_absent(sig_stream):
-                    sigs[record.key[0]] = record.get_bytes_as('fulltext')
-                for rev, tree in zip(revisions, trees):
-                    yield rev, tree, sigs.get(rev.revision_id, None)
         if pb is None:
             my_pb = ui.ui_factory.nested_progress_bar()
             pb = my_pb
         else:
             my_pb = None
         try:
-            install_revisions(self.target, revisions_iterator(),
-                              len(revision_ids), pb)
+            self._fetch_all_revisions(revision_ids, pb)
         finally:
             if my_pb is not None:
                 my_pb.finished()
         return len(revision_ids), 0
 
+    def _get_basis(self, first_revision_id):
+        """Get a basis revision that exists in the target, plus its tree.
+
+        This assumes that first_revision_id is selected for transmission
+        because all other ancestors are already present. If we can't find an
+        ancestor we fall back to NULL_REVISION since we know that is safe.
+
+        :return: (basis_id, basis_tree) of the first parent, else NULL_REVISION.
+        """
+        first_rev = self.source.get_revision(first_revision_id)
+        try:
+            basis_id = first_rev.parent_ids[0]  # IndexError if no parents
+            # only valid as a basis if the target has it
+            self.target.get_revision(basis_id)
+            # Try to get a basis tree - if it's a ghost it will hit the
+            # NoSuchRevision case.
+            basis_tree = self.source.revision_tree(basis_id)
+        except (IndexError, errors.NoSuchRevision):
+            basis_id = _mod_revision.NULL_REVISION
+            basis_tree = self.source.revision_tree(basis_id)
+        return basis_id, basis_tree
+
 
 class InterOtherToRemote(InterRepository):
     """An InterRepository that simply delegates to the 'real' InterRepository



More information about the bazaar-commits mailing list