Rev 3897: Change _LazyGroupContentManager.get_record_stream() to invalidate the nodes. in http://bazaar.launchpad.net/%7Ebzr/bzr/brisbane-core

John Arbash Meinel john at arbash-meinel.com
Mon Mar 23 20:13:04 GMT 2009


At http://bazaar.launchpad.net/%7Ebzr/bzr/brisbane-core

------------------------------------------------------------
revno: 3897
revision-id: john at arbash-meinel.com-20090323201046-ek580vnq69i270lp
parent: john at arbash-meinel.com-20090321032222-n2wbqe0ozhhizwxm
parent: john at arbash-meinel.com-20090323200442-10qwt4ws636wwjwl
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: brisbane-core
timestamp: Mon 2009-03-23 15:10:46 -0500
message:
  Change _LazyGroupContentManager.get_record_stream() to invalidate the nodes.
  This means that records are genuinely only valid until you go to the next one.
  This breaks the reference cycle with the manager, and drops peak memory by a decent amount.
  (as otherwise we have to wait until gc.collect() runs).
  Also, change get_bytes_as() so that it caches the bytes (though this cache is also removed
  when we go to the next object).
  This is mostly useful for CHK pages, which get processed at several layers. The
  get_record_stream() code uses it to determine what to send next, and the
  insert_record_stream() needs the raw bytes to put into the target repo.
  This saves an extract() on every chk page during 'pack'. apply_delta is rather fast,
  but there is no reason to waste calls.
modified:
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 3893.1.3
    revision-id: john at arbash-meinel.com-20090323200442-10qwt4ws636wwjwl
    parent: john at arbash-meinel.com-20090323032950-lmbrocu79l90dqn5
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: refcycles
    timestamp: Mon 2009-03-23 15:04:42 -0500
    message:
      Cleanup, in preparation for merging to brisbane-core.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      bzrlib/lru_cache.py            lru_cache.py-20070119165515-tlw203kuwh0id5gv-1
      bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 3893.1.2
    revision-id: john at arbash-meinel.com-20090323032950-lmbrocu79l90dqn5
    parent: john at arbash-meinel.com-20090320150205-kcmh70biyo76p0kn
    parent: john at arbash-meinel.com-20090321032222-n2wbqe0ozhhizwxm
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: refcycles
    timestamp: Sun 2009-03-22 22:29:50 -0500
    message:
      Merge brisbane-core tip, resolve differences.
      Finish making various get_record_stream() calls clean up refcycles
      and memory consumption after yielding the record.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
      bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
    ------------------------------------------------------------
    revno: 3893.1.1
    revision-id: john at arbash-meinel.com-20090320150205-kcmh70biyo76p0kn
    parent: john at arbash-meinel.com-20090320032107-bm9wg421rtcacy5i
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: refcycles
    timestamp: Fri 2009-03-20 10:02:05 -0500
    message:
      Some testing to see if we can decrease the peak memory consumption a bit.
      It looks like we can, just need some more perf, etc.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      bzrlib/lru_cache.py            lru_cache.py-20070119165515-tlw203kuwh0id5gv-1
      bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-21 03:22:22 +0000
+++ b/bzrlib/groupcompress.py	2009-03-23 20:04:42 +0000
@@ -339,8 +339,6 @@
         :param sha1: TODO (should we validate only when sha1 is supplied?)
         :return: The bytes for the content
         """
-        # Handle the 'Empty Content' record, even if we don't always write it
-        # yet.
         if start == end == 0:
             return ''
         self._ensure_content(end)
@@ -477,6 +475,7 @@
         #       get_bytes_as call? After Manager.get_record_stream() returns
         #       the object?
         self._manager = manager
+        self._bytes = None
         self.storage_kind = 'groupcompress-block'
         if not first:
             self.storage_kind = 'groupcompress-block-ref'
@@ -496,15 +495,20 @@
             else:
                 return ''
         if storage_kind in ('fulltext', 'chunked'):
-            self._manager._prepare_for_extract()
-            block = self._manager._block
-            bytes = block.extract(self.key, self._start, self._end)
+            if self._bytes is None:
+                # Grab and cache the raw bytes for this entry
+                # and break the ref-cycle with _manager since we don't need it
+                # anymore
+                self._manager._prepare_for_extract()
+                block = self._manager._block
+                self._bytes = block.extract(self.key, self._start, self._end)
+                self._manager = None
             if storage_kind == 'fulltext':
-                return bytes
+                return self._bytes
             else:
-                return [bytes]
+                return [self._bytes]
         raise errors.UnavailableRepresentation(self.key, storage_kind,
-            self.storage_kind)
+                                               self.storage_kind)
 
 
 class _LazyGroupContentManager(object):
@@ -531,6 +535,9 @@
         """Get a record for all keys added so far."""
         for factory in self._factories:
             yield factory
+            # Break the ref-cycle
+            factory._bytes = None
+            factory._manager = None
         # TODO: Consider setting self._factories = None after the above loop,
         #       as it will break the reference cycle
 
@@ -1281,6 +1288,7 @@
         for key in missing:
             yield AbsentContentFactory(key)
         manager = None
+        last_read_memo = None
         # TODO: This works fairly well at batching up existing groups into a
         #       streamable format, and possibly allowing for taking one big
         #       group and splitting it when it isn't fully utilized.
@@ -1295,39 +1303,39 @@
                 for key in keys:
                     if key in self._unadded_refs:
                         if manager is not None:
-                            # Yield everything buffered so far
                             for factory in manager.get_record_stream():
                                 yield factory
-                            manager = None
+                            last_read_memo = manager = None
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
                     else:
                         index_memo, _, parents, (method, _) = locations[key]
-                        block = self._get_block(index_memo)
+                        read_memo = index_memo[0:3]
+                        if last_read_memo != read_memo:
+                            # We are starting a new block. If we have a
+                            # manager, we have found everything that fits for
+                            # now, so yield records
+                            if manager is not None:
+                                for factory in manager.get_record_stream():
+                                    yield factory
+                            # Now start a new manager
+                            block = self._get_block(index_memo)
+                            manager = _LazyGroupContentManager(block)
+                            last_read_memo = read_memo
                         start, end = index_memo[3:5]
-                        if manager is None:
-                            manager = _LazyGroupContentManager(block)
-                        elif manager._block is not block:
-                            # Flush and create a new manager
-                            for factory in manager.get_record_stream():
-                                yield factory
-                            manager = _LazyGroupContentManager(block)
                         manager.add_factory(key, parents, start, end)
             else:
                 if manager is not None:
-                    # Yield everything buffered so far
                     for factory in manager.get_record_stream():
                         yield factory
-                    manager = None
+                    last_read_memo = manager = None
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
         if manager is not None:
-            # Yield everything buffered so far
             for factory in manager.get_record_stream():
                 yield factory
-            manager = None
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-03-20 15:53:00 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-03-23 20:04:42 +0000
@@ -302,7 +302,7 @@
                 # next_keys = next_keys.intersection(remaining_keys)
                 cur_keys = []
                 for prefix in sorted(keys_by_search_prefix):
-                    cur_keys.extend(keys_by_search_prefix[prefix])
+                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
         for stream in _get_referenced_stream(self._chk_id_roots,
                                              self._gather_text_refs):
             yield stream



More information about the bazaar-commits mailing list