Rev 3887: Bring in the CHK inter-differing-serializer fetch code in http://bzr.arbash-meinel.com/branches/bzr/1.11/differ_serializer
John Arbash Meinel
john at arbash-meinel.com
Fri Dec 5 22:19:29 GMT 2008
At http://bzr.arbash-meinel.com/branches/bzr/1.11/differ_serializer
------------------------------------------------------------
revno: 3887
revision-id: john at arbash-meinel.com-20081205221928-kzstz04ngqrxpb12
parent: john at arbash-meinel.com-20081205221847-hs9mh3yuinxq7w29
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: differ_serializer
timestamp: Fri 2008-12-05 16:19:28 -0600
message:
Bring in the CHK inter-differing-serializer fetch code.
Refactor it into several helper functions, which makes the flow a bit
clearer and reduces the indentation.
-------------- next part --------------
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2008-12-05 17:25:01 +0000
+++ b/bzrlib/repository.py 2008-12-05 22:19:28 +0000
@@ -3179,40 +3179,129 @@
return False
return True
+ def _fetch_batch(self, revision_ids, basis_id, basis_tree):
+ """Fetch across a few revisions.
+
+ :param revision_ids: The revisions to copy
+ :param basis_id: The revision_id of basis_tree
+ :param basis_tree: A tree that is not in revision_ids which should
+ already exist in the target.
+ :return: (basis_id, basis_tree) A new basis to use now that these trees
+ have been copied.
+ """
+ # Walk through all revisions; get inventory deltas, copy referenced
+ # texts that delta references, insert the delta, revision and
+ # signature.
+ text_keys = set()
+ pending_deltas = []
+ pending_revisions = []
+ for tree in self.source.revision_trees(revision_ids):
+ current_revision_id = tree.get_revision_id()
+ delta = tree.inventory._make_delta(basis_tree.inventory)
+ for old_path, new_path, file_id, entry in delta:
+ if new_path is not None:
+ if not (new_path or self.target.supports_rich_root()):
+ # We leave the inventory delta in, because that
+ # will have the deserialised inventory root
+ # pointer.
+ continue
+ # TODO: Do we need:
+ # "if entry.revision == current_revision_id" ?
+ text_keys.add((file_id, entry.revision))
+ revision = self.source.get_revision(current_revision_id)
+ pending_deltas.append((basis_id, delta,
+ current_revision_id, revision.parent_ids))
+ pending_revisions.append(revision)
+ basis_id = current_revision_id
+ basis_tree = tree
+ # Copy file texts
+ from_texts = self.source.texts
+ to_texts = self.target.texts
+ to_texts.insert_record_stream(from_texts.get_record_stream(
+ text_keys, self.target._fetch_order,
+ not self.target._fetch_uses_deltas))
+ # insert deltas
+ for delta in pending_deltas:
+ self.target.add_inventory_by_delta(*delta)
+ # insert signatures and revisions
+ for revision in pending_revisions:
+ try:
+ signature = self.source.get_signature_text(
+ revision.revision_id)
+ self.target.add_signature_text(revision.revision_id,
+ signature)
+ except errors.NoSuchRevision:
+ pass
+ self.target.add_revision(revision.revision_id, revision)
+ return basis_id, basis_tree
+
+ def _fetch_all_revisions(self, revision_ids, pb):
+ """Fetch everything for the list of revisions.
+
+ :param revision_ids: The list of revisions to fetch. Must be in
+ topological order.
+ :param pb: A ProgressBar
+ :return: None
+ """
+ basis_id, basis_tree = self._get_basis(revision_ids[0])
+ batch_size = 100
+ for offset in range(0, len(revision_ids), batch_size):
+ self.target.start_write_group()
+ try:
+ pb.update('Transferring revisions', offset,
+ len(revision_ids))
+ batch = revision_ids[offset:offset+batch_size]
+ basis_id, basis_tree = self._fetch_batch(batch,
+ basis_id, basis_tree)
+ except:
+ self.target.abort_write_group()
+ raise
+ else:
+ self.target.commit_write_group()
+
@needs_write_lock
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""See InterRepository.fetch()."""
revision_ids = self.target.search_missing_revision_ids(self.source,
revision_id, find_ghosts=find_ghosts).get_keys()
+ if not revision_ids:
+ return 0, 0
revision_ids = tsort.topo_sort(
self.source.get_graph().get_parent_map(revision_ids))
- def revisions_iterator():
- rev_ids = list(revision_ids)
- for offset in xrange(0, len(rev_ids), 100):
- current_revids = rev_ids[offset:offset+100]
- revisions = self.source.get_revisions(current_revids)
- trees = self.source.revision_trees(current_revids)
- keys = [(r,) for r in current_revids]
- sig_stream = self.source.signatures.get_record_stream(
- keys, 'unordered', True)
- sigs = {}
- for record in versionedfile.filter_absent(sig_stream):
- sigs[record.key[0]] = record.get_bytes_as('fulltext')
- for rev, tree in zip(revisions, trees):
- yield rev, tree, sigs.get(rev.revision_id, None)
if pb is None:
my_pb = ui.ui_factory.nested_progress_bar()
pb = my_pb
else:
my_pb = None
try:
- install_revisions(self.target, revisions_iterator(),
- len(revision_ids), pb)
+ self._fetch_all_revisions(revision_ids, pb)
finally:
if my_pb is not None:
my_pb.finished()
return len(revision_ids), 0
+ def _get_basis(self, first_revision_id):
+ """Get a revision and tree which exists in the target.
+
+ This assumes that first_revision_id is selected for transmission
+ because all other ancestors are already present. If we can't find an
+ ancestor we fall back to NULL_REVISION since we know that is safe.
+
+ :return: (basis_id, basis_tree)
+ """
+ first_rev = self.source.get_revision(first_revision_id)
+ try:
+ basis_id = first_rev.parent_ids[0]
+ # only valid as a basis if the target has it
+ self.target.get_revision(basis_id)
+ # Try to get a basis tree - if it's a ghost it will hit the
+ # NoSuchRevision case.
+ basis_tree = self.source.revision_tree(basis_id)
+ except (IndexError, errors.NoSuchRevision):
+ basis_id = _mod_revision.NULL_REVISION
+ basis_tree = self.source.revision_tree(basis_id)
+ return basis_id, basis_tree
+
class InterOtherToRemote(InterRepository):
"""An InterRepository that simply delegates to the 'real' InterRepository
More information about the bazaar-commits
mailing list