Rev 3688: Start moving the readv code into a helper. in http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/sftp_chunked
John Arbash Meinel
john at arbash-meinel.com
Thu Sep 4 00:26:38 BST 2008
At http://bzr.arbash-meinel.com/branches/bzr/1.7-dev/sftp_chunked
------------------------------------------------------------
revno: 3688
revision-id: john at arbash-meinel.com-20080903232635-8009dsm2fai5nz3c
parent: john at arbash-meinel.com-20080903220310-1uwt7qt5p1istebv
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Wed 2008-09-03 18:26:35 -0500
message:
Start moving the readv code into a helper.
We will optimize the helper for requests being in order, because
that is a very common case with packs. (90% of the time callers
don't care about data order, so they can always stream in
file order.)
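
As a rough sketch (illustrative only; readv_in_file_order and read_range are
made-up names, not bzrlib API), the in-order case the message describes
amounts to:

    def readv_in_file_order(read_range, offsets):
        # When the caller does not care about result order, sorting the
        # offsets lets us read and yield strictly in file order, with no
        # buffering of out-of-order results.  read_range(start, size)
        # stands in for an actual SFTP read of size bytes at start.
        for start, size in sorted(offsets):
            yield start, read_range(start, size)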
-------------- next part --------------
=== modified file 'bzrlib/tests/test_sftp_transport.py'
--- a/bzrlib/tests/test_sftp_transport.py 2007-12-11 14:26:18 +0000
+++ b/bzrlib/tests/test_sftp_transport.py 2008-09-03 23:26:35 +0000
@@ -30,6 +30,8 @@
from bzrlib import (
bzrdir,
errors,
+ tests,
+ transport as _mod_transport,
)
from bzrlib.osutils import (
pathjoin,
@@ -46,6 +48,7 @@
import bzrlib.transport.http
if paramiko_loaded:
+ from bzrlib.transport import sftp as _mod_sftp
from bzrlib.transport.sftp import (
SFTPAbsoluteServer,
SFTPHomeDirServer,
@@ -76,7 +79,7 @@
set_test_transport_to_sftp(self)
-class SFTPLockTests (TestCaseWithSFTPServer):
+class SFTPLockTests(TestCaseWithSFTPServer):
def test_sftp_locks(self):
from bzrlib.errors import LockError
@@ -459,3 +462,22 @@
self.assertAlmostEqual(t2 - t1, 100 + 7)
+class Test_SFTPReadvHelper(tests.TestCase):
+
+ def assertGetRequests(self, expected_requests, offsets):
+ helper = _mod_sftp._SFTPReadvHelper(offsets, 'artificial_test')
+ self.assertEqual(expected_requests, helper._get_requests())
+
+ def test__get_requests(self):
+ # Small single requests become a single readv request
+ self.assertGetRequests([(0, 100)], [(0, 20), (30, 50), (20, 10),
+ (80, 20)])
+ # Non-contiguous ranges are given as multiple requests
+ self.assertGetRequests([(0, 20), (30, 50)],
+ [(10, 10), (30, 20), (0, 10), (50, 30)])
+ # Ranges larger than _max_request_size (32kB) are broken up into
+ # multiple requests, even when a single coalesced range spans several
+ # logical requests
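+ # (These four ranges are contiguous: 40000 + 100 + 1900 + 24000 = 66000
+ # bytes starting at offset 0, which splits into 32768 + 32768 + 464.)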
+ self.assertGetRequests([(0, 32768), (32768, 32768), (65536, 464)],
+ [(0, 40000), (40000, 100), (40100, 1900),
+ (42000, 24000)])
=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py 2008-09-03 22:03:10 +0000
+++ b/bzrlib/transport/sftp.py 2008-09-03 23:26:35 +0000
@@ -25,6 +25,7 @@
# these methods when we officially drop support for those formats.
import errno
+import itertools
import os
import random
import select
@@ -131,6 +132,154 @@
pass
+class _SFTPReadvHelper(object):
+ """A class to help with managing the state of a readv request."""
+
+ # See _get_requests for an explanation.
+ _max_request_size = 32768
+
+ def __init__(self, original_offsets, relpath):
+ """Create a new readv helper.
+
+ :param original_offsets: The original requests given by the caller of
+ readv()
+ :param relpath: The name of the file (if known)
+ """
+ self.original_offsets = list(original_offsets)
+ self.relpath = relpath
+
+ def _get_requests(self):
+ """Break up the offsets into individual requests over sftp.
+
+ The SFTP spec only requires implementers to support 32kB requests. We
+ could try something larger (openssh supports 64kB), but then we have to
+ handle requests that fail.
+ So instead, we just break the coalesced ranges up into 32kB chunks, and
+ issue the requests asynchronously.
+ Newer versions of paramiko would do the chunking for us, but we want to
+ start processing results right away, so we do it ourselves.
+ """
+ # TODO: Because we issue async requests, we don't 'fudge' any extra
+ # data. I'm not 100% sure that is the best choice.
+
+ # The first thing we do is to collapse the individual requests as much
+ # as possible, so we don't issue requests smaller than 32kB
+ sorted_offsets = sorted(self.original_offsets)
+ coalesced = list(ConnectedTransport._coalesce_offsets(sorted_offsets,
+ limit=0, fudge_factor=0))
+ requests = []
+ for c_offset in coalesced:
+ start = c_offset.start
+ size = c_offset.length
+
+ # Break this up into 32kB requests
+ while size > 0:
+ next_size = min(size, self._max_request_size)
+ requests.append((start, next_size))
+ size -= next_size
+ start += next_size
+ mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
+ len(sorted_offsets), len(coalesced), len(requests))
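+ # For example (illustrative, not from the test suite): offsets
+ # [(0, 40000), (40000, 100)] coalesce into one 40100-byte range, which
+ # the loop above splits into [(0, 32768), (32768, 7332)].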
+ return requests
+
+ def request_and_yield_offsets(self, fp):
+ """Request the data from the remote machine, yielding the results.
+
+ :param fp: A Paramiko SFTPFile object that supports readv.
+ :return: Yield the data requested by the original readv caller, one by
+ one.
+ """
+ requests = self._get_requests()
+ offset_iter = iter(self.original_offsets)
+ cur_offset, cur_size = offset_iter.next()
+ # paramiko .readv() yields strings in the order of the requests, so we
+ # track the current request to know where the next data is being
+ # returned from.
+ cur_start = None
+ last_end = None
+ buffered_data = []
+ buffered_len = 0
+
+ # This is used to buffer chunks which we couldn't process yet
+ # It is (start, end, data) tuples.
+ data_chunks = []
+ # Create an 'unlimited' data stream, so we stop based on requests,
+ # rather than just because the data stream ended. This lets us detect
+ # short readv.
+ data_stream = itertools.chain(fp.readv(requests),
+ itertools.repeat(None))
+ for request, data in itertools.izip(requests, data_stream):
+ start, length = request
+ if data is None:
+ # The padded stream ran out of real data before all of the
+ # requests were satisfied, so this was a short read.
+ raise errors.ShortReadvError(self.relpath, start, length, 0)
+ assert len(data) == length
+ if last_end is None:
+ # This is the first request, just buffer it
+ buffered_data.append(data)
+ buffered_len = length
+ cur_start = start
+ last_end = start + length
+ elif start == last_end:
+ # The data we are reading fits neatly on the previous
+ # buffer, so this is all part of a larger coalesced range.
+ buffered_data.append(data)
+ buffered_len += length
+ last_end = start + length
+ else:
+ # We have an 'interrupt' in the data stream. So we know we are
+ # at a request boundary.
+ if buffered_len > 0:
+ # We haven't consumed the buffer so far, so put it into
+ # data_chunks, and continue.
+ buffered = ''.join(buffered_data)
+ assert len(buffered) == buffered_len
+ data_chunks.append((cur_start, cur_start + buffered_len,
+ buffered))
+ cur_start = start
+ last_end = start + length
+ buffered_data = [data]
+ buffered_len = length
+ if cur_start == cur_offset and cur_size <= buffered_len:
+ # Simplify the next steps a bit by transforming buffered_data
+ # into a single string
+ buffered = ''.join(buffered_data)
+ buffered_offset = 0
+ while (cur_start == cur_offset
+ and (buffered_offset + cur_size) <= buffered_len):
+ # We've buffered enough data to process this request, spit it
+ # out
+ cur_data = buffered[buffered_offset:buffered_offset + cur_size]
+ # move the direct pointer into our buffered data
+ buffered_offset += cur_size
+ # Move the start-of-buffer pointer
+ cur_start += cur_size
+ # Yield the requested data
+ yield cur_offset, cur_data
+ cur_offset, cur_size = offset_iter.next()
+ # at this point, we've consumed as much of buffered as we can,
+ # so break off the portion that we consumed
+ if buffered_offset == len(buffered):
+ # No tail to leave behind
+ buffered_data = []
+ buffered_len = 0
+ else:
+ buffered = buffered[buffered_offset:]
+ buffered_data = [buffered]
+ buffered_len = len(buffered)
+ if buffered_len:
+ buffered = ''.join(buffered_data)
+ data_chunks.append((cur_start, cur_start + buffered_len,
+ buffered))
+ if data_chunks:
+ # We've processed all of the readv data; anything we couldn't
+ # process directly is now in data_chunks. This doesn't happen often,
+ # so this code path isn't optimized.
+ assert sorted(data_chunks) == data_chunks
+ raise NotImplementedError('not actually implemented yet')
+
+
class SFTPTransport(ConnectedTransport):
"""Transport implementation for SFTP access."""
@@ -265,102 +414,8 @@
does not support ranges > 64K, so it caps the request size, and
just reads until it gets all the stuff it wants
"""
- offsets = list(offsets)
- sorted_offsets = sorted(offsets)
-
- # The algorithm works as follows:
- # 1) Coalesce nearby reads into a single chunk
- # This generates a list of combined regions, the total size
- # and the size of the sub regions. This coalescing step is limited
- # in the number of nearby chunks to combine, and is allowed to
- # skip small breaks in the requests. Limiting it makes sure that
- # we can start yielding some data earlier, and skipping means we
- # make fewer requests. (Beneficial even when using async)
- # 2) Break up this combined regions into chunks that are smaller
- # than 64KiB. Technically the limit is 65536, but we are a
- # little bit conservative. This is because sftp has a maximum
- # return chunk size of 64KiB (max size of an unsigned short)
- # 3) Issue a readv() to paramiko to create an async request for
- # all of this data
- # 4) Read in the data as it comes back, until we've read one
- # continuous section as determined in step 1
- # 5) Break up the full sections into hunks for the original requested
- # offsets. And put them in a cache
- # 6) Check if the next request is in the cache, and if it is, remove
- # it from the cache, and yield its data. Continue until no more
- # entries are in the cache.
- # 7) loop back to step 4 until all data has been read
- #
- # TODO: jam 20060725 This could be optimized one step further, by
- # attempting to yield whatever data we have read, even before
- # the first coallesced section has been fully processed.
-
- # When coalescing for use with readv(), we don't really need to
- # use any fudge factor, because the requests are made asynchronously
- coalesced = list(self._coalesce_offsets(sorted_offsets,
- limit=self._max_readv_combine,
- fudge_factor=0,
- ))
- requests = []
- for c_offset in coalesced:
- start = c_offset.start
- size = c_offset.length
-
- # We need to break this up into multiple requests
- while size > 0:
- next_size = min(size, self._max_request_size)
- requests.append((start, next_size))
- size -= next_size
- start += next_size
-
- mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
- len(offsets), len(coalesced), len(requests))
-
- # Queue the current read until we have read the full coalesced section
- cur_data = []
- cur_data_len = 0
- cur_coalesced_stack = iter(coalesced)
- cur_coalesced = cur_coalesced_stack.next()
-
- # Cache the results, but only until they have been fulfilled
- data_map = {}
- # turn the list of offsets into a stack
- offset_stack = iter(offsets)
- cur_offset_and_size = offset_stack.next()
-
- for data in fp.readv(requests):
- cur_data += data
- cur_data_len += len(data)
-
- if cur_data_len < cur_coalesced.length:
- continue
- if cur_data_len != cur_coalesced.length:
- raise AssertionError(
- "Somehow we read too much: %s != %s"
- % (cur_data_len, cur_coalesced.length))
- all_data = ''.join(cur_data)
- cur_data = []
- cur_data_len = 0
-
- for suboffset, subsize in cur_coalesced.ranges:
- key = (cur_coalesced.start+suboffset, subsize)
- data_map[key] = all_data[suboffset:suboffset+subsize]
-
- # Now that we've read some data, see if we can yield anything back
- while cur_offset_and_size in data_map:
- this_data = data_map.pop(cur_offset_and_size)
- yield cur_offset_and_size[0], this_data
- cur_offset_and_size = offset_stack.next()
-
- # We read a coalesced entry, so mark it as done
- cur_coalesced = None
- # Now that we've read all of the data for this coalesced section
- # on to the next
- cur_coalesced = cur_coalesced_stack.next()
-
- if cur_coalesced is not None:
- raise errors.ShortReadvError(relpath, cur_coalesced.start,
- cur_coalesced.length, len(data))
+ helper = _SFTPReadvHelper(offsets, relpath)
+ return helper.request_and_yield_offsets(fp)
def put_file(self, relpath, f, mode=None):
"""