Rev 3701: (jam) Convert sftp to processing as-you-go, rather than all-at-once. in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Wed Sep 10 21:15:12 BST 2008


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 3701
revision-id: pqm at pqm.ubuntu.com-20080910201504-ennm0qxaeb8yk02l
parent: pqm at pqm.ubuntu.com-20080910194512-tmwbxet1mn08eah8
parent: john at arbash-meinel.com-20080910191443-1cncrx1oo6os71nm
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Wed 2008-09-10 21:15:04 +0100
message:
  (jam) Convert sftp to processing as-you-go, rather than all-at-once.
modified:
  bzrlib/tests/test_sftp_transport.py testsftp.py-20051027032739-247570325fec7e7e
  bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
  bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
  bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3686.1.9
    revision-id: john at arbash-meinel.com-20080910191443-1cncrx1oo6os71nm
    parent: john at arbash-meinel.com-20080910173301-niz0pw3mcat430o1
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-10 14:14:43 -0500
    message:
      Overlapping ranges are not allowed anymore.
    modified:
      bzrlib/tests/test_transport.py testtransport.py-20050718175618-e5cdb99f4555ddce
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
    ------------------------------------------------------------
    revno: 3686.1.8
    revision-id: john at arbash-meinel.com-20080910173301-niz0pw3mcat430o1
    parent: john at arbash-meinel.com-20080910172134-os9otcyg1rdbgi0s
    parent: pqm at pqm.ubuntu.com-20080910053334-fy7gihd6da2nyhy2
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-10 12:33:01 -0500
    message:
      Merge in bzr.dev 3698
    added:
      doc/developers/overview.txt    overview.txt-20080904022501-ww2ggomrs5elxfm0-1
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
      bzr                            bzr.py-20050313053754-5485f144c7006fa6
      bzr.ico                        bzr.ico-20060629083000-q18ip0hk7lq55i4y-1
      bzrlib/__init__.py             __init__.py-20050309040759-33e65acf91bbcd5d
      bzrlib/btree_index.py          index.py-20080624222253-p0x5f92uyh5hw734-7
      bzrlib/bundle/bundle_data.py   read_changeset.py-20050619171944-c0d95aa685537640
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/index.py                index.py-20070712131115-lolkarso50vjr64s-1
      bzrlib/memorytree.py           memorytree.py-20060906023413-4wlkalbdpsxi2r4y-1
      bzrlib/merge.py                merge.py-20050513021216-953b65a438527106
      bzrlib/missing.py              missing.py-20050812153334-097f7097e2a8bcd1
      bzrlib/remote.py               remote.py-20060720103555-yeeg2x51vn0rbtdp-1
      bzrlib/repository.py           rev_storage.py-20051111201905-119e9401e46257e3
      bzrlib/smart/message.py        message.py-20080222013625-ncqmh3nrxjkxab87-1
      bzrlib/smart/repository.py     repository.py-20061128022038-vr5wy5bubyb8xttk-1
      bzrlib/tests/blackbox/test_outside_wt.py test_outside_wt.py-20060116200058-98edd33e7db8bdde
      bzrlib/tests/branch_implementations/test_locking.py test_locking.py-20060707151933-tav3o2hpibwi53u4-4
      bzrlib/tests/repository_implementations/test_commit_builder.py test_commit_builder.py-20060606110838-76e3ra5slucqus81-1
      bzrlib/tests/repository_implementations/test_repository.py test_repository.py-20060131092128-ad07f494f5c9d26c
      bzrlib/tests/test_btree_index.py test_index.py-20080624222253-p0x5f92uyh5hw734-13
      bzrlib/tests/test_diff.py      testdiff.py-20050727164403-d1a3496ebb12e339
      bzrlib/tests/test_dirstate.py  test_dirstate.py-20060728012006-d6mvoihjb3je9peu-2
      bzrlib/tests/test_errors.py    test_errors.py-20060210110251-41aba2deddf936a8
      bzrlib/tests/test_remote.py    test_remote.py-20060720103555-yeeg2x51vn0rbtdp-2
      bzrlib/tests/test_revisiontree.py test_revisiontree.py-20060615095324-aij44ndxbv1h4c9f-1
      bzrlib/tests/workingtree_implementations/test_parents.py test_set_parents.py-20060807231740-yicmnlci1mj8smu1-1
      bzrlib/tests/workingtree_implementations/test_workingtree.py test_workingtree.py-20060203003124-817757d3e31444fb
      bzrlib/workingtree.py          workingtree.py-20050511021032-29b6ec0a681e02e3
      bzrlib/workingtree_4.py        workingtree_4.py-20070208044105-5fgpc5j3ljlh5q6c-1
      doc/developers/HACKING.txt     HACKING-20050805200004-2a5dc975d870f78c
      doc/developers/index.txt       index.txt-20070508041241-qznziunkg0nffhiw-1
      doc/en/user-guide/undoing_mistakes.txt undoing_mistakes.txt-20071121092300-8fyacngt1w98e5mp-1
      setup.py                       setup.py-20050314065409-02f8a0a6e3f9bc70
    ------------------------------------------------------------
    revno: 3686.1.7
    revision-id: john at arbash-meinel.com-20080910172134-os9otcyg1rdbgi0s
    parent: john at arbash-meinel.com-20080905212334-69j0qvvr9trvfk1b
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-10 12:21:34 -0500
    message:
      Purge the list buffer when we have a simple string buffer.
    modified:
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3686.1.6
    revision-id: john at arbash-meinel.com-20080905212334-69j0qvvr9trvfk1b
    parent: john at arbash-meinel.com-20080904030327-6smkelvc2rpjk3k6
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Fri 2008-09-05 16:23:34 -0500
    message:
      Respond to Martin's review comments.
    modified:
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/tests/test_sftp_transport.py testsftp.py-20051027032739-247570325fec7e7e
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
    ------------------------------------------------------------
    revno: 3686.1.5
    revision-id: john at arbash-meinel.com-20080904030327-6smkelvc2rpjk3k6
    parent: john at arbash-meinel.com-20080904022255-cmyfd6ykpieggrv7
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-03 22:03:27 -0500
    message:
      Include the filename in the debug logging.
    modified:
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3686.1.4
    revision-id: john at arbash-meinel.com-20080904022255-cmyfd6ykpieggrv7
    parent: john at arbash-meinel.com-20080904013723-6sa3ejscenhsxvjo
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-03 21:22:55 -0500
    message:
      rename cur_start => input_start for clarity.
    modified:
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3686.1.3
    revision-id: john at arbash-meinel.com-20080904013723-6sa3ejscenhsxvjo
    parent: john at arbash-meinel.com-20080903232635-8009dsm2fai5nz3c
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-03 20:37:23 -0500
    message:
      Finish up handling out-of-order requests.
      Also handle short reads.
      The SFTP test suite now passes.
    modified:
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3686.1.2
    revision-id: john at arbash-meinel.com-20080903232635-8009dsm2fai5nz3c
    parent: john at arbash-meinel.com-20080903220310-1uwt7qt5p1istebv
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-03 18:26:35 -0500
    message:
      Start moving the readv code into a helper.
      We will optimize the helper for things being in order, because
      that is a very common case with packs. (90% of the time they
      don't care about data order, so they can always stream in
      file order.)
    modified:
      bzrlib/tests/test_sftp_transport.py testsftp.py-20051027032739-247570325fec7e7e
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3686.1.1
    revision-id: john at arbash-meinel.com-20080903220310-1uwt7qt5p1istebv
    parent: pqm at pqm.ubuntu.com-20080903205840-mteswj8dfvld7vo3
    parent: john at arbash-meinel.com-20071217233848-pq8zo3fyr9yt1rc1
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Wed 2008-09-03 17:03:10 -0500
    message:
      Merge in some of the changes from the old sftp_chunked branch.
      But revert the sftp code itself.
      We should start from scratch.
    modified:
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
    ------------------------------------------------------------
    revno: 3120.2.2
    revision-id: john at arbash-meinel.com-20071217233848-pq8zo3fyr9yt1rc1
    parent: john at arbash-meinel.com-20071217165633-unoib2xwcy3moixw
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Mon 2007-12-17 17:38:48 -0600
    message:
      finish polishing up the sftp code.
    modified:
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
    ------------------------------------------------------------
    revno: 3120.2.1
    revision-id: john at arbash-meinel.com-20071217165633-unoib2xwcy3moixw
    parent: pqm at pqm.ubuntu.com-20071217060447-sictlq5nibqhpuec
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: sftp_chunked
    timestamp: Mon 2007-12-17 10:56:33 -0600
    message:
      Change the sftp_readv loop to buffer even less.
      Instead of waiting until we have a whole collapsed range, start trying to
      return data as soon as any data arrives.
      Also, there is a common case of requests being made in
      sorted order, which means that we don't need to buffer much at all.
    modified:
      bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
      bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
      bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
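
The log above describes the shape of the change: instead of buffering an entire coalesced range before handing anything back, the sftp readv loop now yields each requested range as soon as enough contiguous bytes have arrived. Below is a minimal standalone sketch of that as-you-go idea, using hypothetical names and the simplifying assumption that the requested offsets are sorted and non-overlapping (the common pack case mentioned in 3686.1.2); it is not the code being merged.

    def yield_as_you_go(chunks, offsets):
        """Yield (offset, data) for each requested range as soon as enough
        contiguous bytes have arrived, rather than after the whole read.

        chunks  -- iterable of (start, data) pieces arriving in file order
        offsets -- (offset, size) requests, assumed sorted and non-overlapping
        """
        offsets = iter(offsets)
        cur_offset, cur_size = next(offsets)
        buf_start = None   # file offset of the first buffered byte
        buf = ''           # contiguous bytes buffered so far
        for start, data in chunks:
            if buf_start is None or start != buf_start + len(buf):
                # Discontinuity: drop the old buffer and start a new one.
                buf_start, buf = start, data
            else:
                buf += data
            # Serve every request that is now fully contained in the buffer.
            while buf_start == cur_offset and cur_size <= len(buf):
                yield cur_offset, buf[:cur_size]
                buf_start += cur_size
                buf = buf[cur_size:]
                try:
                    cur_offset, cur_size = next(offsets)
                except StopIteration:
                    return

With in-order requests this keeps only the bytes of the range currently being assembled in memory; the real helper added to bzrlib/transport/sftp.py below additionally parks any out-of-order data in a side buffer.
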
=== modified file 'bzrlib/tests/test_sftp_transport.py'
--- a/bzrlib/tests/test_sftp_transport.py	2007-12-11 14:26:18 +0000
+++ b/bzrlib/tests/test_sftp_transport.py	2008-09-05 21:23:34 +0000
@@ -30,6 +30,8 @@
 from bzrlib import (
     bzrdir,
     errors,
+    tests,
+    transport as _mod_transport,
     )
 from bzrlib.osutils import (
     pathjoin,
@@ -46,6 +48,7 @@
 import bzrlib.transport.http
 
 if paramiko_loaded:
+    from bzrlib.transport import sftp as _mod_sftp
     from bzrlib.transport.sftp import (
         SFTPAbsoluteServer,
         SFTPHomeDirServer,
@@ -76,7 +79,7 @@
         set_test_transport_to_sftp(self)
 
 
-class SFTPLockTests (TestCaseWithSFTPServer):
+class SFTPLockTests(TestCaseWithSFTPServer):
 
     def test_sftp_locks(self):
         from bzrlib.errors import LockError
@@ -459,3 +462,22 @@
         self.assertAlmostEqual(t2 - t1, 100 + 7)
 
 
+class Test_SFTPReadvHelper(tests.TestCase):
+
+    def checkGetRequests(self, expected_requests, offsets):
+        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
+        self.assertEqual(expected_requests, helper._get_requests())
+
+    def test__get_requests(self):
+        # Small single requests become a single readv request
+        self.checkGetRequests([(0, 100)],
+                              [(0, 20), (30, 50), (20, 10), (80, 20)])
+        # Non-contiguous ranges are given as multiple requests
+        self.checkGetRequests([(0, 20), (30, 50)],
+                              [(10, 10), (30, 20), (0, 10), (50, 30)])
+        # Ranges larger than _max_request_size (32kB) are broken up into
+        # multiple requests, even though a resulting request may then span
+        # multiple logical requests
+        self.checkGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
+                              [(0, 40000), (40000, 100), (40100, 1900),
+                               (42000, 24000)])

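The new Test_SFTPReadvHelper case above pins down _get_requests(): offsets are coalesced first, and each coalesced range is then cut into pieces of at most 32768 bytes. A rough standalone stand-in for that behaviour is sketched below (hedged: it only handles exactly adjacent ranges, i.e. limit=0 and fudge_factor=0, rather than going through Transport._coalesce_offsets); the final assert mirrors the last case in test__get_requests.

    MAX_REQUEST_SIZE = 32768   # the SFTP spec only guarantees 32kB reads

    def split_requests(offsets, max_size=MAX_REQUEST_SIZE):
        """Coalesce adjacent (start, length) offsets, then cut each coalesced
        range into pieces of at most max_size bytes."""
        coalesced = []
        cur_start = cur_len = None
        for start, length in sorted(offsets):
            if cur_start is not None and start == cur_start + cur_len:
                cur_len += length                  # extends the current range
            else:
                if cur_start is not None:
                    coalesced.append((cur_start, cur_len))
                cur_start, cur_len = start, length
        if cur_start is not None:
            coalesced.append((cur_start, cur_len))
        requests = []
        for start, size in coalesced:
            while size > 0:                        # honour the 32kB cap
                step = min(size, max_size)
                requests.append((start, step))
                start += step
                size -= step
        return requests

    assert split_requests([(0, 40000), (40000, 100), (40100, 1900),
                           (42000, 24000)]) == [(0, 32768), (32768, 32768),
                                                (65536, 464)]
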
=== modified file 'bzrlib/tests/test_transport.py'
--- a/bzrlib/tests/test_transport.py	2008-06-19 06:57:22 +0000
+++ b/bzrlib/tests/test_transport.py	2008-09-10 19:14:43 +0000
@@ -180,10 +180,10 @@
         self.check([(0, 20, [(0, 10), (10, 10)])],
                    [(0, 10), (10, 10)])
 
-    # XXX: scary, http.readv() can't handle that --vila20071209
     def test_coalesce_overlapped(self):
-        self.check([(0, 15, [(0, 10), (5, 10)])],
-                   [(0, 10), (5, 10)])
+        self.assertRaises(ValueError,
+            self.check, [(0, 15, [(0, 10), (5, 10)])],
+                        [(0, 10), (5, 10)])
 
     def test_coalesce_limit(self):
         self.check([(10, 50, [(0, 10), (10, 10), (20, 10),

=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py	2008-09-03 09:11:20 +0000
+++ b/bzrlib/transport/__init__.py	2008-09-10 19:14:43 +0000
@@ -758,29 +758,31 @@
         into a single large request, while retaining the original
         offsets.
         Turns  [(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]
+        Note that overlapping requests are not permitted. (So [(15, 10), (20,
+        10)] will raise a ValueError.) This is because the data we access never
+        overlaps, and it allows callers to trust that any given byte of data is
+        needed by at most one request (so nothing has to be buffered to fulfill
+        a second request.)
 
         :param offsets: A list of (start, length) pairs
-
         :param limit: Only combine a maximum of this many pairs Some transports
                 penalize multiple reads more than others, and sometimes it is
                 better to return early.
                 0 means no limit
-
         :param fudge_factor: All transports have some level of 'it is
-                better to read some more data and throw it away rather 
+                better to read some more data and throw it away rather
                 than seek', so collapse if we are 'close enough'
-
         :param max_size: Create coalesced offsets no bigger than this size.
                 When a single offset is bigger than 'max_size', it will keep
                 its size and be alone in the coalesced offset.
                 0 means no maximum size.
-
-        :return: yield _CoalescedOffset objects, which have members for where
-                to start, how much to read, and how to split those 
-                chunks back up
+        :return: a list of _CoalescedOffset objects, which have members
+            for where to start, how much to read, and how to split those chunks
+            back up
         """
         last_end = None
         cur = _CoalescedOffset(None, None, [])
+        coalesced_offsets = []
 
         for start, size in offsets:
             end = start + size
@@ -789,18 +791,21 @@
                 and start >= cur.start
                 and (limit <= 0 or len(cur.ranges) < limit)
                 and (max_size <= 0 or end - cur.start <= max_size)):
+                if start < last_end:
+                    raise ValueError('Overlapping range not allowed:'
+                        ' last range ended at %s, new one starts at %s'
+                        % (last_end, start))
                 cur.length = end - cur.start
                 cur.ranges.append((start-cur.start, size))
             else:
                 if cur.start is not None:
-                    yield cur
+                    coalesced_offsets.append(cur)
                 cur = _CoalescedOffset(start, size, [(0, size)])
             last_end = end
 
         if cur.start is not None:
-            yield cur
-
-        return
+            coalesced_offsets.append(cur)
+        return coalesced_offsets
 
     def get_multi(self, relpaths, pb=None):
         """Get a list of file-like objects, one for each entry in relpaths.

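The updated docstring keeps the worked example "[(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]" and adds the rule that overlapping ranges now raise ValueError. The sketch below illustrates that contract in isolation; it is a simplification (plain tuples instead of _CoalescedOffset objects, and no limit, fudge_factor or max_size handling), not the bzrlib method itself.

    def coalesce_offsets(offsets):
        """Collapse sorted (start, length) pairs into
        (start, total_length, [(suboffset, length), ...]) tuples,
        rejecting overlapping input with ValueError."""
        result = []
        cur_start = cur_end = last_end = None
        ranges = []
        for start, size in offsets:
            end = start + size
            if cur_start is not None and start <= cur_end:
                if start < last_end:
                    raise ValueError('Overlapping range not allowed:'
                                     ' last range ended at %s, new one starts at %s'
                                     % (last_end, start))
                ranges.append((start - cur_start, size))
                cur_end = end
            else:
                if cur_start is not None:
                    result.append((cur_start, cur_end - cur_start, ranges))
                cur_start, cur_end, ranges = start, end, [(0, size)]
            last_end = end
        if cur_start is not None:
            result.append((cur_start, cur_end - cur_start, ranges))
        return result

    assert coalesce_offsets([(15, 10), (25, 10)]) == [(15, 20, [(0, 10), (10, 10)])]
    try:
        coalesce_offsets([(0, 10), (5, 10)])    # the overlap case from the test
    except ValueError:
        pass
    else:
        raise AssertionError('overlap should have been rejected')
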
=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py	2008-09-04 02:42:34 +0000
+++ b/bzrlib/transport/sftp.py	2008-09-10 17:33:01 +0000
@@ -24,7 +24,9 @@
 # suite.  Those formats all date back to 0.7; so we should be able to remove
 # these methods when we officially drop support for those formats.
 
+import bisect
 import errno
+import itertools
 import os
 import random
 import select
@@ -131,6 +133,187 @@
             pass
 
 
+class _SFTPReadvHelper(object):
+    """A class to help with managing the state of a readv request."""
+
+    # See _get_requests for an explanation.
+    _max_request_size = 32768
+
+    def __init__(self, original_offsets, relpath):
+        """Create a new readv helper.
+
+        :param original_offsets: The original requests given by the caller of
+            readv()
+        :param relpath: The name of the file (if known)
+        """
+        self.original_offsets = list(original_offsets)
+        self.relpath = relpath
+
+    def _get_requests(self):
+        """Break up the offsets into individual requests over sftp.
+
+        The SFTP spec only requires implementers to support 32kB requests. We
+        could try something larger (openssh supports 64kB), but then we have to
+        handle requests that fail.
+        So instead, we just break up our coalesced ranges into 32kB chunks, and
+        request them asynchronously.
+        Newer versions of paramiko would do the chunking for us, but we want to
+        start processing results right away, so we do it ourselves.
+        """
+        # TODO: Because we issue async requests, we don't 'fudge' any extra
+        #       data.  I'm not 100% sure that is the best choice.
+
+        # The first thing we do is to collapse the individual requests as much
+        # as possible, so we don't issue requests <32kB
+        sorted_offsets = sorted(self.original_offsets)
+        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
+                                                        limit=0, fudge_factor=0))
+        requests = []
+        for c_offset in coalesced:
+            start = c_offset.start
+            size = c_offset.length
+
+            # Break this up into 32kB requests
+            while size > 0:
+                next_size = min(size, self._max_request_size)
+                requests.append((start, next_size))
+                size -= next_size
+                start += next_size
+        mutter('SFTP.readv(%s) %s offsets => %s coalesced => %s requests',
+               self.relpath, len(sorted_offsets), len(coalesced),
+               len(requests))
+        return requests
+
+    def request_and_yield_offsets(self, fp):
+        """Request the data from the remote machine, yielding the results.
+
+        :param fp: A Paramiko SFTPFile object that supports readv.
+        :return: Yield the data requested by the original readv caller, one by
+            one.
+        """
+        requests = self._get_requests()
+        offset_iter = iter(self.original_offsets)
+        cur_offset, cur_size = offset_iter.next()
+        # paramiko .readv() yields strings that are in the order of the requests
+        # So we track the current request to know where the next data is
+        # being returned from.
+        input_start = None
+        last_end = None
+        buffered_data = []
+        buffered_len = 0
+
+        # This is used to buffer chunks which we couldn't process yet
+        # It is a list of (start, data) tuples.
+        data_chunks = []
+        # Create an 'unlimited' data stream, so we stop based on requests,
+        # rather than just because the data stream ended. This lets us detect
+        # short readv.
+        data_stream = itertools.chain(fp.readv(requests),
+                                      itertools.repeat(None))
+        for (start, length), data in itertools.izip(requests, data_stream):
+            if data is None:
+                # The data stream ran out before all requests were satisfied
+                raise errors.ShortReadvError(self.relpath,
+                    start, length, 0)
+            if len(data) != length:
+                raise errors.ShortReadvError(self.relpath,
+                    start, length, len(data))
+            if last_end is None:
+                # This is the first request, just buffer it
+                buffered_data = [data]
+                buffered_len = length
+                input_start = start
+            elif start == last_end:
+                # The data we are reading fits neatly on the previous
+                # buffer, so this is all part of a larger coalesced range.
+                buffered_data.append(data)
+                buffered_len += length
+            else:
+                # We have an 'interrupt' in the data stream. So we know we are
+                # at a request boundary.
+                if buffered_len > 0:
+                    # We haven't consumed the buffer so far, so put it into
+                    # data_chunks, and continue.
+                    buffered = ''.join(buffered_data)
+                    data_chunks.append((input_start, buffered))
+                input_start = start
+                buffered_data = [data]
+                buffered_len = length
+            last_end = start + length
+            if input_start == cur_offset and cur_size <= buffered_len:
+                # Simplify the next steps a bit by transforming buffered_data
+                # into a single string. We also have the nice property that
+                # when there is only one string ''.join([x]) == x, so there is
+                # no data copying.
+                buffered = ''.join(buffered_data)
+                # Clean out buffered data so that we keep memory
+                # consumption low
+                del buffered_data[:]
+                buffered_offset = 0
+                # TODO: We *could* also consider the case where cur_offset is
+                #       in the buffered range, even though it doesn't *start*
+                #       the buffered range. But for packs we pretty much always
+                #       read in order, so you won't get any extra data in the
+                #       middle.
+                while (input_start == cur_offset
+                       and (buffered_offset + cur_size) <= buffered_len):
+                    # We've buffered enough data to process this request, spit it
+                    # out
+                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
+                    # move the direct pointer into our buffered data
+                    buffered_offset += cur_size
+                    # Move the start-of-buffer pointer
+                    input_start += cur_size
+                    # Yield the requested data
+                    yield cur_offset, cur_data
+                    cur_offset, cur_size = offset_iter.next()
+                # at this point, we've consumed as much of buffered as we can,
+                # so break off the portion that we consumed
+                if buffered_offset == len(buffered):
+                    # No tail to leave behind
+                    buffered_data = []
+                    buffered_len = 0
+                else:
+                    buffered = buffered[buffered_offset:]
+                    buffered_data = [buffered]
+                    buffered_len = len(buffered)
+        if buffered_len:
+            buffered = ''.join(buffered_data)
+            del buffered_data[:]
+            data_chunks.append((input_start, buffered))
+        if data_chunks:
+            mutter('SFTP readv left with %d out-of-order bytes',
+                   sum(map(lambda x: len(x[1]), data_chunks)))
+            # We've processed all the readv data; at this point, anything we
+            # couldn't process is in data_chunks. This doesn't happen often, so
+            # this code path isn't optimized.
+            # We look chunks up with bisect: given data_chunks as a sorted list
+            # of (start, data) tuples, bisect_left(data_chunks, (qstart,))
+            # returns the index of the chunk that starts exactly at qstart, if
+            # there is one. Otherwise the data we want lies in the chunk just
+            # before that index.
+            while True:
+                idx = bisect.bisect_left(data_chunks, (cur_offset,))
+                if idx < len(data_chunks) and data_chunks[idx][0] == cur_offset: # The data starts here
+                    data = data_chunks[idx][1][:cur_size]
+                elif idx > 0:
+                    # The data is in a portion of a previous page
+                    idx -= 1
+                    sub_offset = cur_offset - data_chunks[idx][0]
+                    data = data_chunks[idx][1]
+                    data = data[sub_offset:sub_offset + cur_size]
+                else:
+                    # We are missing the page where the data should be found,
+                    # something is wrong
+                    data = ''
+                if len(data) != cur_size:
+                    raise AssertionError('We must have miscalculated.'
+                        ' We expected %d bytes, but only found %d'
+                        % (cur_size, len(data)))
+                yield cur_offset, data
+                cur_offset, cur_size = offset_iter.next()
+
+
 class SFTPTransport(ConnectedTransport):
     """Transport implementation for SFTP access."""
 
@@ -265,102 +448,8 @@
         does not support ranges > 64K, so it caps the request size, and
         just reads until it gets all the stuff it wants
         """
-        offsets = list(offsets)
-        sorted_offsets = sorted(offsets)
-
-        # The algorithm works as follows:
-        # 1) Coalesce nearby reads into a single chunk
-        #    This generates a list of combined regions, the total size
-        #    and the size of the sub regions. This coalescing step is limited
-        #    in the number of nearby chunks to combine, and is allowed to
-        #    skip small breaks in the requests. Limiting it makes sure that
-        #    we can start yielding some data earlier, and skipping means we
-        #    make fewer requests. (Beneficial even when using async)
-        # 2) Break up this combined regions into chunks that are smaller
-        #    than 64KiB. Technically the limit is 65536, but we are a
-        #    little bit conservative. This is because sftp has a maximum
-        #    return chunk size of 64KiB (max size of an unsigned short)
-        # 3) Issue a readv() to paramiko to create an async request for
-        #    all of this data
-        # 4) Read in the data as it comes back, until we've read one
-        #    continuous section as determined in step 1
-        # 5) Break up the full sections into hunks for the original requested
-        #    offsets. And put them in a cache
-        # 6) Check if the next request is in the cache, and if it is, remove
-        #    it from the cache, and yield its data. Continue until no more
-        #    entries are in the cache.
-        # 7) loop back to step 4 until all data has been read
-        #
-        # TODO: jam 20060725 This could be optimized one step further, by
-        #       attempting to yield whatever data we have read, even before
-        #       the first coallesced section has been fully processed.
-
-        # When coalescing for use with readv(), we don't really need to
-        # use any fudge factor, because the requests are made asynchronously
-        coalesced = list(self._coalesce_offsets(sorted_offsets,
-                               limit=self._max_readv_combine,
-                               fudge_factor=0,
-                               ))
-        requests = []
-        for c_offset in coalesced:
-            start = c_offset.start
-            size = c_offset.length
-
-            # We need to break this up into multiple requests
-            while size > 0:
-                next_size = min(size, self._max_request_size)
-                requests.append((start, next_size))
-                size -= next_size
-                start += next_size
-
-        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
-                len(offsets), len(coalesced), len(requests))
-
-        # Queue the current read until we have read the full coalesced section
-        cur_data = []
-        cur_data_len = 0
-        cur_coalesced_stack = iter(coalesced)
-        cur_coalesced = cur_coalesced_stack.next()
-
-        # Cache the results, but only until they have been fulfilled
-        data_map = {}
-        # turn the list of offsets into a stack
-        offset_stack = iter(offsets)
-        cur_offset_and_size = offset_stack.next()
-
-        for data in fp.readv(requests):
-            cur_data.append(data)
-            cur_data_len += len(data)
-
-            if cur_data_len < cur_coalesced.length:
-                continue
-            if cur_data_len != cur_coalesced.length:
-                raise AssertionError(
-                    "Somehow we read too much: %s != %s" 
-                    % (cur_data_len, cur_coalesced.length))
-            all_data = ''.join(cur_data)
-            cur_data = []
-            cur_data_len = 0
-
-            for suboffset, subsize in cur_coalesced.ranges:
-                key = (cur_coalesced.start+suboffset, subsize)
-                data_map[key] = all_data[suboffset:suboffset+subsize]
-
-            # Now that we've read some data, see if we can yield anything back
-            while cur_offset_and_size in data_map:
-                this_data = data_map.pop(cur_offset_and_size)
-                yield cur_offset_and_size[0], this_data
-                cur_offset_and_size = offset_stack.next()
-
-            # We read a coalesced entry, so mark it as done
-            cur_coalesced = None
-            # Now that we've read all of the data for this coalesced section
-            # on to the next
-            cur_coalesced = cur_coalesced_stack.next()
-
-        if cur_coalesced is not None:
-            raise errors.ShortReadvError(relpath, cur_coalesced.start,
-                cur_coalesced.length, len(data))
+        helper = _SFTPReadvHelper(offsets, relpath)
+        return helper.request_and_yield_offsets(fp)
 
     def put_file(self, relpath, f, mode=None):
         """

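Inside request_and_yield_offsets() above, any bytes that could not be handed out in request order are parked in data_chunks as (start, data) pairs kept sorted by start, and the final loop locates a requested offset with bisect_left, as the comments around the bisect call describe. Below is a minimal sketch of that lookup with hypothetical names, assuming non-overlapping chunks; it is an illustration, not the merged code.

    import bisect

    def find_in_chunks(data_chunks, offset, size):
        """Return `size` bytes starting at `offset` from a list of
        (start, data) chunks kept sorted by start."""
        idx = bisect.bisect_left(data_chunks, (offset,))
        if idx < len(data_chunks) and data_chunks[idx][0] == offset:
            data = data_chunks[idx][1][:size]      # a chunk starts exactly here
        elif idx > 0:
            start, chunk = data_chunks[idx - 1]    # offset falls inside the
            sub = offset - start                   # previous chunk
            data = chunk[sub:sub + size]
        else:
            data = ''                              # nothing buffered for it
        if len(data) != size:
            raise AssertionError('expected %d bytes, found %d'
                                 % (size, len(data)))
        return data

    chunks = [(0, 'abcdefghij'), (20, '0123456789')]
    assert find_in_chunks(chunks, 0, 4) == 'abcd'
    assert find_in_chunks(chunks, 23, 5) == '34567'
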


