Rev 3886: Merge the refcycle changes. in http://bzr.arbash-meinel.com/branches/bzr/brisbane/hack3

John Arbash Meinel john at arbash-meinel.com
Mon Mar 23 03:43:57 GMT 2009


At http://bzr.arbash-meinel.com/branches/bzr/brisbane/hack3

------------------------------------------------------------
revno: 3886
revision-id: john at arbash-meinel.com-20090323034347-fubmlsnyfvx5vvup
parent: john at arbash-meinel.com-20090323033446-wc9ispgsj118rgl6
parent: john at arbash-meinel.com-20090323032950-lmbrocu79l90dqn5
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: hack3
timestamp: Sun 2009-03-22 22:43:47 -0500
message:
  Merge the refcycle changes.
modified:
  bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
  bzrlib/lru_cache.py            lru_cache.py-20070119165515-tlw203kuwh0id5gv-1
  bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
    ------------------------------------------------------------
    revno: 3869.9.2
    revision-id: john at arbash-meinel.com-20090323032950-lmbrocu79l90dqn5
    parent: john at arbash-meinel.com-20090320150205-kcmh70biyo76p0kn
    parent: john at arbash-meinel.com-20090321032222-n2wbqe0ozhhizwxm
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: refcycles
    timestamp: Sun 2009-03-22 22:29:50 -0500
    message:
      Merge brisbane-core tip, resolve differences.
      Finish making various get_record_stream() calls clean up refcycles
      and memory consumption after yielding the record.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
      bzrlib/tests/test_groupcompress.py test_groupcompress.p-20080705181503-ccbxd6xuy1bdnrpu-13
    ------------------------------------------------------------
    revno: 3869.9.1
    revision-id: john at arbash-meinel.com-20090320150205-kcmh70biyo76p0kn
    parent: john at arbash-meinel.com-20090320032107-bm9wg421rtcacy5i
    committer: John Arbash Meinel <john at arbash-meinel.com>
    branch nick: refcycles
    timestamp: Fri 2009-03-20 10:02:05 -0500
    message:
      Some testing to see if we can decrease the peak memory consumption a bit.
      It looks like we can, just need some more perf, etc.
    modified:
      bzrlib/groupcompress.py        groupcompress.py-20080705181503-ccbxd6xuy1bdnrpu-8
      bzrlib/lru_cache.py            lru_cache.py-20070119165515-tlw203kuwh0id5gv-1
      bzrlib/repofmt/groupcompress_repo.py repofmt.py-20080715094215-wp1qfvoo7093c8qr-1
-------------- next part --------------
=== modified file 'bzrlib/groupcompress.py'
--- a/bzrlib/groupcompress.py	2009-03-23 03:34:46 +0000
+++ b/bzrlib/groupcompress.py	2009-03-23 03:43:47 +0000
@@ -339,8 +339,6 @@
         :param sha1: TODO (should we validate only when sha1 is supplied?)
         :return: The bytes for the content
         """
-        # Handle the 'Empty Content' record, even if we don't always write it
-        # yet.
         if start == end == 0:
             return ''
         self._ensure_content(end)
@@ -477,6 +475,7 @@
         #       get_bytes_as call? After Manager.get_record_stream() returns
         #       the object?
         self._manager = manager
+        self._bytes = None
         self.storage_kind = 'groupcompress-block'
         if not first:
             self.storage_kind = 'groupcompress-block-ref'
@@ -495,14 +494,18 @@
                 return self._manager._wire_bytes()
             else:
                 return ''
+            self._manager = None # safe?
         if storage_kind in ('fulltext', 'chunked'):
-            self._manager._prepare_for_extract()
-            block = self._manager._block
-            bytes = block.extract(self.key, self._start, self._end)
+            if self._bytes is None:
+                # Grab the raw bytes for this entry, and break the ref-cycle
+                self._manager._prepare_for_extract()
+                block = self._manager._block
+                self._bytes = block.extract(self.key, self._start, self._end)
+                self._manager = None
             if storage_kind == 'fulltext':
-                return bytes
+                return self._bytes
             else:
-                return [bytes]
+                return [self._bytes]
         raise errors.UnavailableRepresentation(self.key, storage_kind,
             self.storage_kind)
 
@@ -531,6 +534,8 @@
         """Get a record for all keys added so far."""
         for factory in self._factories:
             yield factory
+            factory._bytes = None
+            factory._manager = None
         # TODO: Consider setting self._factories = None after the above loop,
         #       as it will break the reference cycle
 
@@ -1310,6 +1315,8 @@
         for key in missing:
             yield AbsentContentFactory(key)
         manager = None
+        last_block = None
+        last_memo = None
         # TODO: This works fairly well at batching up existing groups into a
         #       streamable format, and possibly allowing for taking one big
         #       group and splitting it when it isn't fully utilized.
@@ -1327,13 +1334,24 @@
                             # Yield everything buffered so far
                             for factory in manager.get_record_stream():
                                 yield factory
+                                # Disable this record, breaks the refcycle, and
+                                # saves memory. But this means clients really
+                                # *cannot* hang on to objects.
+                                factory._bytes = None
+                                factory._manager = None
                             manager = None
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
                     else:
                         index_memo, _, parents, (method, _) = locations[key]
-                        block = self._get_block(index_memo)
+                        read_memo = index_memo[0:3]
+                        if last_memo == read_memo:
+                            block = last_block
+                        else:
+                            block = self._get_block(index_memo)
+                            last_block = block
+                            last_memo = read_memo
                         start, end = index_memo[3:5]
                         if manager is None:
                             manager = _LazyGroupContentManager(block)

=== modified file 'bzrlib/lru_cache.py'
--- a/bzrlib/lru_cache.py	2008-12-09 22:31:56 +0000
+++ b/bzrlib/lru_cache.py	2009-03-20 15:02:05 +0000
@@ -151,8 +151,12 @@
     def clear(self):
         """Clear out all of the cache."""
         # Clean up in LRU order
-        while self._cache:
-            self._remove_lru()
+        for key in self._cache.keys():
+            self._remove(key)
+        assert not self._cache
+        assert not self._cleanup
+        self._queue = deque()
+        self._refcount = {}
 
     def resize(self, max_cache, after_cleanup_count=None):
         """Change the number of entries that will be cached."""
@@ -247,6 +251,10 @@
         val = LRUCache._remove(self, key)
         self._value_size -= self._compute_size(val)
 
+    def clear(self):
+        LRUCache.clear(self)
+        self._value_size = 0
+
     def resize(self, max_size, after_cleanup_size=None):
         """Change the number of bytes that will be cached."""
         self._update_max_size(max_size, after_cleanup_size=after_cleanup_size)

=== modified file 'bzrlib/repofmt/groupcompress_repo.py'
--- a/bzrlib/repofmt/groupcompress_repo.py	2009-03-21 02:51:34 +0000
+++ b/bzrlib/repofmt/groupcompress_repo.py	2009-03-23 03:43:47 +0000
@@ -190,6 +190,8 @@
                 if pb is not None:
                     pb.update(message, idx + 1, len(keys))
                 yield record
+                record._manager = None
+                record._bytes = None
         return pb_stream()
 
     def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
@@ -216,6 +218,8 @@
                     p_id_roots_set.add(key)
                     self._chk_p_id_roots.append(key)
                 yield record
+                record._manager = None
+                record._bytes = None
             # We have finished processing all of the inventory records, we
             # don't need these sets anymore
             id_roots_set.clear()
@@ -291,6 +295,8 @@
                         if pb is not None:
                             pb.update('chk node', counter[0], total_keys)
                         yield record
+                        record._manager = None
+                        record._bytes = None
                 yield next_stream()
                 # Double check that we won't be emitting any keys twice
                 # If we get rid of the pre-calculation of all keys, we could
@@ -302,7 +308,7 @@
                 # next_keys = next_keys.intersection(remaining_keys)
                 cur_keys = []
                 for prefix in sorted(keys_by_search_prefix):
-                    cur_keys.extend(keys_by_search_prefix[prefix])
+                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
         for stream in _get_referenced_stream(self._chk_id_roots,
                                              self._gather_text_refs):
             yield stream
@@ -390,11 +396,17 @@
             self.revision_keys = source_vf.keys()
         self._copy_stream(source_vf, target_vf, self.revision_keys,
                           'revisions', self._get_progress_stream, 1)
+        for index in source_vf._index._graph_index._indices:
+            index._leaf_node_cache.clear()
+        # target_vf._index._graph_index._spill_mem_keys_to_disk()
 
     def _copy_inventory_texts(self):
         source_vf, target_vf = self._build_vfs('inventory', True, True)
         self._copy_stream(source_vf, target_vf, self.revision_keys,
                           'inventories', self._get_filtered_inv_stream, 2)
+        for index in source_vf._index._graph_index._indices:
+            index._leaf_node_cache.clear()
+        # target_vf._index._graph_index._spill_mem_keys_to_disk()
 
     def _copy_chk_texts(self):
         source_vf, target_vf = self._build_vfs('chk', False, False)
@@ -416,6 +428,9 @@
                     pass
         finally:
             child_pb.finished()
+        for index in source_vf._index._graph_index._indices:
+            index._leaf_node_cache.clear()
+        # target_vf._index._graph_index._spill_mem_keys_to_disk()
 
     def _copy_text_texts(self):
         source_vf, target_vf = self._build_vfs('text', True, True)
@@ -427,6 +442,9 @@
         text_keys = source_vf.keys()
         self._copy_stream(source_vf, target_vf, text_keys,
                           'text', self._get_progress_stream, 4)
+        for index in source_vf._index._graph_index._indices:
+            index._leaf_node_cache.clear()
+        # target_vf._index._graph_index._spill_mem_keys_to_disk()
 
     def _copy_signature_texts(self):
         source_vf, target_vf = self._build_vfs('signature', False, False)
@@ -434,6 +452,9 @@
         signature_keys.intersection(self.revision_keys)
         self._copy_stream(source_vf, target_vf, signature_keys,
                           'signatures', self._get_progress_stream, 5)
+        for index in source_vf._index._graph_index._indices:
+            index._leaf_node_cache.clear()
+        # target_vf._index._graph_index._spill_mem_keys_to_disk()
 
     def _create_pack_from_packs(self):
         self.pb.update('repacking', 0, 7)
@@ -446,6 +467,7 @@
         self._copy_text_texts()
         self._copy_signature_texts()
         self.new_pack._check_references()
+        trace.debug_memory('after fetch')
         if not self._use_pack(self.new_pack):
             self.new_pack.abort()
             return None



More information about the bazaar-commits mailing list