Rev 3121: Change the sftp_readv loop to buffer even less. in http://bzr.arbash-meinel.com/branches/bzr/1.1-dev/sftp_chunked
John Arbash Meinel
john at arbash-meinel.com
Mon Dec 17 16:57:10 GMT 2007
At http://bzr.arbash-meinel.com/branches/bzr/1.1-dev/sftp_chunked
------------------------------------------------------------
revno: 3121
revision-id: john at arbash-meinel.com-20071217165633-unoib2xwcy3moixw
parent: pqm at pqm.ubuntu.com-20071217060447-sictlq5nibqhpuec
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Mon 2007-12-17 10:56:33 -0600
message:
Change the sftp_readv loop to buffer even less.
Instead of waiting until we have a whole collapsed range, start trying to
return data as soon as any data arrives.
Also, there is a common case of requests being made in
sorted order, which means that we don't need to buffer much at all.
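The core idea, as a rough standalone sketch (not the sftp code itself;
the helper name and its arguments are made up for illustration): keep a
rolling buffer and yield each requested (start, length) pair as soon as
the buffer covers it, trimming consumed bytes as we go.

    def streaming_readv(offsets, chunks):
        """Yield (start, data) for each (start, length) in offsets.

        Assumes offsets are sorted and non-overlapping, and that
        chunks is an iterable of strings covering them contiguously,
        starting at offsets[0][0].
        """
        offset_stack = iter(offsets)
        cur_start, cur_len = offset_stack.next()
        buf = ''
        buf_start = cur_start
        for chunk in chunks:
            buf += chunk
            # Satisfy as many requests as the buffer now covers
            while buf_start + len(buf) >= cur_start + cur_len:
                lo = cur_start - buf_start
                yield cur_start, buf[lo:lo + cur_len]
                # Drop the consumed bytes so the buffer stays small
                buf = buf[lo + cur_len:]
                buf_start = cur_start + cur_len
                try:
                    cur_start, cur_len = offset_stack.next()
                except StopIteration:
                    return

In the sorted, contiguous case the buffer never holds more than one
partial chunk, which is the point of the change.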
modified:
bzrlib/errors.py errors.py-20050309040759-20512168c4e14fbd
bzrlib/transport/__init__.py transport.py-20050711165921-4978aa7ce1285ad5
bzrlib/transport/sftp.py sftp.py-20051019050329-ab48ce71b7e32dfe
-------------- next part --------------
=== modified file 'bzrlib/errors.py'
--- a/bzrlib/errors.py 2007-12-15 16:04:18 +0000
+++ b/bzrlib/errors.py 2007-12-17 16:56:33 +0000
@@ -570,6 +570,16 @@
self.actual = actual
+class OverlappingReadv(BzrError):
+ """Raised when a readv() requests overlapping chunks of data.
+
+ Not all transports support this, so the API should generally forbid it.
+ (It isn't a feature we need anyway.)
+ """
+
+ _fmt = 'Requested readv ranges overlap'
+
+
class PathNotChild(PathError):
_fmt = 'Path "%(path)s" is not a child of path "%(base)s"%(extra)s'
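For context, two ranges overlap when one starts before the previous one
ends, e.g. (10, 20) covers bytes 10..29 and (25, 10) starts at byte 25.
A hypothetical usage sketch against the _coalesce_offsets change below
(a private API, used here purely for illustration; since the next hunk
also turns the generator into a plain function, the error is raised at
the call itself):

    from bzrlib import errors
    from bzrlib.transport import Transport

    try:
        Transport._coalesce_offsets([(10, 20), (25, 10)],
                                    limit=0, fudge_factor=0)
    except errors.OverlappingReadv:
        print 'overlapping readv rejected'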
=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py 2007-12-08 23:15:18 +0000
+++ b/bzrlib/transport/__init__.py 2007-12-17 16:56:33 +0000
@@ -792,7 +792,8 @@
return offsets
@staticmethod
- def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0):
+ def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0,
+ allow_overlap=False):
"""Yield coalesced offsets.
With a long list of neighboring requests, combine them
@@ -801,27 +802,26 @@
Turns [(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]
:param offsets: A list of (start, length) pairs
-
:param limit: Only combine a maximum of this many pairs. Some transports
penalize multiple reads more than others, and sometimes it is
better to return early.
0 means no limit
-
:param fudge_factor: All transports have some level of 'it is
- better to read some more data and throw it away rather
+ better to read some more data and throw it away rather
than seek', so collapse if we are 'close enough'
-
:param max_size: Create coalesced offsets no bigger than this size.
When a single offset is bigger than 'max_size', it will keep
its size and be alone in the coalesced offset.
0 means no maximum size.
-
- :return: yield _CoalescedOffset objects, which have members for where
- to start, how much to read, and how to split those
- chunks back up
+ :param allow_overlap: If False, raise an error if requested ranges
+ overlap.
+ :return: A list of _CoalescedOffset objects, which have members
+ for where to start, how much to read, and how to split those chunks
+ back up
"""
last_end = None
cur = _CoalescedOffset(None, None, [])
+ coalesced_offsets = []
for start, size in offsets:
end = start + size
@@ -830,18 +830,19 @@
and start >= cur.start
and (limit <= 0 or len(cur.ranges) < limit)
and (max_size <= 0 or end - cur.start <= max_size)):
+ if not allow_overlap and start < last_end:
+ raise errors.OverlappingReadv()
cur.length = end - cur.start
cur.ranges.append((start-cur.start, size))
else:
if cur.start is not None:
- yield cur
+ coalesced_offsets.append(cur)
cur = _CoalescedOffset(start, size, [(0, size)])
last_end = end
if cur.start is not None:
- yield cur
-
- return
+ coalesced_offsets.append(cur)
+ return coalesced_offsets
def get_multi(self, relpaths, pb=None):
"""Get a list of file-like objects, one for each entry in relpaths.
=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py 2007-10-24 18:19:51 +0000
+++ b/bzrlib/transport/sftp.py 2007-12-17 16:56:33 +0000
@@ -328,34 +328,73 @@
mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
len(offsets), len(coalesced), len(requests))
- # Queue the current read until we have read the full coalesced section
- cur_data = []
- cur_data_len = 0
+ # We have several layers of indirection, so let's try to list them all.
+ # 1) The requested offsets from the user, which are in (start, length)
+ # pairs starting at the beginning of the file.
+ # 2) CoalescedOffsets, which combine those absolute locations and give
+ # an overall start and length in absolute terms, and then a list of
+ # ranges relative to CoalescedOffset.start.
+ # 3) For large coalesced ranges, we further split them up into 32KB
+ # chunks (the largest request size the sftp protocol guarantees a
+ # server supports).
+ # 4) As we read data back from the requests, keep track of where we
+ # are and yield any data that we can.
+ # Keep track of the currently buffered data
+ buffered_data = ''
+ buffered_start = 0
cur_coalesced_stack = iter(coalesced)
cur_coalesced = cur_coalesced_stack.next()
+ buffered_start = cur_coalesced.start
- # Cache the results, but only until they have been fulfilled
+ # Cache the results, but only until they have been fulfilled; this
+ # mostly just stores the out-of-order data
data_map = {}
# turn the list of offsets into a stack
offset_stack = iter(offsets)
cur_offset_and_size = offset_stack.next()
for data in fp.readv(requests):
- cur_data += data
- cur_data_len += len(data)
-
- if cur_data_len < cur_coalesced.length:
- continue
- assert cur_data_len == cur_coalesced.length, \
- "Somehow we read too much: %s != %s" % (cur_data_len,
- cur_coalesced.length)
- all_data = ''.join(cur_data)
- cur_data = []
- cur_data_len = 0
-
+ # TODO: jam 20071217 We could buffer into a list, and then handle
+ # the sub-chunks, rather than buffering into a string which
+ # has to be repeatedly reallocated. However, that makes the
+ # processing code a lot more complex (you have to build
+ # another list from possibly-partial buffered sections, etc.)
+ buffered_data += data
+ buffered_end = buffered_start + len(buffered_data)
+
+ # Check to see if this is enough data to satisfy some of the
+ # coalesced subranges
+
+ new_ranges = []
for suboffset, subsize in cur_coalesced.ranges:
- key = (cur_coalesced.start+suboffset, subsize)
- data_map[key] = all_data[suboffset:suboffset+subsize]
+ start = cur_coalesced.start + suboffset
+ end = start + subsize
+ if buffered_start > start:
+ raise AssertionError('We messed up. The data we are'
+ ' reading starts after this'
+ ' sub-offset')
+ if (buffered_start < start
+ or buffered_end < end):
+ # Either we haven't started reading, or we haven't finished
+ new_ranges.append((suboffset, subsize))
+ continue
+
+ # We have read enough data; slice this range out of the buffer
+ buf_start = start - buffered_start
+ buf_end = buf_start + subsize
+ data = buffered_data[buf_start:buf_end]
+
+ # Overlapping requests are forbidden, so this data won't be
+ # needed again; shrink the buffer
+ buffered_data = buffered_data[buf_end:]
+ buffered_start += buf_end
+
+ # Is this exactly the next requested data chunk?
+ key = (start, subsize)
+ if key == cur_offset_and_size:
+ yield start, data
+ cur_offset_and_size = offset_stack.next()
+ else:
+ data_map[key] = data
# Now that we've read some data, see if we can yield anything back
while cur_offset_and_size in data_map:
@@ -363,11 +402,15 @@
yield cur_offset_and_size[0], this_data
cur_offset_and_size = offset_stack.next()
- # We read a coalesced entry, so mark it as done
- cur_coalesced = None
- # Now that we've read all of the data for this coalesced section
- # on to the next
- cur_coalesced = cur_coalesced_stack.next()
+ if buffered_end == cur_coalesced.start + cur_coalesced.length:
+ # We reached the end of this section, go on to the next
+ try:
+ cur_coalesced = cur_coalesced_stack.next()
+ except StopIteration:
+ cur_coalesced = None
+ elif buffered_end >= cur_coalesced.start + cur_coalesced.length:
+ raise AssertionError('Somehow we read too much data: %s > %s'
+ % (buffered_end, cur_coalesced.start + cur_coalesced.length))
if cur_coalesced is not None:
raise errors.ShortReadvError(relpath, cur_coalesced.start,
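One part of the loop above that is worth a standalone illustration is
the reordering: requests can complete out of the caller's original
order, so any finished chunk that is not the next expected answer gets
parked in data_map until its turn comes up. A stripped-down sketch of
that pattern (hypothetical helper, not the sftp code itself):

    def yield_in_request_order(offsets, completed):
        """Yield (start, data) pairs in the order of offsets.

        offsets: the caller's (start, length) pairs, in request order.
        completed: an iterable of ((start, length), data) results that
            may arrive in any order.
        """
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()
        data_map = {}
        for key, data in completed:
            if key == cur_offset_and_size:
                # The common, sorted case: hand the data straight back
                yield key[0], data
                cur_offset_and_size = offset_stack.next()
            else:
                # Out of order: park it until its turn
                data_map[key] = data
            # Drain anything parked that is now next in line
            while cur_offset_and_size in data_map:
                yield cur_offset_and_size[0], data_map.pop(cur_offset_and_size)
                cur_offset_and_size = offset_stack.next()

In the fully sorted case data_map stays empty, which matches the
common-case claim in the commit message.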