Rev 3688: Start moving the readv code into a helper. in http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/sftp_chunked

John Arbash Meinel john at arbash-meinel.com
Thu Sep 4 00:26:38 BST 2008


At http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/sftp_chunked

------------------------------------------------------------
revno: 3688
revision-id: john at arbash-meinel.com-20080903232635-8009dsm2fai5nz3c
parent: john at arbash-meinel.com-20080903220310-1uwt7qt5p1istebv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 18:26:35 -0500
message:
  Start moving the readv code into a helper.
  We will optimize the helper for things being in order, because
  that is a very common case with packs. (90% of the time they
  don't care about data order, so they can always stream in
  file order.)
-------------- next part --------------
=== modified file 'bzrlib/tests/test_sftp_transport.py'
--- a/bzrlib/tests/test_sftp_transport.py	2007-12-11 14:26:18 +0000
+++ b/bzrlib/tests/test_sftp_transport.py	2008-09-03 23:26:35 +0000
@@ -30,6 +30,8 @@
 from bzrlib import (
     bzrdir,
     errors,
+    tests,
+    transport as _mod_transport,
     )
 from bzrlib.osutils import (
     pathjoin,
@@ -46,6 +48,7 @@
 import bzrlib.transport.http
 
 if paramiko_loaded:
+    from bzrlib.transport import sftp as _mod_sftp
     from bzrlib.transport.sftp import (
         SFTPAbsoluteServer,
         SFTPHomeDirServer,
@@ -76,7 +79,7 @@
         set_test_transport_to_sftp(self)
 
 
-class SFTPLockTests (TestCaseWithSFTPServer):
+class SFTPLockTests(TestCaseWithSFTPServer):
 
     def test_sftp_locks(self):
         from bzrlib.errors import LockError
@@ -459,3 +462,22 @@
         self.assertAlmostEqual(t2 - t1, 100 + 7)
 
 
+class Test_SFTPReadvHelper(tests.TestCase):
+
+    def assertGetRequests(self, expected_requests, offsets):
+        helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
+        self.assertEqual(expected_requests, helper._get_requests())
+
+    def test__get_requests(self):
+        # Small single requests become a single readv request
+        self.assertGetRequests([(0, 100)], [(0, 20), (30, 50), (20, 10),
+                                            (80, 20)])
+        # Non-contiguous ranges are given as multiple requests
+        self.assertGetRequests([(0, 20), (30, 50)],
+                               [(10, 10), (30, 20), (0, 10), (50, 30)])
+        # Ranges larger than _max_request_size (32kB) are broken up into
+        # multiple requests, even if it actually spans multiple logical
+        # requests
+        self.assertGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
+                               [(0, 40000), (40000, 100), (40100, 1900),
+                                (42000, 24000)])

=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py	2008-09-03 22:03:10 +0000
+++ b/bzrlib/transport/sftp.py	2008-09-03 23:26:35 +0000
@@ -25,6 +25,7 @@
 # these methods when we officially drop support for those formats.
 
 import errno
+import itertools
 import os
 import random
 import select
@@ -131,6 +132,154 @@
             pass
 
 
+class _SFTPReadvHelper(object):
+    """A class to help with managing the state of a readv request."""
+
+    # See _get_requests for an explanation.
+    _max_request_size = 32768
+
+    def __init__(self, original_offsets, relpath):
+        """Create a new readv helper.
+
+        :param original_offsets: The original requests given by the caller of
+            readv()
+        :param relpath: The name of the file (if known)
+        """
+        self.original_offsets = list(original_offsets)
+        self.relpath = relpath
+
+    def _get_requests(self):
+        """Break up the offsets into individual requests over sftp.
+
+        The SFTP spec only requires implementers to support 32kB requests. We
+        could try something larger (openssh supports 64kB), but then we have to
+        handle requests that fail.
+        So instead, we just break up our maximum chunks into 32kB chunks, and
+        asyncronously requests them.
+        Newer versions of paramiko would do the chunking for us, but we want to
+        start processing results right away, so we do it ourselves.
+        """
+        # TODO: Because we issue async requests, we don't 'fudge' any extra
+        #       data.  I'm not 100% sure that is the best choice.
+
+        # The first thing we do, is to collapse the individual requests as much
+        # as possible, so we don't issues requests <32kB
+        sorted_offsets = sorted(self.original_offsets)
+        coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
+                                                        limit=0, fudge_factor=0))
+        requests = []
+        for c_offset in coalesced:
+            start = c_offset.start
+            size = c_offset.length
+
+            # Break this up into 32kB requests
+            while size > 0:
+                next_size = min(size, self._max_request_size)
+                requests.append((start, next_size))
+                size -= next_size
+                start += next_size
+        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
+                len(sorted_offsets), len(coalesced), len(requests))
+        return requests
+
+    def request_and_yield_offsets(self, fp):
+        """Request the data from the remote machine, yielding the results.
+
+        :param fp: A Paramiko SFTPFile object that supports readv.
+        :return: Yield the data requested by the original readv caller, one by
+            one.
+        """
+        import pdb; pdb.set_trace()
+        requests = self._get_requests()
+        offset_iter = iter(self.original_offsets)
+        cur_offset, cur_size = offset_iter.next()
+        # paramiko .readv() yields strings that are in the order of the requests
+        # So we track the current request to know where the next data is
+        # being returned from.
+        cur_start = None
+        last_end = None
+        buffered_data = []
+        buffered_len = 0
+
+        # This is used to buffer chunks which we couldn't process yet
+        # It is (start, end, data) tuples.
+        data_chunks = []
+        # Create an 'unlimited' data stream, so we stop based on requests,
+        # rather than just because the data stream ended. This lets us detect
+        # short readv.
+        data_stream = itertools.chain(fp.readv(requests),
+                                      itertools.repeat(None))
+        for request, data in itertools.izip(requests, data_stream):
+            if data is None:
+                if cur_coalesced is not None:
+                    raise errors.ShortReadvError(self.relpath,
+                        cur_start, 0, len(data))
+            start, length = request
+            assert len(data) == length
+            if last_end is None:
+                # This is the first request, just buffer it
+                buffered_data.append(data)
+                buffered_len = length
+                cur_start = start
+            elif start == last_end:
+                # The data we are reading fits neatly on the previous
+                # buffer, so this is all part of a larger coalesced range.
+                buffered_data.append(data)
+                buffered_len += length
+                last_end = start + length
+            else:
+                # We have an 'interrupt' in the data stream. So we know we are
+                # at a request boundary.
+                if buffered_len > 0:
+                    # We haven't consumed the buffer so far, so put it into
+                    # data_chunks, and continue.
+                    buffered = ''.join(buffered_data)
+                    assert len(buffered) == buffered_len
+                    data_chunks.append((cur_start, cur_start + buffered_len,
+                                        buffered))
+                cur_start = start
+                last_end = start + length
+                buffered_data = [data]
+                buffered_len = data
+            if cur_start == cur_offset and cur_size <= buffered_len:
+                # Simplify the next steps a bit by transforming buffered_data
+                # into a single string
+                buffered = ''.join(buffered_data)
+                buffered_offset = 0
+                while (cur_start == cur_offset
+                       and (buffered_offset + cur_size) <= buffered_len):
+                    # We've buffered enough data to process this request, spit it
+                    # out
+                    cur_data = buffered[buffered_offset:buffered_offset + cur_size]
+                    # move the direct pointer into our buffered data
+                    buffered_offset += cur_size
+                    # Move the start-of-buffer pointer
+                    cur_start += cur_size
+                    # Yield the requested data
+                    yield cur_offset, cur_data
+                    cur_offset, cur_size = offset_iter.next()
+                # at this point, we've consumed as much of buffered as we can,
+                # so break off the portion that we consumed
+                if buffered_offset == len(buffered_data):
+                    # No tail to leave behind
+                    buffered_data = []
+                    buffered_len = 0
+                else:
+                    buffered = buffered[buffered_offset:]
+                    buffered_data = [buffered]
+                    buffered_len = len(buffered)
+        if buffered_len:
+            buffered = ''.join(buffered_data)
+            data_chunks.append((cur_start, cur_start + buffered_len,
+                                buffered))
+        if data_chunks:
+            # We've processed all the readv data, at this point, anything we
+            # couldn't process is in data_chunks. This doesn't happen often, so
+            # this code path isn't optimized
+            assert sorted(data_chunks) == data_chunks
+            raise AssertionError('not actually implemented yet')
+
+
 class SFTPTransport(ConnectedTransport):
     """Transport implementation for SFTP access."""
 
@@ -265,102 +414,8 @@
         does not support ranges > 64K, so it caps the request size, and
         just reads until it gets all the stuff it wants
         """
-        offsets = list(offsets)
-        sorted_offsets = sorted(offsets)
-
-        # The algorithm works as follows:
-        # 1) Coalesce nearby reads into a single chunk
-        #    This generates a list of combined regions, the total size
-        #    and the size of the sub regions. This coalescing step is limited
-        #    in the number of nearby chunks to combine, and is allowed to
-        #    skip small breaks in the requests. Limiting it makes sure that
-        #    we can start yielding some data earlier, and skipping means we
-        #    make fewer requests. (Beneficial even when using async)
-        # 2) Break up this combined regions into chunks that are smaller
-        #    than 64KiB. Technically the limit is 65536, but we are a
-        #    little bit conservative. This is because sftp has a maximum
-        #    return chunk size of 64KiB (max size of an unsigned short)
-        # 3) Issue a readv() to paramiko to create an async request for
-        #    all of this data
-        # 4) Read in the data as it comes back, until we've read one
-        #    continuous section as determined in step 1
-        # 5) Break up the full sections into hunks for the original requested
-        #    offsets. And put them in a cache
-        # 6) Check if the next request is in the cache, and if it is, remove
-        #    it from the cache, and yield its data. Continue until no more
-        #    entries are in the cache.
-        # 7) loop back to step 4 until all data has been read
-        #
-        # TODO: jam 20060725 This could be optimized one step further, by
-        #       attempting to yield whatever data we have read, even before
-        #       the first coallesced section has been fully processed.
-
-        # When coalescing for use with readv(), we don't really need to
-        # use any fudge factor, because the requests are made asynchronously
-        coalesced = list(self._coalesce_offsets(sorted_offsets,
-                               limit=self._max_readv_combine,
-                               fudge_factor=0,
-                               ))
-        requests = []
-        for c_offset in coalesced:
-            start = c_offset.start
-            size = c_offset.length
-
-            # We need to break this up into multiple requests
-            while size > 0:
-                next_size = min(size, self._max_request_size)
-                requests.append((start, next_size))
-                size -= next_size
-                start += next_size
-
-        mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
-                len(offsets), len(coalesced), len(requests))
-
-        # Queue the current read until we have read the full coalesced section
-        cur_data = []
-        cur_data_len = 0
-        cur_coalesced_stack = iter(coalesced)
-        cur_coalesced = cur_coalesced_stack.next()
-
-        # Cache the results, but only until they have been fulfilled
-        data_map = {}
-        # turn the list of offsets into a stack
-        offset_stack = iter(offsets)
-        cur_offset_and_size = offset_stack.next()
-
-        for data in fp.readv(requests):
-            cur_data += data
-            cur_data_len += len(data)
-
-            if cur_data_len < cur_coalesced.length:
-                continue
-            if cur_data_len != cur_coalesced.length:
-                raise AssertionError(
-                    "Somehow we read too much: %s != %s" 
-                    % (cur_data_len, cur_coalesced.length))
-            all_data = ''.join(cur_data)
-            cur_data = []
-            cur_data_len = 0
-
-            for suboffset, subsize in cur_coalesced.ranges:
-                key = (cur_coalesced.start+suboffset, subsize)
-                data_map[key] = all_data[suboffset:suboffset+subsize]
-
-            # Now that we've read some data, see if we can yield anything back
-            while cur_offset_and_size in data_map:
-                this_data = data_map.pop(cur_offset_and_size)
-                yield cur_offset_and_size[0], this_data
-                cur_offset_and_size = offset_stack.next()
-
-            # We read a coalesced entry, so mark it as done
-            cur_coalesced = None
-            # Now that we've read all of the data for this coalesced section
-            # on to the next
-            cur_coalesced = cur_coalesced_stack.next()
-
-        if cur_coalesced is not None:
-            raise errors.ShortReadvError(relpath, cur_coalesced.start,
-                cur_coalesced.length, len(data))
+        helper = _SFTPReadvHelper(offsets, relpath)
+        return helper.request_and_yield_offsets(fp)
 
     def put_file(self, relpath, f, mode=None):
         """



More information about the bazaar-commits mailing list