Rev 3701: (jam) Convert sftp to processing as-you-go, rather than all-at-once. in file:///home/pqm/archives/thelove/bzr/%2Btrunk/
Canonical.com Patch Queue Manager
pqm at pqm.ubuntu.com
Wed Sep 10 21:15:12 BST 2008
At file:///home/pqm/archives/thelove/bzr/%2Btrunk/
------------------------------------------------------------
revno: 3701
revision-id: pqm at pqm.ubuntu.com-20080910201504-ennm0qxaeb8yk02l
parent: pqm at pqm.ubuntu.com-20080910194512-tmwbxet1mn08eah8
parent: john at arbash-meinel.com-20080910191443-1cncrx1oo6os71nm
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Wed 2008-09-10 21:15:04 +0100
message:
(jam) Convert sftp to processing as-you-go, rather than all-at-once.
modified:
bzrlib/tests/test_sftp_transport.py testsftp.py-20051027032739-247570325fec7e7e
bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 3686.1.9
revision-id: john at arbash-meinel.com-20080910191443-1cncrx1oo6os71nm
parent: john at arbash-meinel.com-20080910173301-niz0pw3mcat430o1
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-10 14:14:43 -0500
message:
Overlapping ranges are not allowed anymore.
modified:
bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
------------------------------------------------------------
revno: 3686.1.8
revision-id: john at arbash-meinel.com-20080910173301-niz0pw3mcat430o1
parent: john at arbash-meinel.com-20080910172134-os9otcyg1rdbgi0s
parent: pqm at pqm.ubuntu.com-20080910053334-fy7gihd6da2nyhy2
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-10 12:33:01 -0500
message:
Merge in bzr.dev 3698
added:
doc/developers/overview.txt overview.txt-20080904022501-ww2ggomrs5elxfm0-1
modified:
NEWS NEWS-20050323055033-4e00b5db738777ff
bzr bzr.py-20050313053754-5485f144c7006fa6
bzr.ico bzr.ico-20060629083000-q18ip0hk7lq55i4y-1
bzrlib/__init__.py __init__.py-20050309040759-33e65acf91bbcd5d
bzrlib/btree_index.py index.py-20080624222253-p0x5f92uyh5hw734-7
bzrlib/bundle/bundle_data.py read_changeset.py-20050619171944-c0d95aa685537640
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/index.py index.py-20070712131115-lolkarso50vjr64s-1
bzrlib/memorytree.py memorytree.py-20060906023413-4wlkalbdpsxi2r4y-1
bzrlib/merge.py merge.py-20050513021216-953b65a438527106
bzrlib/missing.py missing.py-20050812153334-097f7097e2a8bcd1
bzrlib/remote.py remote.py-20060720103555-yeeg2x51vn0rbtdp-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/smart/message.py message.py-20080222013625-ncqmh3nrxjkxab87-1
bzrlib/smart/repository.py repository.py-20061128022038-vr5wy5bubyb8xttk-1
bzrlib/tests/blackbox/test_outside_wt.py test_outside_wt.py-20060116200058-98edd33e7db8bdde
bzrlib/tests/branch_implementations/test_locking.py test_locking.py-20060707151933-tav3o2hpibwi53u4-4
bzrlib/tests/repository_implementations/test_commit_builder.py test_commit_builder.py-20060606110838-76e3ra5slucqus81-1
bzrlib/tests/repository_implementations/test_repository.py test_repository.py-20060131092128-ad07f494f5c9d26c
bzrlib/tests/test_btree_index.py test_index.py-20080624222253-p0x5f92uyh5hw734-13
bzrlib/tests/test_diff.py testdiff.py-20050727164403-d1a3496ebb12e339
bzrlib/tests/test_dirstate.py test_dirstate.py-20060728012006-d6mvoihjb3je9peu-2
bzrlib/tests/test_errors.py test_errors.py-20060210110251-41aba2deddf936a8
bzrlib/tests/test_remote.py test_remote.py-20060720103555-yeeg2x51vn0rbtdp-2
bzrlib/tests/test_revisiontree.py test_revisiontree.py-20060615095324-aij44ndxbv1h4c9f-1
bzrlib/tests/workingtree_implementations/test_parents.py test_set_parents.py-20060807231740-yicmnlci1mj8smu1-1
bzrlib/tests/workingtree_implementations/test_workingtree.py test_workingtree.py-20060203003124-817757d3e31444fb
bzrlib/workingtree.py workingtree.py-20050511021032-29b6ec0a681e02e3
bzrlib/workingtree_4.py workingtree_4.py-20070208044105-5fgpc5j3ljlh5q6c-1
doc/developers/HACKING.txt HACKING-20050805200004-2a5dc975d870f78c
doc/developers/index.txt index.txt-20070508041241-qznziunkg0nffhiw-1
doc/en/user-guide/undoing_mistakes.txt undoing_mistakes.txt-20071121092300-8fyacngt1w98e5mp-1
setup.py setup.py-20050314065409-02f8a0a6e3f9bc70
------------------------------------------------------------
revno: 3686.1.7
revision-id: john at arbash-meinel.com-20080910172134-os9otcyg1rdbgi0s
parent: john at arbash-meinel.com-20080905212334-69j0qvvr9trvfk1b
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-10 12:21:34 -0500
message:
Purge the list buffer when we have a simple string buffer.
modified:
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 3686.1.6
revision-id: john at arbash-meinel.com-20080905212334-69j0qvvr9trvfk1b
parent: john at arbash-meinel.com-20080904030327-6smkelvc2rpjk3k6
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Fri 2008-09-05 16:23:34 -0500
message:
Respond to Martin's review comments.
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/tests/test_sftp_transport.py testsftp.py-20051027032739-247570325fec7e7e
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
------------------------------------------------------------
revno: 3686.1.5
revision-id: john at arbash-meinel.com-20080904030327-6smkelvc2rpjk3k6
parent: john at arbash-meinel.com-20080904022255-cmyfd6ykpieggrv7
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 22:03:27 -0500
message:
Include the filename in the debug logging.
modified:
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 3686.1.4
revision-id: john at arbash-meinel.com-20080904022255-cmyfd6ykpieggrv7
parent: john at arbash-meinel.com-20080904013723-6sa3ejscenhsxvjo
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 21:22:55 -0500
message:
rename cur_start => input_start for clarity.
modified:
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 3686.1.3
revision-id: john at arbash-meinel.com-20080904013723-6sa3ejscenhsxvjo
parent: john at arbash-meinel.com-20080903232635-8009dsm2fai5nz3c
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 20:37:23 -0500
message:
Finish up handling out-of-order requests.
Also handle short reads.
The SFTP test suite now passes.
modified:
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
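The short-read handling mentioned above amounts to pairing each issued
request with the response that actually came back and complaining when a
response is missing or truncated. A minimal standalone sketch of that
pattern (not the bzrlib code; the function name and the plain IOError are
illustrative only):

    import itertools

    def check_responses(requests, responses, path='<file>'):
        # Pad the response stream with None so that a missing response shows
        # up as an explicit short read instead of silently ending the loop.
        padded = itertools.chain(iter(responses), itertools.repeat(None))
        for (start, length), data in zip(requests, padded):
            if data is None or len(data) != length:
                actual = 0 if data is None else len(data)
                raise IOError('short read of %s: wanted %d bytes at %d,'
                              ' got %d' % (path, length, start, actual))

    check_responses([(0, 10), (10, 10)], ['x' * 10, 'y' * 10])   # ok
    # check_responses([(0, 10), (10, 10)], ['x' * 10])           # IOError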
------------------------------------------------------------
revno: 3686.1.2
revision-id: john at arbash-meinel.com-20080903232635-8009dsm2fai5nz3c
parent: john at arbash-meinel.com-20080903220310-1uwt7qt5p1istebv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 18:26:35 -0500
message:
Start moving the readv code into a helper.
We will optimize the helper for things being in order, because
that is a very common case with packs. (90% of the time they
don't care about data order, so they can always stream in
file order.)
modified:
bzrlib/tests/test_sftp_transport.py testsftp.py-20051027032739-247570325fec7e7e
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
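Pack readers usually do not care what order results come back in, so they
can sort their offsets into file order and simply re-associate each result
with its offset afterwards. A rough sketch of that calling pattern
(fake_readv below is a stand-in for any readv() that yields (offset, data)
pairs in request order, as the transport API does):

    def fake_readv(offsets):
        # Stand-in for a transport readv(): yield (offset, data) pairs in
        # the order the offsets were requested.
        content = 'x' * 10000
        for start, size in offsets:
            yield start, content[start:start + size]

    wanted = [(8000, 100), (0, 200), (4096, 50)]
    # Sorting lets the data stream straight through in file order; the
    # caller maps each piece back to its offset afterwards.
    blocks = dict(fake_readv(sorted(wanted)))
    assert sorted(blocks) == [0, 4096, 8000]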
------------------------------------------------------------
revno: 3686.1.1
revision-id: john at arbash-meinel.com-20080903220310-1uwt7qt5p1istebv
parent: pqm at pqm.ubuntu.com-20080903205840-mteswj8dfvld7vo3
parent: john at arbash-meinel.com-20071217233848-pq8zo3fyr9yt1rc1
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 17:03:10 -0500
message:
Merge in some of the changes from the old sftp_chunked branch.
But revert the sftp code itself.
We should start from scratch.
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
------------------------------------------------------------
revno: 3120.2.2
revision-id: john at arbash-meinel.com-20071217233848-pq8zo3fyr9yt1rc1
parent: john at arbash-meinel.com-20071217165633-unoib2xwcy3moixw
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Mon 2007-12-17 17:38:48 -0600
message:
finish polishing up the sftp code.
modified:
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
------------------------------------------------------------
revno: 3120.2.1
revision-id: john at arbash-meinel.com-20071217165633-unoib2xwcy3moixw
parent: pqm at pqm.ubuntu.com-20071217060447-sictlq5nibqhpuec
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Mon 2007-12-17 10:56:33 -0600
message:
Change the sftp_readv loop to buffer even less.
Instead of waiting until we have a whole collapsed range, start trying to
return data as soon as any data arrives.
Also, there is a common case of requests being made in sorted order,
which means that we don't need to buffer much at all.
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
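Taken together, the new behaviour is roughly: issue all of the coalesced,
32kB-capped requests up front, then yield each original offset's data as
soon as enough contiguous bytes have arrived, instead of waiting for a
whole coalesced section. A simplified standalone sketch of that loop (not
the bzrlib implementation; read_chunks stands in for paramiko's streamed
readv results, and the offsets are assumed sorted and non-overlapping):

    def yield_as_you_go(offsets, read_chunks):
        # offsets: the caller's (start, size) requests, sorted and
        # non-overlapping. read_chunks: (start, data) pieces arriving in
        # file order, e.g. 32kB sftp responses.
        offsets = iter(offsets)
        try:
            cur_start, cur_size = next(offsets)
        except StopIteration:
            return
        buffered = ''
        buf_start = None
        for start, data in read_chunks:
            if buf_start is None or start != buf_start + len(buffered):
                # A gap in the incoming stream: start a fresh buffer.
                buffered, buf_start = data, start
            else:
                buffered += data
            # Hand back every request that is now completely buffered.
            while buf_start == cur_start and len(buffered) >= cur_size:
                yield cur_start, buffered[:cur_size]
                buffered = buffered[cur_size:]
                buf_start += cur_size
                try:
                    cur_start, cur_size = next(offsets)
                except StopIteration:
                    return

    # Two adjacent requests served by two 15-byte reads: the first request
    # is yielded as soon as its 20 bytes are available.
    assert list(yield_as_you_go([(0, 20), (20, 10)],
                                [(0, 'a' * 15), (15, 'b' * 15)])) == \
        [(0, 'a' * 15 + 'b' * 5), (20, 'b' * 10)]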
=== modified file 'bzrlib/tests/test_sftp_transport.py'
--- a/bzrlib/tests/test_sftp_transport.py 2007-12-11 14:26:18 +0000
+++ b/bzrlib/tests/test_sftp_transport.py 2008-09-05 21:23:34 +0000
@@ -30,6 +30,8 @@
from bzrlib import (
bzrdir,
errors,
+ tests,
+ transport as _mod_transport,
)
from bzrlib.osutils import (
pathjoin,
@@ -46,6 +48,7 @@
import bzrlib.transport.http
if paramiko_loaded:
+ from bzrlib.transport import sftp as _mod_sftp
from bzrlib.transport.sftp import (
SFTPAbsoluteServer,
SFTPHomeDirServer,
@@ -76,7 +79,7 @@
set_test_transport_to_sftp(self)
-class SFTPLockTests (TestCaseWithSFTPServer):
+class SFTPLockTests(TestCaseWithSFTPServer):
def test_sftp_locks(self):
from bzrlib.errors import LockError
@@ -459,3 +462,22 @@
self.assertAlmostEqual(t2 - t1, 100 + 7)
+class Test_SFTPReadvHelper(tests.TestCase):
+
+ def checkGetRequests(self, expected_requests, offsets):
+ helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
+ self.assertEqual(expected_requests, helper._get_requests())
+
+ def test__get_requests(self):
+ # Small adjacent requests are coalesced into a single readv request
+ self.checkGetRequests([(0, 100)],
+ [(0, 20), (30, 50), (20, 10), (80, 20)])
+ # Non-contiguous ranges are given as multiple requests
+ self.checkGetRequests([(0, 20), (30, 50)],
+ [(10, 10), (30, 20), (0, 10), (50, 30)])
+ # Ranges larger than _max_request_size (32kB) are broken up into
+ # multiple requests, even if it actually spans multiple logical
+ # requests
+ self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
+ [(0, 40000), (40000, 100), (40100, 1900),
+ (42000, 24000)])
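The _get_requests() behaviour checked above is essentially: coalesce
adjacent offsets, then slice each coalesced range into pieces no larger
than the 32kB that the SFTP spec requires every server to accept. A
standalone sketch of the slicing step (split_coalesced is a hypothetical
helper, not part of bzrlib), reproducing the numbers from the last
assertion:

    MAX_REQUEST_SIZE = 32768  # request size every SFTP server must support

    def split_coalesced(start, length, max_size=MAX_REQUEST_SIZE):
        # Break one coalesced (start, length) range into <= max_size pieces.
        requests = []
        while length > 0:
            next_size = min(length, max_size)
            requests.append((start, next_size))
            start += next_size
            length -= next_size
        return requests

    # The four offsets in the last test case are adjacent, so they coalesce
    # into the single range (0, 66000), which splits as the test expects:
    assert split_coalesced(0, 66000) == [(0, 32768), (32768, 32768),
                                         (65536, 464)]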
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py 2008-06-19 06:57:22 +0000
+++ b/bzrlib/tests/test_transport.py 2008-09-10 19:14:43 +0000
@@ -180,10 +180,10 @@
self.check([(0, 20, [(0, 10), (10, 10)])],
[(0, 10), (10, 10)])
- # XXX: scary, http.readv() can't handle that --vila20071209
def test_coalesce_overlapped(self):
- self.check([(0, 15, [(0, 10), (5, 10)])],
- [(0, 10), (5, 10)])
+ self.assertRaises(ValueError,
+ self.check, [(0, 15, [(0, 10), (5, 10)])],
+ [(0, 10), (5, 10)])
def test_coalesce_limit(self):
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
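The coalescing rules these tests exercise (and which the _coalesce_offsets
docstring below spells out) can be modelled in a few lines. A minimal
standalone sketch, not the bzrlib method itself, ignoring the limit and
max_size parameters:

    def coalesce(offsets, fudge_factor=0):
        # offsets: sorted (start, size) pairs. Adjacent (or close-enough)
        # requests merge into (start, total_length, sub_ranges) tuples,
        # where each sub_range is (offset_within_group, size).
        result = []
        cur_start, cur_length, cur_ranges = None, None, []
        last_end = None
        for start, size in offsets:
            end = start + size
            if (cur_start is not None
                    and start <= last_end + fudge_factor
                    and start >= cur_start):
                if start < last_end:
                    raise ValueError('Overlapping range not allowed:'
                                     ' last range ended at %s, new one'
                                     ' starts at %s' % (last_end, start))
                cur_length = end - cur_start
                cur_ranges.append((start - cur_start, size))
            else:
                if cur_start is not None:
                    result.append((cur_start, cur_length, cur_ranges))
                cur_start, cur_length, cur_ranges = start, size, [(0, size)]
            last_end = end
        if cur_start is not None:
            result.append((cur_start, cur_length, cur_ranges))
        return result

    # Matches the adjacent-ranges case above:
    assert coalesce([(0, 10), (10, 10)]) == [(0, 20, [(0, 10), (10, 10)])]
    # And overlapping input, as in test_coalesce_overlapped, is rejected:
    # coalesce([(0, 10), (5, 10)])  # raises ValueError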
=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py 2008-09-03 09:11:20 +0000
+++ b/bzrlib/transport/__init__.py 2008-09-10 19:14:43 +0000
@@ -758,29 +758,31 @@
into a single large request, while retaining the original
offsets.
Turns [(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]
+ Note that overlapping requests are not permitted. (So [(15, 10), (20,
+ 10)] will raise a ValueError.) This is because the data we access never
+ overlaps, and it lets callers trust that any given byte is needed by at
+ most one request (so nothing needs to be buffered to satisfy a second
+ request.)
:param offsets: A list of (start, length) pairs
-
:param limit: Only combine a maximum of this many pairs Some transports
penalize multiple reads more than others, and sometimes it is
better to return early.
0 means no limit
-
:param fudge_factor: All transports have some level of 'it is
- better to read some more data and throw it away rather
+ better to read some more data and throw it away rather
than seek', so collapse if we are 'close enough'
-
:param max_size: Create coalesced offsets no bigger than this size.
When a single offset is bigger than 'max_size', it will keep
its size and be alone in the coalesced offset.
0 means no maximum size.
-
- :return: yield _CoalescedOffset objects, which have members for where
- to start, how much to read, and how to split those
- chunks back up
+ :return: return a list of _CoalescedOffset objects, which have members
+ for where to start, how much to read, and how to split those chunks
+ back up
"""
last_end = None
cur = _CoalescedOffset(None, None, [])
+ coalesced_offsets = []
for start, size in offsets:
end = start + size
@@ -789,18 +791,21 @@
and start >= cur.start
and (limit <= 0 or len(cur.ranges) < limit)
and (max_size <= 0 or end - cur.start <= max_size)):
+ if start < last_end:
+ raise ValueError('Overlapping range not allowed:'
+ ' last range ended at %s, new one starts at %s'
+ % (last_end, start))
cur.length = end - cur.start
cur.ranges.append((start-cur.start, size))
else:
if cur.start is not None:
- yield cur
+ coalesced_offsets.append(cur)
cur = _CoalescedOffset(start, size, [(0, size)])
last_end = end
if cur.start is not None:
- yield cur
-
- return
+ coalesced_offsets.append(cur)
+ return coalesced_offsets
def get_multi(self, relpaths, pb=None):
"""Get a list of file-like objects, one for each entry in relpaths.
=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py 2008-09-04 02:42:34 +0000
+++ b/bzrlib/transport/sftp.py 2008-09-10 17:33:01 +0000
@@ -24,7 +24,9 @@
# suite. Those formats all date back to 0.7; so we should be able to remove
# these methods when we officially drop support for those formats.
+import bisect
import errno
+import itertools
import os
import random
import select
@@ -131,6 +133,187 @@
pass
+class _SFTPReadvHelper(object):
+ """A class to help with managing the state of a readv request."""
+
+ # See _get_requests for an explanation.
+ _max_request_size = 32768
+
+ def __init__(self, original_offsets, relpath):
+ """Create a new readv helper.
+
+ :param original_offsets: The original requests given by the caller of
+ readv()
+ :param relpath: The name of the file (if known)
+ """
+ self.original_offsets = list(original_offsets)
+ self.relpath = relpath
+
+ def _get_requests(self):
+ """Break up the offsets into individual requests over sftp.
+
+ The SFTP spec only requires implementers to support 32kB requests. We
+ could try something larger (openssh supports 64kB), but then we have to
+ handle requests that fail.
+ So instead, we just break up our maximum chunks into 32kB chunks, and
+ asynchronously request them.
+ Newer versions of paramiko would do the chunking for us, but we want to
+ start processing results right away, so we do it ourselves.
+ """
+ # TODO: Because we issue async requests, we don't 'fudge' any extra
+ # data. I'm not 100% sure that is the best choice.
+
+ # The first thing we do, is to collapse the individual requests as much
+ # as possible, so we don't issue requests <32kB
+ sorted_offsets = sorted(self.original_offsets)
+ coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
+ limit=0, fudge_factor=0))
+ requests = []
+ for c_offset in coalesced:
+ start = c_offset.start
+ size = c_offset.length
+
+ # Break this up into 32kB requests
+ while size > 0:
+ next_size = min(size, self._max_request_size)
+ requests.append((start, next_size))
+ size -= next_size
+ start += next_size
+ mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
+ self.relpath, len(sorted_offsets), len(coalesced),
+ len(requests))
+ return requests
+
+ def request_and_yield_offsets(self, fp):
+ """Request the data from the remote machine, yielding the results.
+
+ :param fp: A Paramiko SFTPFile object that supports readv.
+ :return: Yield the data requested by the original readv caller, one by
+ one.
+ """
+ requests = self._get_requests()
+ offset_iter = iter(self.original_offsets)
+ cur_offset, cur_size = offset_iter.next()
+ # paramiko .readv() yields strings that are in the order of the requests
+ # So we track the current request to know where the next data is
+ # being returned from.
+ input_start = None
+ last_end = None
+ buffered_data = []
+ buffered_len = 0
+
+ # This is used to buffer chunks which we couldn't process yet
+ # It is a list of (start, data) tuples.
+ data_chunks = []
+ # Create an 'unlimited' data stream, so we stop based on requests,
+ # rather than just because the data stream ended. This lets us detect
+ # short readv.
+ data_stream = itertools.chain(fp.readv(requests),
+ itertools.repeat(None))
+ for (start, length), data in itertools.izip(requests, data_stream):
+ if data is None:
+ # readv() gave us fewer results than requests: the stream ended
+ # before this request could be satisfied, so it is a zero-byte read.
+ raise errors.ShortReadvError(self.relpath,
+ start, length, 0)
+ if len(data) != length:
+ raise errors.ShortReadvError(self.relpath,
+ start, length, len(data))
+ if last_end is None:
+ # This is the first request, just buffer it
+ buffered_data = [data]
+ buffered_len = length
+ input_start = start
+ elif start == last_end:
+ # The data we are reading fits neatly on the previous
+ # buffer, so this is all part of a larger coalesced range.
+ buffered_data.append(data)
+ buffered_len += length
+ else:
+ # We have an 'interrupt' in the data stream. So we know we are
+ # at a request boundary.
+ if buffered_len > 0:
+ # We haven't consumed the buffer so far, so put it into
+ # data_chunks, and continue.
+ buffered = ''.join(buffered_data)
+ data_chunks.append((input_start, buffered))
+ input_start = start
+ buffered_data = [data]
+ buffered_len = length
+ last_end = start + length
+ if input_start == cur_offset and cur_size <= buffered_len:
+ # Simplify the next steps a bit by transforming buffered_data
+ # into a single string. We also have the nice property that
+ # when there is only one string ''.join([x]) == x, so there is
+ # no data copying.
+ buffered = ''.join(buffered_data)
+ # Clean out buffered data so that we keep memory
+ # consumption low
+ del buffered_data[:]
+ buffered_offset = 0
+ # TODO: We *could* also consider the case where cur_offset is in
+ # the buffered range, even though it doesn't *start*
+ # the buffered range. But for packs we pretty much always
+ # read in order, so you won't get any extra data in the
+ # middle.
+ while (input_start == cur_offset
+ and (buffered_offset + cur_size) <= buffered_len):
+ # We've buffered enough data to process this request, spit it
+ # out
+ cur_data = buffered[buffered_offset:buffered_offset + cur_size]
+ # move the direct pointer into our buffered data
+ buffered_offset += cur_size
+ # Move the start-of-buffer pointer
+ input_start += cur_size
+ # Yield the requested data
+ yield cur_offset, cur_data
+ cur_offset, cur_size = offset_iter.next()
+ # at this point, we've consumed as much of buffered as we can,
+ # so break off the portion that we consumed
+ if buffered_offset == len(buffered):
+ # No tail to leave behind
+ buffered_data = []
+ buffered_len = 0
+ else:
+ buffered = buffered[buffered_offset:]
+ buffered_data = [buffered]
+ buffered_len = len(buffered)
+ if buffered_len:
+ buffered = ''.join(buffered_data)
+ del buffered_data[:]
+ data_chunks.append((input_start, buffered))
+ if data_chunks:
+ mutter('SFTP readv left with %d out-of-order bytes',
+ sum(map(lambda x: len(x[1]), data_chunks)))
+ # We've processed all the readv data, at this point, anything we
+ # couldn't process is in data_chunks. This doesn't happen often, so
+ # this code path isn't optimized
+ # We look each request up with bisect_left(data_chunks, (cur_offset,)).
+ # If a chunk starts exactly at cur_offset we get its index directly;
+ # otherwise we get the insertion point and step back one entry to the
+ # chunk that contains the offset.
+ while True:
+ idx = bisect.bisect_left(data_chunks, (cur_offset,))
+ if data_chunks[idx][0] == cur_offset: # The data starts here
+ data = data_chunks[idx][1][:cur_size]
+ elif idx > 0:
+ # The data is in a portion of a previous page
+ idx -= 1
+ sub_offset = cur_offset - data_chunks[idx][0]
+ data = data_chunks[idx][1]
+ data = data[sub_offset:sub_offset + cur_size]
+ else:
+ # We are missing the page where the data should be found,
+ # something is wrong
+ data = ''
+ if len(data) != cur_size:
+ raise AssertionError('We must have miscalculated.'
+ ' We expected %d bytes, but only found %d'
+ % (cur_size, len(data)))
+ yield cur_offset, data
+ cur_offset, cur_size = offset_iter.next()
+
+
class SFTPTransport(ConnectedTransport):
"""Transport implementation for SFTP access."""
@@ -265,102 +448,8 @@
does not support ranges > 64K, so it caps the request size, and
just reads until it gets all the stuff it wants
"""
- offsets = list(offsets)
- sorted_offsets = sorted(offsets)
-
- # The algorithm works as follows:
- # 1) Coalesce nearby reads into a single chunk
- # This generates a list of combined regions, the total size
- # and the size of the sub regions. This coalescing step is limited
- # in the number of nearby chunks to combine, and is allowed to
- # skip small breaks in the requests. Limiting it makes sure that
- # we can start yielding some data earlier, and skipping means we
- # make fewer requests. (Beneficial even when using async)
- # 2) Break up this combined regions into chunks that are smaller
- # than 64KiB. Technically the limit is 65536, but we are a
- # little bit conservative. This is because sftp has a maximum
- # return chunk size of 64KiB (max size of an unsigned short)
- # 3) Issue a readv() to paramiko to create an async request for
- # all of this data
- # 4) Read in the data as it comes back, until we've read one
- # continuous section as determined in step 1
- # 5) Break up the full sections into hunks for the original requested
- # offsets. And put them in a cache
- # 6) Check if the next request is in the cache, and if it is, remove
- # it from the cache, and yield its data. Continue until no more
- # entries are in the cache.
- # 7) loop back to step 4 until all data has been read
- #
- # TODO: jam 20060725 This could be optimized one step further, by
- # attempting to yield whatever data we have read, even before
- # the first coallesced section has been fully processed.
-
- # When coalescing for use with readv(), we don't really need to
- # use any fudge factor, because the requests are made asynchronously
- coalesced = list(self._coalesce_offsets(sorted_offsets,
- limit=self._max_readv_combine,
- fudge_factor=0,
- ))
- requests = []
- for c_offset in coalesced:
- start = c_offset.start
- size = c_offset.length
-
- # We need to break this up into multiple requests
- while size > 0:
- next_size = min(size, self._max_request_size)
- requests.append((start, next_size))
- size -= next_size
- start += next_size
-
- mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
- len(offsets), len(coalesced), len(requests))
-
- # Queue the current read until we have read the full coalesced section
- cur_data = []
- cur_data_len = 0
- cur_coalesced_stack = iter(coalesced)
- cur_coalesced = cur_coalesced_stack.next()
-
- # Cache the results, but only until they have been fulfilled
- data_map = {}
- # turn the list of offsets into a stack
- offset_stack = iter(offsets)
- cur_offset_and_size = offset_stack.next()
-
- for data in fp.readv(requests):
- cur_data.append(data)
- cur_data_len += len(data)
-
- if cur_data_len < cur_coalesced.length:
- continue
- if cur_data_len != cur_coalesced.length:
- raise AssertionError(
- "Somehow we read too much: %s != %s"
- % (cur_data_len, cur_coalesced.length))
- all_data = ''.join(cur_data)
- cur_data = []
- cur_data_len = 0
-
- for suboffset, subsize in cur_coalesced.ranges:
- key = (cur_coalesced.start+suboffset, subsize)
- data_map[key] = all_data[suboffset:suboffset+subsize]
-
- # Now that we've read some data, see if we can yield anything back
- while cur_offset_and_size in data_map:
- this_data = data_map.pop(cur_offset_and_size)
- yield cur_offset_and_size[0], this_data
- cur_offset_and_size = offset_stack.next()
-
- # We read a coalesced entry, so mark it as done
- cur_coalesced = None
- # Now that we've read all of the data for this coalesced section
- # on to the next
- cur_coalesced = cur_coalesced_stack.next()
-
- if cur_coalesced is not None:
- raise errors.ShortReadvError(relpath, cur_coalesced.start,
- cur_coalesced.length, len(data))
+ helper = _SFTPReadvHelper(offsets, relpath)
+ return helper.request_and_yield_offsets(fp)
def put_file(self, relpath, f, mode=None):
"""
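The out-of-order fallback at the end of request_and_yield_offsets relies on
a small bisect trick to find which buffered chunk holds a requested offset.
A standalone sketch of just that lookup (find_chunk is illustrative, not
bzrlib code; it assumes the offset really does fall inside one of the
chunks):

    import bisect

    def find_chunk(data_chunks, offset, size):
        # data_chunks: a sorted list of (start, data) tuples. bisect_left
        # with the one-element key (offset,) lands on the chunk that starts
        # exactly at offset, or on the insertion point just past the chunk
        # that contains it.
        idx = bisect.bisect_left(data_chunks, (offset,))
        if idx < len(data_chunks) and data_chunks[idx][0] == offset:
            start, data = data_chunks[idx]
        else:
            start, data = data_chunks[idx - 1]
        sub = offset - start
        return data[sub:sub + size]

    chunks = [(0, 'a' * 100), (500, 'b' * 100)]
    assert find_chunk(chunks, 0, 10) == 'a' * 10   # starts exactly at 0
    assert find_chunk(chunks, 520, 5) == 'b' * 5   # inside the 500 chunk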