Rev 4064: Streaming fetch from remote servers. in http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips
Robert Collins
robertc at robertcollins.net
Mon Mar 2 03:38:16 GMT 2009
At http://people.ubuntu.com/~robertc/baz2.0/branch.roundtrips
------------------------------------------------------------
revno: 4064
revision-id: robertc at robertcollins.net-20090302033807-af1b7awmfueyawts
parent: robertc at robertcollins.net-20090227130536-wsqoyhyt3n11nc8d
committer: Robert Collins <robertc at robertcollins.net>
branch nick: branch.roundtrips
timestamp: Mon 2009-03-02 14:38:07 +1100
message:
Streaming fetch from remote servers.
=== modified file 'NEWS'
--- a/NEWS 2009-02-27 02:44:10 +0000
+++ b/NEWS 2009-03-02 03:38:07 +0000
@@ -117,8 +117,9 @@
rather than the sometimes-absent disk label. (Robert Collins)
* ``bzrlib.fetch`` is now composed of a sender and a sink component
- allowing for decoupling over a network connection. Fetching into
- a RemoteRepository uses this to stream the operation.
+ allowing for decoupling over a network connection. Fetching from
+ or into a RemoteRepository with a 1.13 server will use this to
+ stream the operation.
(Andrew Bennetts, Robert Collins)
* ``bzrlib.tests.run_suite`` accepts a runner_class parameter
=== modified file 'bzrlib/fetch.py'
--- a/bzrlib/fetch.py 2009-02-27 13:05:36 +0000
+++ b/bzrlib/fetch.py 2009-03-02 03:38:07 +0000
@@ -40,7 +40,7 @@
from bzrlib.tsort import topo_sort
from bzrlib.trace import mutter
import bzrlib.ui
-from bzrlib.versionedfile import filter_absent, FulltextContentFactory
+from bzrlib.versionedfile import FulltextContentFactory
# TODO: Avoid repeatedly opening weaves so many times.
=== modified file 'bzrlib/pack.py'
--- a/bzrlib/pack.py 2009-01-17 01:30:58 +0000
+++ b/bzrlib/pack.py 2009-03-02 03:38:07 +0000
@@ -411,10 +411,15 @@
self._state_handler()
cur_buffer_length = len(self._buffer)
- def read_pending_records(self):
- records = self._parsed_records
- self._parsed_records = []
- return records
+ def read_pending_records(self, max=None):
+ if max:
+ records = self._parsed_records[:max]
+ del self._parsed_records[:max]
+ return records
+ else:
+ records = self._parsed_records
+ self._parsed_records = []
+ return records
def _consume_line(self):
"""Take a line out of the buffer, and return the line.
=== modified file 'bzrlib/remote.py'
--- a/bzrlib/remote.py 2009-02-27 13:05:36 +0000
+++ b/bzrlib/remote.py 2009-03-02 03:38:07 +0000
@@ -40,11 +40,10 @@
SmartProtocolError,
)
from bzrlib.lockable_files import LockableFiles
-from bzrlib.smart import client, vfs
+from bzrlib.smart import client, vfs, repository as smart_repo
from bzrlib.revision import ensure_null, NULL_REVISION
from bzrlib.trace import mutter, note, warning
from bzrlib.util import bencode
-from bzrlib.versionedfile import record_to_fulltext_bytes
class _RpcHelper(object):
@@ -1434,14 +1433,15 @@
# do not fallback when actually pushing the stream. A cleanup patch
# is going to look at rewinding/restarting the stream/partial
# buffering etc.
- byte_stream = self._stream_to_byte_stream([], src_format)
+ byte_stream = smart_repo._stream_to_byte_stream([], src_format)
try:
response = client.call_with_body_stream(
('Repository.insert_stream', path, ''), byte_stream)
except errors.UnknownSmartMethod:
medium._remember_remote_is_before((1,13))
return self._insert_real(stream, src_format, resume_tokens)
- byte_stream = self._stream_to_byte_stream(stream, src_format)
+ byte_stream = smart_repo._stream_to_byte_stream(
+ stream, src_format)
resume_tokens = ' '.join(resume_tokens)
response = client.call_with_body_stream(
('Repository.insert_stream', path, resume_tokens), byte_stream)
@@ -1459,42 +1459,45 @@
collection.reload_pack_names()
return [], set()
- def _stream_to_byte_stream(self, stream, src_format):
- bytes = []
- pack_writer = pack.ContainerWriter(bytes.append)
- pack_writer.begin()
- pack_writer.add_bytes_record(src_format.network_name(), '')
- adapters = {}
- def get_adapter(adapter_key):
- try:
- return adapters[adapter_key]
- except KeyError:
- adapter_factory = adapter_registry.get(adapter_key)
- adapter = adapter_factory(self)
- adapters[adapter_key] = adapter
- return adapter
- for substream_type, substream in stream:
- for record in substream:
- if record.storage_kind in ('chunked', 'fulltext'):
- serialised = record_to_fulltext_bytes(record)
- else:
- serialised = record.get_bytes_as(record.storage_kind)
- if serialised:
- # Some streams embed the whole stream into the wire
- # representation of the first record, which means that
- # later records have no wire representation: we skip them.
- pack_writer.add_bytes_record(serialised, [(substream_type,)])
- for b in bytes:
- yield b
- del bytes[:]
- pack_writer.end()
- for b in bytes:
- yield b
-
class RemoteStreamSource(repository.StreamSource):
"""Stream data from a remote server."""
+ def get_stream(self, search):
+ # streaming with fallback repositories is not well defined yet: The
+ # remote repository cannot see the fallback repositories, and thus
+ # cannot satisfy the entire search in the general case. Likewise the
+ # fallback repositories cannot reify the search to determine what they
+ # should send. It likely needs a return value in the stream listing the
+ # edge of the search to resume from in fallback repositories.
+ if self.from_repository._fallback_repositories:
+ return repository.StreamSource.get_stream(self, search)
+ repo = self.from_repository
+ client = repo._client
+ medium = client._medium
+ if medium._is_remote_before((1, 13)):
+ # No possible way this can work.
+ return repository.StreamSource.get_stream(self, search)
+ path = repo.bzrdir._path_for_remote_call(client)
+ try:
+ recipe = repo._serialise_search_recipe(search._recipe)
+ response = repo._call_with_body_bytes_expecting_body(
+ 'Repository.StreamSource.get_stream',
+ (path, self.to_format.network_name()), recipe)
+ response_tuple, response_handler = response
+ except errors.UnknownSmartMethod:
+ medium._remember_remote_is_before((1,13))
+ return repository.StreamSource.get_stream(self, search)
+ if response_tuple[0] != 'ok':
+ raise errors.UnexpectedSmartServerResponse(response_tuple)
+ byte_stream = response_handler.read_streamed_body()
+ src_format, stream = smart_repo._byte_stream_to_stream(byte_stream)
+ if src_format.network_name() != repo._format.network_name():
+ raise AssertionError(
+ "Mismatched RemoteRepository and stream src %r, %r" % (
+ src_format.network_name(), repo._format.network_name()))
+ return stream
+
class RemoteBranchLockableFiles(LockableFiles):
"""A 'LockableFiles' implementation that talks to a smart server.
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2009-02-27 13:05:36 +0000
+++ b/bzrlib/repository.py 2009-03-02 03:38:07 +0000
@@ -3675,7 +3675,7 @@
from_sf = self.from_repository.signatures
# A missing signature is just skipped.
keys = [(rev_id,) for rev_id in revs]
- signatures = filter_absent(from_sf.get_record_stream(
+ signatures = versionedfile.filter_absent(from_sf.get_record_stream(
keys,
self.to_format._fetch_order,
not self.to_format._fetch_uses_deltas))
=== modified file 'bzrlib/smart/repository.py'
--- a/bzrlib/smart/repository.py 2009-02-25 00:31:09 +0000
+++ b/bzrlib/smart/repository.py 2009-03-02 03:38:07 +0000
@@ -39,7 +39,7 @@
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
from bzrlib import revision as _mod_revision
from bzrlib.util import bencode
-from bzrlib.versionedfile import NetworkRecordStream
+from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
class SmartServerRepositoryRequest(SmartServerRequest):
@@ -333,6 +333,104 @@
return SuccessfulSmartServerResponse(('ok', token))
+class SmartServerRepositoryStreamSourceGetStream(SmartServerRepositoryRequest):
+
+ def do_repository_request(self, repository, to_network_name):
+ """Get a stream for inserting into a to_format repository.
+
+ :param repository: The repository to stream from.
+ :param to_network_name: The network name of the format of the target
+ repository.
+ """
+ self._to_format = network_format_registry.get(to_network_name)
+ return None # Signal that we want a body.
+
+ def do_body(self, body_bytes):
+ repository = self._repository
+ repository.lock_read()
+ try:
+ search, error = self.recreate_search(repository, body_bytes)
+ if error is not None:
+ repository.unlock()
+ return error
+ search = search.get_result()
+ source = repository._get_source(self._to_format)
+ stream = source.get_stream(search)
+ except Exception:
+ exc_info = sys.exc_info()
+ try:
+ # On non-error, unlocking is done by the body stream handler.
+ repository.unlock()
+ finally:
+ raise exc_info[0], exc_info[1], exc_info[2]
+ return SuccessfulSmartServerResponse(('ok',),
+ body_stream=self.body_stream(stream, repository))
+
+ def body_stream(self, stream, repository):
+ byte_stream = _stream_to_byte_stream(stream, repository._format)
+ try:
+ for bytes in byte_stream:
+ yield bytes
+ except errors.RevisionNotPresent, e:
+ # This shouldn't be able to happen, but as we don't buffer
+ # everything it can in theory happen.
+ repository.unlock()
+ yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
+ else:
+ repository.unlock()
+
+
+def _stream_to_byte_stream(stream, src_format):
+ """Convert a record stream to a self delimited byte stream."""
+ pack_writer = pack.ContainerSerialiser()
+ yield pack_writer.begin()
+ yield pack_writer.bytes_record(src_format.network_name(), '')
+ for substream_type, substream in stream:
+ for record in substream:
+ if record.storage_kind in ('chunked', 'fulltext'):
+ serialised = record_to_fulltext_bytes(record)
+ else:
+ serialised = record.get_bytes_as(record.storage_kind)
+ if serialised:
+ # Some streams embed the whole stream into the wire
+ # representation of the first record, which means that
+ # later records have no wire representation: we skip them.
+ yield pack_writer.bytes_record(serialised, [(substream_type,)])
+ yield pack_writer.end()
+
+
+def _byte_stream_to_stream(byte_stream):
+ """Convert a byte stream into a format and a StreamSource stream.
+
+ :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
+ :return: (RepositoryFormat, stream_generator)
+ """
+ stream_decoder = pack.ContainerPushParser()
+ def record_stream():
+ """Closure to return the substreams."""
+ # May have fully parsed records already.
+ for record in stream_decoder.read_pending_records():
+ record_names, record_bytes = record
+ record_name, = record_names
+ substream_type = record_name[0]
+ substream = NetworkRecordStream([record_bytes])
+ yield substream_type, substream.read()
+ for bytes in byte_stream:
+ stream_decoder.accept_bytes(bytes)
+ for record in stream_decoder.read_pending_records():
+ record_names, record_bytes = record
+ record_name, = record_names
+ substream_type = record_name[0]
+ substream = NetworkRecordStream([record_bytes])
+ yield substream_type, substream.read()
+ for bytes in byte_stream:
+ stream_decoder.accept_bytes(bytes)
+ for record in stream_decoder.read_pending_records(max=1):
+ record_names, src_format_name = record
+ src_format = network_format_registry.get(src_format_name)
+ return src_format, record_stream()
+
+
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
def do_repository_request(self, repository, token):
@@ -415,6 +513,12 @@
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
+ """Insert a record stream from a RemoteSink into a repository.
+
+ This gets bytes pushed to it by the network infrastructure and turns that
+ into a bytes iterator using a thread. That is then processed by
+ _byte_stream_to_stream.
+ """
def do_repository_request(self, repository, resume_tokens):
"""StreamSink.insert_stream for a remote repository."""
@@ -422,44 +526,31 @@
tokens = [token for token in resume_tokens.split(' ') if token]
self.tokens = tokens
self.repository = repository
- self.stream_decoder = pack.ContainerPushParser()
- self.src_format = None
self.queue = Queue.Queue()
- self.insert_thread = None
+ self.insert_thread = threading.Thread(target=self._inserter_thread)
+ self.insert_thread.start()
def do_chunk(self, body_stream_chunk):
- self.stream_decoder.accept_bytes(body_stream_chunk)
- for record in self.stream_decoder.read_pending_records():
- record_names, record_bytes = record
- if self.src_format is None:
- src_format_name = record_bytes
- src_format = network_format_registry.get(src_format_name)
- self.src_format = src_format
- self.insert_thread = threading.Thread(target=self._inserter_thread)
- self.insert_thread.start()
- else:
- record_name, = record_names
- substream_type = record_name[0]
- stream = NetworkRecordStream([record_bytes])
- for record in stream.read():
- self.queue.put((substream_type, [record]))
+ self.queue.put(body_stream_chunk)
def _inserter_thread(self):
try:
+ src_format, stream = _byte_stream_to_stream(
+ self.blocking_byte_stream())
self.insert_result = self.repository._get_sink().insert_stream(
- self.blocking_read_stream(), self.src_format, self.tokens)
+ stream, src_format, self.tokens)
self.insert_ok = True
except:
self.insert_exception = sys.exc_info()
self.insert_ok = False
- def blocking_read_stream(self):
+ def blocking_byte_stream(self):
while True:
- item = self.queue.get()
- if item is StopIteration:
+ bytes = self.queue.get()
+ if bytes is StopIteration:
return
else:
- yield item
+ yield bytes
def do_end(self):
self.queue.put(StopIteration)
=== modified file 'bzrlib/smart/request.py'
--- a/bzrlib/smart/request.py 2009-02-26 04:25:00 +0000
+++ b/bzrlib/smart/request.py 2009-03-02 03:38:07 +0000
@@ -465,6 +465,9 @@
request_handlers.register_lazy(
'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
request_handlers.register_lazy(
+ 'Repository.StreamSource.get_stream', 'bzrlib.smart.repository',
+ 'SmartServerRepositoryStreamSourceGetStream')
+request_handlers.register_lazy(
'Repository.tarball', 'bzrlib.smart.repository',
'SmartServerRepositoryTarball')
request_handlers.register_lazy(
=== modified file 'bzrlib/tests/blackbox/test_branch.py'
--- a/bzrlib/tests/blackbox/test_branch.py 2009-02-27 03:54:39 +0000
+++ b/bzrlib/tests/blackbox/test_branch.py 2009-03-02 03:38:07 +0000
@@ -259,6 +259,22 @@
class TestSmartServerBranching(ExternalBase):
+ def test_branch_from_trivial_branch_to_same_server_branch_acceptance(self):
+ self.setup_smart_server_with_call_log()
+ t = self.make_branch_and_tree('from')
+ for count in range(9):
+ t.commit(message='commit %d' % count)
+ self.reset_smart_call_log()
+ out, err = self.run_bzr(['branch', self.get_url('from'),
+ self.get_url('target')])
+ rpc_count = len(self.hpss_calls)
+ # This figure represents the amount of work to perform this use case. It
+ # is entirely ok to reduce this number if a test fails due to rpc_count
+ # being too low. If rpc_count increases, more network roundtrips have
+ # become necessary for this use case. Please do not adjust this number
+ # upwards without agreement from bzr's network support maintainers.
+ self.assertEqual(99, rpc_count)
+
def test_branch_from_trivial_branch_streaming_acceptance(self):
self.setup_smart_server_with_call_log()
t = self.make_branch_and_tree('from')
@@ -273,7 +289,7 @@
# being too low. If rpc_count increases, more network roundtrips have
# become necessary for this use case. Please do not adjust this number
# upwards without agreement from bzr's network support maintainers.
- self.assertEqual(78, rpc_count)
+ self.assertEqual(25, rpc_count)
class TestRemoteBranch(TestCaseWithSFTPServer):
=== modified file 'bzrlib/tests/branch_implementations/test_branch.py'
--- a/bzrlib/tests/branch_implementations/test_branch.py 2009-02-24 08:09:17 +0000
+++ b/bzrlib/tests/branch_implementations/test_branch.py 2009-03-02 03:38:07 +0000
@@ -109,7 +109,7 @@
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
b2 = self.make_branch('b2')
- self.assertEqual((1, []), b2.fetch(b1))
+ b2.fetch(b1)
rev = b2.repository.get_revision('revision-1')
tree = b2.repository.revision_tree('revision-1')
=== modified file 'bzrlib/tests/bzrdir_implementations/test_bzrdir.py'
--- a/bzrlib/tests/bzrdir_implementations/test_bzrdir.py 2009-02-23 15:29:35 +0000
+++ b/bzrlib/tests/bzrdir_implementations/test_bzrdir.py 2009-03-02 03:38:07 +0000
@@ -553,7 +553,7 @@
# Ensure no format data is cached
a_dir = bzrlib.branch.Branch.open_from_transport(
self.get_transport('source')).bzrdir
- target_transport = a_dir.root_transport.clone('..').clone('target')
+ target_transport = self.get_transport('target')
target_bzrdir = a_dir.clone_on_transport(target_transport)
target_repo = target_bzrdir.open_repository()
source_branch = bzrlib.branch.Branch.open(
@@ -655,7 +655,7 @@
def test_clone_respects_stacked(self):
branch = self.make_branch('parent')
- child_transport = branch.bzrdir.root_transport.clone('../child')
+ child_transport = self.get_transport('child')
child = branch.bzrdir.clone_on_transport(child_transport,
stacked_on=branch.base)
self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base)
=== modified file 'bzrlib/tests/per_repository/test_fetch.py'
--- a/bzrlib/tests/per_repository/test_fetch.py 2009-01-17 01:30:58 +0000
+++ b/bzrlib/tests/per_repository/test_fetch.py 2009-03-02 03:38:07 +0000
@@ -46,7 +46,7 @@
revision_id=None)
## pb=bzrlib.progress.DummyProgress())
- def test_fetch_knit3(self):
+ def test_fetch_to_knit3(self):
# create a repository of the sort we are testing.
tree_a = self.make_branch_and_tree('a')
self.build_tree(['a/foo'])
@@ -80,7 +80,10 @@
try:
tree_b = b_bzrdir.create_workingtree()
except errors.NotLocalUrl:
- raise TestSkipped("cannot make working tree with transport %r"
+ try:
+ tree_b = b_branch.create_checkout('b', lightweight=True)
+ except errors.NotLocalUrl:
+ raise TestSkipped("cannot make working tree with transport %r"
% b_bzrdir.transport)
tree_b.commit('no change', rev_id='rev2')
rev2_tree = knit3_repo.revision_tree('rev2')
=== modified file 'bzrlib/tests/test_smart.py'
--- a/bzrlib/tests/test_smart.py 2009-02-27 01:02:40 +0000
+++ b/bzrlib/tests/test_smart.py 2009-03-02 03:38:07 +0000
@@ -1201,6 +1201,9 @@
smart.request.request_handlers.get('Repository.lock_write'),
smart.repository.SmartServerRepositoryLockWrite)
self.assertEqual(
+ smart.request.request_handlers.get('Repository.StreamSource.get_stream'),
+ smart.repository.SmartServerRepositoryStreamSourceGetStream)
+ self.assertEqual(
smart.request.request_handlers.get('Repository.tarball'),
smart.repository.SmartServerRepositoryTarball)
self.assertEqual(
=== modified file 'bzrlib/versionedfile.py'
--- a/bzrlib/versionedfile.py 2009-02-23 15:29:35 +0000
+++ b/bzrlib/versionedfile.py 2009-03-02 03:38:07 +0000
@@ -1534,19 +1534,19 @@
def fulltext_network_to_record(kind, bytes, line_end):
"""Convert a network fulltext record to record."""
meta_len, = struct.unpack('!L', bytes[line_end:line_end+4])
- record_meta = record_bytes[line_end+4:line_end+4+meta_len]
+ record_meta = bytes[line_end+4:line_end+4+meta_len]
key, parents = bencode.bdecode_as_tuple(record_meta)
if parents == 'nil':
parents = None
- fulltext = record_bytes[line_end+4+meta_len:]
- return FulltextContentFactory(key, parents, None, fulltext)
+ fulltext = bytes[line_end+4+meta_len:]
+ return [FulltextContentFactory(key, parents, None, fulltext)]
def _length_prefix(bytes):
return struct.pack('!L', len(bytes))
-def record_to_fulltext_bytes(self, record):
+def record_to_fulltext_bytes(record):
if record.parents is None:
parents = 'nil'
else:
More information about the bazaar-commits
mailing list