Rev 2829: Teach NewPack how to buffer for pack operations. in http://people.ubuntu.com/~robertc/baz2.0/repository

Robert Collins robertc at robertcollins.net
Tue Oct 16 06:41:33 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/repository

------------------------------------------------------------
revno: 2829
revision-id: robertc at robertcollins.net-20071016054124-clp9nztnaiydmxoa
parent: robertc at robertcollins.net-20071016045856-w4b1pjj0a6fp8itd
committer: Robert Collins <robertc at robertcollins.net>
branch nick: repository
timestamp: Tue 2007-10-16 15:41:24 +1000
message:
  Teach NewPack how to buffer for pack operations.
modified:
  bzrlib/repofmt/pack_repo.py    pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py	2007-10-16 04:58:56 +0000
+++ b/bzrlib/repofmt/pack_repo.py	2007-10-16 05:41:24 +0000
@@ -221,6 +221,11 @@
         # a four-tuple with the length in bytes of the indices, once the pack
         # is finalised. (rev, inv, text, sigs)
         self.index_sizes = None
+        # How much data to cache when writing packs. Note that this is not
+        # synchronised with reads, because it's not in the transport layer, so
+        # is not safe unless the client knows it won't be reading from the pack
+        # under creation.
+        self._cache_limit = 0
         # the temporary pack file name.
         self.random_name = rand_chars(20) + upload_suffix
         # when was this pack started ?
@@ -232,11 +237,18 @@
             mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                 time.ctime(), self.upload_transport.base, self.random_name,
                 time.time() - self.start_time)
+        self._buffer = [[], 0]
         # create a callable for adding data
-        def _write_data(bytes, write=self.write_stream.write,
-                       update=self._hash.update):
-            write(bytes)
-            update(bytes)
+        def _write_data(bytes, flush=False, _buffer=self._buffer,
+            _write=self.write_stream.write, _update=self._hash.update):
+            _buffer[0].append(bytes)
+            _buffer[1] += len(bytes)
+            # flush once the buffer exceeds the cache limit, or on request
+            if _buffer[1] > self._cache_limit or flush:
+                bytes = ''.join(_buffer[0])
+                _write(bytes)
+                _update(bytes)
+                _buffer[:] = [[], 0]
         # expose this on self, for the occasion when clients want to add data.
         self._write_data = _write_data
 
@@ -265,7 +277,10 @@
          - stores the index size tuple for the pack in the index_sizes
            attribute.
         """
+        if self._buffer[1]:
+            self._write_data('', flush=True)
         self.name = self._hash.hexdigest()
+        # write indices
         self.index_sizes = [None, None, None, None]
         self._write_index('revision', self.revision_index, 'revision')
         self._write_index('inventory', self.inventory_index, 'inventory')
@@ -289,6 +304,9 @@
         """Get the position in a index_size array for a given index type."""
         return NewPack.indices[index_type][1]
 
+    def set_write_cache_size(self, size):
+        self._cache_limit = size
+
     def _write_index(self, index_type, index, label):
         """Write out an index.
 
@@ -531,6 +549,9 @@
             return None
         new_pack = NewPack(self._upload_transport, self._index_transport,
             self._pack_transport, upload_suffix=suffix)
+        # buffer data - we won't be reading-back during the pack creation and
+        # this makes a significant difference on sftp pushes.
+        new_pack.set_write_cache_size(1024*1024)
         random_name = new_pack.random_name
         if 'fetch' in debug.debug_flags:
             plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
@@ -543,14 +564,7 @@
                 '%s%s %s revisions wanted %s t=0',
                 time.ctime(), self._upload_transport.base, random_name,
                 plain_pack_list, rev_count)
-        buffer = []
-        def write_data(bytes, write=new_pack._write_data):
-            buffer.append(bytes)
-            if len(buffer) == 640:
-                bytes = ''.join(buffer)
-                write(bytes)
-                del buffer[:]
-        writer = pack.ContainerWriter(write_data)
+        writer = pack.ContainerWriter(new_pack._write_data)
         writer.begin()
         # select revisions
         if revision_ids:
@@ -640,16 +654,11 @@
                 time.ctime(), self._upload_transport.base, random_name,
                 new_pack.signature_index.key_count(),
                 time.time() - new_pack.start_time)
+        if not new_pack.data_inserted():
+            new_pack.abort()
+            return None
         # finish the pack
         writer.end()
-        # flush the write-cache if needed.
-        if len(buffer):
-            bytes = ''.join(buffer)
-            new_pack._write_data(bytes)
-        if not new_pack.data_inserted():
-            new_pack.abort()
-            return None
-        # write indices
         new_pack.finish()
         # add to the repository
         self.allocate(new_pack)



More information about the bazaar-commits mailing list