Rev 20: Change how we do the batch processing. in lp:///~jameinel/bzr/fix277537

John Arbash Meinel john at arbash-meinel.com
Fri Jan 30 20:44:10 GMT 2009


At lp:///~jameinel/bzr/fix277537

------------------------------------------------------------
revno: 20
revision-id: john at arbash-meinel.com-20090130204404-n0blm8d9mogaade9
parent: john at arbash-meinel.com-20090130203710-bzbj99qssq20ka6x
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: fix277537
timestamp: Fri 2009-01-30 14:44:04 -0600
message:
  Change how we do the batch processing.
  The old code used a fixed 100-node batch; the new code batches based on file-id,
  so that no batch gets larger than 100, but we can still batch smaller groups
  together.
  We also now track the parent content cache, so less work needs to be done
  for some of the text insertions.
  Because we track parent texts, we don't seem to need to flush after every
  record. This doesn't quite match what I expected, but it seems to work just fine.
  The hit rate isn't terrific, but it is reasonable. Memory still seems reasonable,
  and the total time should be in the 10-minute range.
  A little bit more could be gained by changing the bzr add_lines() code
  to pass the parent_texts cache down into get_content_maps(), but it would
  also require updating _get_record_map() to realize it doesn't need to
  extract those specific entries.
  (applying the delta in get_content_maps() is very fast compared with
  extracting the text in the first place in _get_record_map())
-------------- next part --------------
=== modified file 'reconcile.py'
--- a/reconcile.py	2009-01-30 15:56:25 +0000
+++ b/reconcile.py	2009-01-30 20:44:04 +0000
@@ -107,7 +107,8 @@
                             fixed_nodes[node[1]] = (wrong_parents,
                                                     ideal_parents)
                         else:
-                            import pdb; pdb.set_trace()
+                            assert False, "Node layout does not conform to"\
+                                " expections, aborting"
                         self._data_changed = True
                         self.wrong_parents_fixed += 1
                     else:
@@ -126,6 +127,7 @@
             self.new_pack.text_index, readv_group_iter, total_items))
         # 4) Now copy the fixed nodes
         self._copy_fixed_nodes(fixed_nodes)
+        trace.note('Counter: %s', knit._counter)
         self._log_copied_texts()
 
     def _copy_fixed_nodes(self, fixed_nodes):
@@ -133,6 +135,8 @@
         if not fixed_nodes:
             # Nothing to copy
             return
+        # Make sure any deltas are able to access the new texts
+        self.new_pack.flush()
         # This is the revision_id parent map, note that we use revision_ids,
         # not keys in this map
         parent_map = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
@@ -161,22 +165,58 @@
             def grouped_and_topo_order(key):
                 # key == (file_id, revision_id)
                 return (key[0], revid_to_offset[key[1]])
-            sorted_nodes = sorted(fixed_nodes.iterkeys(),
-                                  key=grouped_and_topo_order)
-            for start in xrange(0, num_texts, 100):
-                batch = sorted_nodes[start:start+100]
-                for idx, record in enumerate(repo.texts.get_record_stream(
-                                                  batch, 'topological', True)):
+            # We use reverse sorted order, because we will pop nodes off the
+            # end
+            remaining_nodes = sorted(fixed_nodes.iterkeys(),
+                                     key=grouped_and_topo_order,
+                                     reverse=True)
+            next_batch = []
+            file_id = None
+            count = 0
+            parent_texts = {}
+            parent_file_id = None
+            while remaining_nodes:
+                next_key = remaining_nodes.pop()
+                if (file_id is None
+                    or len(next_batch) < 10
+                    or (len(next_batch) < 100 and file_id == next_key[0])):
+                    next_batch.append(next_key)
+                    file_id = next_key[0]
+                    continue
+                this_batch = next_batch
+                next_batch = [next_key]
+                file_id = next_key[0]
+                chunks = {}
+                trace.mutter('copying batch %s len=%s, counter=%s', count,
+                             len(this_batch), knit._counter)
+                for record in repo.texts.get_record_stream(this_batch,
+                                                           'unordered', True):
+                    chunks[record.key] = record.get_bytes_as('chunked')
+                    wrong_parents, ideal_parents = fixed_nodes[record.key]
+                    assert record.parents == wrong_parents
+
+                for key in this_batch:
+                    if parent_file_id is None or parent_file_id != key[0]:
+                        # We group the requests by file-id, so if we are on a
+                        # new file_id, then we know the parent entries won't
+                        # help us
+                        parent_texts.clear()
+                    count += 1
                     # Flush the new_pack, so that deltas can be written against
                     # the just-written data
                     self.new_pack.flush()
-                    pb.update('Copying fulltext', start+idx, num_texts)
-                    wrong_parents, ideal_parents = fixed_nodes[record.key]
-                    assert record.parents == wrong_parents
-                    lines = osutils.chunks_to_lines(
-                                record.get_bytes_as('chunked'))
-                    output_texts.add_lines(record.key, ideal_parents, lines,
-                                           random_id=True, check_content=False)
+                    pb.update('Copying fulltext', count, num_texts)
+                    wrong_parents, ideal_parents = fixed_nodes[key]
+                    lines = osutils.chunks_to_lines(chunks.pop(key))
+                    try:
+                        _, _, content = output_texts.add_lines(key,
+                            ideal_parents, lines, random_id=True,
+                            check_content=False,
+                            parent_texts=parent_texts)
+                    except KeyError, e:
+                        import pdb; pdb.set_trace()
+                    parent_texts[key] = content
+                    parent_file_id = key[0]
         finally:
             pb.finished()
 



More information about the bazaar-commits mailing list