Rev 3121: Change the sftp_readv loop to buffer even less. in http://bzr.arbash-meinel.com/branches/bzr/1.1-dev/sftp_chunked

John Arbash Meinel john at arbash-meinel.com
Mon Dec 17 16:57:10 GMT 2007


At http://bzr.arbash-meinel.com/branches/bzr/1.1-dev/sftp_chunked

------------------------------------------------------------
revno: 3121
revision-id:john at arbash-meinel.com-20071217165633-unoib2xwcy3moixw
parent: pqm at pqm.ubuntu.com-20071217060447-sictlq5nibqhpuec
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: sftp_chunked
timestamp: Mon 2007-12-17 10:56:33 -0600
message:
  Change the sftp_readv loop to buffer even less.
  Instead of waiting until we have a whole collapsed range, start trying to
  return data as soon as any data arrives.
  Also, a common case is that the request is made in sorted order,
  which means that we don't need to buffer much at all.
modified:
  bzrlib/errors.py               errors.py-20050309040759-20512168c4e14fbd
  bzrlib/transport/__init__.py   transport.py-20050711165921-4978aa7ce1285ad5
  bzrlib/transport/sftp.py       sftp.py-20051019050329-ab48ce71b7e32dfe
-------------- next part --------------
=== modified file 'bzrlib/errors.py'
--- a/bzrlib/errors.py	2007-12-15 16:04:18 +0000
+++ b/bzrlib/errors.py	2007-12-17 16:56:33 +0000
@@ -570,6 +570,16 @@
         self.actual = actual
 
 
+class OverlappingReadv(BzrError):
+    """Raised when a readv() requests overlapping chunks of data.
+
+    Not all transports support this, so the API should generally forbid it.
+    (It isn't a feature we need anyway.)
+    """
+
+    _fmt = 'Requested readv ranges overlap'
+
+
 class PathNotChild(PathError):
 
     _fmt = 'Path "%(path)s" is not a child of path "%(base)s"%(extra)s'

=== modified file 'bzrlib/transport/__init__.py'
--- a/bzrlib/transport/__init__.py	2007-12-08 23:15:18 +0000
+++ b/bzrlib/transport/__init__.py	2007-12-17 16:56:33 +0000
@@ -792,7 +792,8 @@
         return offsets
 
     @staticmethod
-    def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0):
+    def _coalesce_offsets(offsets, limit=0, fudge_factor=0, max_size=0,
+                          allow_overlap=False):
         """Yield coalesced offsets.
 
         With a long list of neighboring requests, combine them
@@ -801,27 +802,26 @@
         Turns  [(15, 10), (25, 10)] => [(15, 20, [(0, 10), (10, 10)])]
 
         :param offsets: A list of (start, length) pairs
-
         :param limit: Only combine a maximum of this many pairs Some transports
                 penalize multiple reads more than others, and sometimes it is
                 better to return early.
                 0 means no limit
-
         :param fudge_factor: All transports have some level of 'it is
-                better to read some more data and throw it away rather 
+                better to read some more data and throw it away rather
                 than seek', so collapse if we are 'close enough'
-
         :param max_size: Create coalesced offsets no bigger than this size.
                 When a single offset is bigger than 'max_size', it will keep
                 its size and be alone in the coalesced offset.
                 0 means no maximum size.
-
-        :return: yield _CoalescedOffset objects, which have members for where
-                to start, how much to read, and how to split those 
-                chunks back up
+        :param allow_overlap: If False, raise an error if requested ranges
+            overlap.
+        :return: A list of _CoalescedOffset objects, which have members
+            for where to start, how much to read, and how to split those chunks
+            back up
         """
         last_end = None
         cur = _CoalescedOffset(None, None, [])
+        coalesced_offsets = []
 
         for start, size in offsets:
             end = start + size
@@ -830,18 +830,19 @@
                 and start >= cur.start
                 and (limit <= 0 or len(cur.ranges) < limit)
                 and (max_size <= 0 or end - cur.start <= max_size)):
+                if not allow_overlap and start < last_end:
+                    raise errors.OverlappingReadv()
                 cur.length = end - cur.start
                 cur.ranges.append((start-cur.start, size))
             else:
                 if cur.start is not None:
-                    yield cur
+                    coalesced_offsets.append(cur)
                 cur = _CoalescedOffset(start, size, [(0, size)])
             last_end = end
 
         if cur.start is not None:
-            yield cur
-
-        return
+            coalesced_offsets.append(cur)
+        return coalesced_offsets
 
     def get_multi(self, relpaths, pb=None):
         """Get a list of file-like objects, one for each entry in relpaths.

=== modified file 'bzrlib/transport/sftp.py'
--- a/bzrlib/transport/sftp.py	2007-10-24 18:19:51 +0000
+++ b/bzrlib/transport/sftp.py	2007-12-17 16:56:33 +0000
@@ -328,34 +328,73 @@
         mutter('SFTP.readv() %s offsets => %s coalesced => %s requests',
                 len(offsets), len(coalesced), len(requests))
 
-        # Queue the current read until we have read the full coalesced section
-        cur_data = []
-        cur_data_len = 0
+        # We have several layers of indirection, so let's try to list them all.
+        # 1) The requested offsets from the user, which are in (start, length)
+        #    pairs starting at the beginning of the file.
+        # 2) CoalescedOffsets which combine those absolute locations, and give
+        #    an overall start and length in absolute terms, and then a list of
+        #    ranges offset by CoalescedOffsets.start.
+        # 3) For large Coalesced ranges, we further split them up into 32KB
+        #    chunks (sftp protocol minimum supported size).
+        # 4) As we read data back from the requests keep track of where we are
+        #    and return any data that we can.
+        # Keep track of the currently coalesced data
+        buffered_data = ''
+        buffered_start = 0
         cur_coalesced_stack = iter(coalesced)
         cur_coalesced = cur_coalesced_stack.next()
+        buffered_start = cur_coalesced.start
 
-        # Cache the results, but only until they have been fulfilled
+        # Cache the results, but only until they have been fulfilled. This
+        # mostly just stores the out-of-order data
         data_map = {}
         # turn the list of offsets into a stack
         offset_stack = iter(offsets)
         cur_offset_and_size = offset_stack.next()
 
         for data in fp.readv(requests):
-            cur_data += data
-            cur_data_len += len(data)
-
-            if cur_data_len < cur_coalesced.length:
-                continue
-            assert cur_data_len == cur_coalesced.length, \
-                "Somehow we read too much: %s != %s" % (cur_data_len,
-                                                        cur_coalesced.length)
-            all_data = ''.join(cur_data)
-            cur_data = []
-            cur_data_len = 0
-
+            # TODO: jam 20071217 We could buffer into a list, and then handle
+            #       the sub-chunks, rather than buffering into a string which
+            #       needs to be repeatedly memory allocated. However, it makes
+            #       the processing code a lot more complex (you have to build
+            #       another list with possibly parts of these buffered
+            #       sections, etc)
+            buffered_data += data
+            buffered_end = buffered_start + len(buffered_data)
+
+            # Check to see if this is enough data to satisfy some of the
+            # coalesced subranges
+
+            new_ranges = []
             for suboffset, subsize in cur_coalesced.ranges:
-                key = (cur_coalesced.start+suboffset, subsize)
-                data_map[key] = all_data[suboffset:suboffset+subsize]
+                start = cur_coalesced.start + suboffset
+                end = start + subsize
+                if buffered_start > start:
+                    raise AssertionError('We messed up. The data we are'
+                                         ' reading is starting after the'
+                                         ' point of the sub offset')
+                if (buffered_start < start
+                    or buffered_end < end):
+                    # Either we haven't started reading, or we haven't finished
+                    new_ranges.append((suboffset, subsize))
+                    continue
+
+                # We have read enough data, collect it into a single string
+                buf_start = start - buffered_start
+                buf_end = buf_start + subsize
+                data = buffered_data[buf_start:buf_end]
+
+                # Because this data won't be repeated, shrink the buffer
+                buffered_data = buffered_data[buf_end:]
+                buffered_start += buf_end
+
+                # Is this exactly the next requested data chunk?
+                key = (start, subsize)
+                if key == cur_offset_and_size:
+                    yield start, data
+                    cur_offset_and_size = offset_stack.next()
+                else:
+                    data_map[key] = data
 
             # Now that we've read some data, see if we can yield anything back
             while cur_offset_and_size in data_map:
@@ -363,11 +402,15 @@
                 yield cur_offset_and_size[0], this_data
                 cur_offset_and_size = offset_stack.next()
 
-            # We read a coalesced entry, so mark it as done
-            cur_coalesced = None
-            # Now that we've read all of the data for this coalesced section
-            # on to the next
-            cur_coalesced = cur_coalesced_stack.next()
+            if buffered_end == cur_coalesced.start + cur_coalesced.length:
+                # We reached the end of this section, go on to the next
+                try:
+                    cur_coalesced = cur_coalesced_stack.next()
+                except StopIteration:
+                    cur_coalesced = None
+            elif buffered_end >= cur_coalesced.start + cur_coalesced.length:
+                raise AssertionError('Somehow we read too much data: %s > %s'
+                    % (buffered_end, cur_coalesced.start + cur_coalesced.length))
 
         if cur_coalesced is not None:
             raise errors.ShortReadvError(relpath, cur_coalesced.start,



More information about the bazaar-commits mailing list