Rev 31: Implement spooling to disk for indices with more than 100000 nodes (which is configurable via the constructor). in http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk
Robert Collins
robertc at robertcollins.net
Sun Jul 13 18:39:11 BST 2008
At http://people.ubuntu.com/~robertc/baz2.0/plugins/index2/trunk
------------------------------------------------------------
revno: 31
revision-id: robertc at robertcollins.net-20080713173905-dxz6v9ke8iynar4l
parent: robertc at robertcollins.net-20080713115155-k1nu89eo932mpp1l
committer: Robert Collins <robertc at robertcollins.net>
branch nick: trunk
timestamp: Mon 2008-07-14 03:39:05 +1000
message:
Implement spooling to disk for indices with more than 100000 nodes (which is configurable via the constructor).
modified:
btree_index.py index.py-20080624222253-p0x5f92uyh5hw734-7
tests/test_btree_index.py test_index.py-20080624222253-p0x5f92uyh5hw734-13
=== modified file 'btree_index.py'
--- a/btree_index.py 2008-07-13 11:51:55 +0000
+++ b/btree_index.py 2008-07-13 17:39:05 +0000
@@ -30,8 +30,10 @@
from bzrlib import debug, index, lru_cache, osutils, trace
from bzrlib import errors as bzrerrors
from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
+from bzrlib.osutils import basename, dirname
from bzrlib.plugins.index2 import errors, chunk_writer
from bzrlib.plugins.pybloom.pybloom import BloomSHA1
+from bzrlib.transport import get_transport
_BTSIGNATURE = "B+Tree Graph Index 1\n"
@@ -178,6 +180,58 @@
VALUE := no-newline-no-null-bytes
"""
+ def __init__(self, reference_lists=0, key_elements=1, spill_at=100000):
+ """See GraphIndexBuilder.__init__.
+
+ :param spill_at: Optional parameter controlling the maximum number
+ of nodes that BTreeBuilder will hold in memory.
+ """
+ index.GraphIndexBuilder.__init__(self, reference_lists=reference_lists,
+ key_elements=key_elements)
+ self._spill_at = spill_at
+ self._backing_indices = []
+
+ def add_node(self, key, value, references=()):
+ """Add a node to the index.
+
+ If adding the node causes the builder to reach its spill_at threshold,
+ disk spilling will be triggered.
+
+ :param key: The key. keys are non-empty tuples containing
+ as many whitespace-free utf8 bytestrings as the key length
+ defined for this index.
+ :param references: An iterable of iterables of keys. Each is a
+ reference to another key.
+ :param value: The value to associate with the key. It may be any
+ bytes as long as it does not contain \0 or \n.
+ """
+ index.GraphIndexBuilder.add_node(self, key, value, references=references)
+ if len(self._keys) < self._spill_at:
+ return
+ iterators_to_combine = [iter(sorted(self._iter_mem_nodes()))]
+ pos = -1
+ for pos, backing in enumerate(self._backing_indices):
+ if backing is None:
+ pos -= 1
+ break
+ iterators_to_combine.append(backing.iter_all_entries())
+ backing_pos = pos + 1
+ new_backing_file, size = \
+ self._write_nodes(self._iter_smallest(iterators_to_combine))
+ new_backing = BTreeGraphIndex(
+ get_transport(dirname(new_backing_file.name)),
+ basename(new_backing_file.name), size)
+ # GC will clean up the file
+ new_backing._file = new_backing_file
+ if len(self._backing_indices) == backing_pos:
+ self._backing_indices.append(None)
+ self._backing_indices[backing_pos] = new_backing
+ for pos in range(backing_pos):
+ self._backing_indices[pos] = None
+ self._keys = set()
+ self._nodes = {}
+ self._nodes_by_key = {}
+
def add_nodes(self, nodes):
"""Add nodes to the index.
@@ -201,6 +255,33 @@
if not absent:
yield self, key, value
+ def _iter_smallest(self, iterators_to_combine):
+ current_values = []
+ for iterator in iterators_to_combine:
+ try:
+ current_values.append(iterator.next())
+ except StopIteration:
+ current_values.append(None)
+ def getter(item):
+ return item[1][1]
+ last = None
+ while True:
+ candidates = [item for item
+ in enumerate(current_values) if item[1] is not None]
+ if not len(candidates):
+ return
+ selected = min(candidates, key=getter)
+ if last == selected[1][1]:
+ raise bzrerrors.BadIndexDuplicateKey(last, self)
+ last = selected[1][1]
+ # Yield, with self as the index
+ yield (self,) + selected[1][1:]
+ pos = selected[0]
+ try:
+ current_values[pos] = iterators_to_combine[pos].next()
+ except StopIteration:
+ current_values[pos] = None
+
def _write_nodes(self, node_iterator):
"""Write node_iterator out as a B+Tree.
@@ -336,7 +417,7 @@
add_key(string_key, line)
for row in reversed(rows):
row.finish_node()
- result = tempfile.TemporaryFile()
+ result = tempfile.NamedTemporaryFile()
lines = [_BTSIGNATURE]
lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
@@ -385,8 +466,9 @@
num_bloom_pages * _PAGE_SIZE, len(bloom_str))
result.write(bloom_str)
result.flush()
+ size = result.tell()
result.seek(0)
- return result
+ return result, size
def finish(self):
"""Finalise the index.
@@ -394,8 +476,7 @@
:return: A file handle for a temporary file containing the nodes added
to the index.
"""
- nodes = sorted(self._iter_mem_nodes())
- return self._write_nodes(nodes)
+ return self._write_nodes(self.iter_all_entries())[0]
def iter_all_entries(self):
"""Iterate over all keys within the index
@@ -407,7 +488,13 @@
if 'evil' in debug.debug_flags:
trace.mutter_callsite(3,
"iter_all_entries scales with size of history.")
- return self._iter_mem_nodes()
+ # Doing serial rather than ordered would be faster; but this shouldn't
+ # be getting called routinely anyway.
+ iterators = [iter(sorted(self._iter_mem_nodes()))]
+ for backing in self._backing_indices:
+ if backing is not None:
+ iterators.append(backing.iter_all_entries())
+ return self._iter_smallest(iterators)
def iter_entries(self, keys):
"""Iterate over keys within the index.
@@ -428,6 +515,15 @@
node = self._nodes[key]
if not node[0]:
yield self, key, node[2]
+ keys.difference_update(self._keys)
+ for backing in self._backing_indices:
+ if backing is None:
+ continue
+ if not keys:
+ return
+ for node in backing.iter_entries(keys):
+ keys.remove(node[1])
+ yield (self,) + node[1:]
def iter_entries_prefix(self, keys):
"""Iterate over keys within the index using prefix matching.
@@ -451,6 +547,11 @@
keys = set(keys)
if not keys:
return
+ for backing in self._backing_indices:
+ if backing is None:
+ continue
+ for node in backing.iter_entries_prefix(keys):
+ yield (self,) + node[1:]
if self._key_length == 1:
for key in keys:
# sanity check
@@ -458,7 +559,10 @@
raise errors.BadIndexKey(key)
if len(key) != self._key_length:
raise errors.BadIndexKey(key)
- node = self._nodes[key]
+ try:
+ node = self._nodes[key]
+ except KeyError:
+ continue
if node[0]:
continue
if self.reference_lists:
@@ -504,7 +608,8 @@
For InMemoryGraphIndex the estimate is exact.
"""
- return len(self._keys)
+ return len(self._keys) + sum(backing.key_count() for backing in
+ self._backing_indices if backing is not None)
def validate(self):
"""In memory indexes have no known corruption at the moment."""
@@ -703,15 +808,15 @@
start_of_leaves = self._row_offsets[-2]
end_of_leaves = self._row_offsets[-1]
needed_nodes = range(start_of_leaves, end_of_leaves)
- # TODO: we *might* want to check in the cache for nodes which we
- # already have parsed
+ # We iterate strictly in-order so that we can use this function
+ # for spilling index builds to disk.
if self.node_ref_lists:
for _, node in self._read_nodes(needed_nodes):
- for key, (value, refs) in node.keys.items():
+ for key, (value, refs) in sorted(node.keys.items()):
yield (self, key, value, refs)
else:
for _, node in self._read_nodes(needed_nodes):
- for key, (value, refs) in node.keys.items():
+ for key, (value, refs) in sorted(node.keys.items()):
yield (self, key, value)
@staticmethod
@@ -1019,11 +1124,14 @@
raise bzrerrors.BadIndexKey(key)
if len(key) != self._key_length:
raise bzrerrors.BadIndexKey(key)
- if self.node_ref_lists:
- value, node_refs = nodes[key]
- yield self, key, value, node_refs
- else:
- yield self, key, nodes[key]
+ try:
+ if self.node_ref_lists:
+ value, node_refs = nodes[key]
+ yield self, key, value, node_refs
+ else:
+ yield self, key, nodes[key]
+ except KeyError:
+ pass
return
for key in keys:
# sanity check
=== modified file 'tests/test_btree_index.py'
--- a/tests/test_btree_index.py 2008-07-13 07:28:15 +0000
+++ b/tests/test_btree_index.py 2008-07-13 17:39:05 +0000
@@ -108,7 +108,10 @@
def test_empty_1_0(self):
builder = btree_index.BTreeBuilder(key_elements=1, reference_lists=0)
- content = builder.finish().read()
+ # NamedTemporaryFile dies on builder.finish().read(). weird.
+ temp_file = builder.finish()
+ content = temp_file.read()
+ del temp_file
self.assertEqual(
"B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=0\n"
"row_lengths=\nbloom_pages=0\n",
@@ -116,7 +119,10 @@
def test_empty_2_1(self):
builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=1)
- content = builder.finish().read()
+ # NamedTemporaryFile dies on builder.finish().read(). weird.
+ temp_file = builder.finish()
+ content = temp_file.read()
+ del temp_file
self.assertEqual(
"B+Tree Graph Index 1\nnode_ref_lists=1\nkey_elements=2\nlen=0\n"
"row_lengths=\nbloom_pages=0\n",
@@ -127,7 +133,10 @@
nodes = self.make_nodes(5, 1, 0)
for node in nodes:
builder.add_node(*node)
- content = builder.finish().read()
+ # NamedTemporaryFile dies on builder.finish().read(). weird.
+ temp_file = builder.finish()
+ content = temp_file.read()
+ del temp_file
self.assertEqual(4096, len(content))
self.assertEqual(
"B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=5\n"
@@ -148,7 +157,10 @@
nodes = self.make_nodes(5, 2, 2)
for node in nodes:
builder.add_node(*node)
- content = builder.finish().read()
+ # NamedTemporaryFile dies on builder.finish().read(). weird.
+ temp_file = builder.finish()
+ content = temp_file.read()
+ del temp_file
self.assertEqual(4096, len(content))
self.assertEqual(
"B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=10\n"
@@ -177,7 +189,10 @@
nodes = self.make_nodes(800, 1, 0)
for node in nodes:
builder.add_node(*node)
- content = builder.finish().read()
+ # NamedTemporaryFile dies on builder.finish().read(). weird.
+ temp_file = builder.finish()
+ content = temp_file.read()
+ del temp_file
self.assertEqual(4096*4, len(content))
self.assertEqual(
"B+Tree Graph Index 1\nnode_ref_lists=0\nkey_elements=1\nlen=800\n"
@@ -275,7 +290,10 @@
nodes = self.make_nodes(200, 2, 2)
for node in nodes:
builder.add_node(*node)
- content = builder.finish().read()
+ # NamedTemporaryFile dies on builder.finish().read(). weird.
+ temp_file = builder.finish()
+ content = temp_file.read()
+ del temp_file
self.assertEqual(4096*4, len(content))
self.assertEqual(
"B+Tree Graph Index 1\nnode_ref_lists=2\nkey_elements=2\nlen=400\n"
@@ -308,6 +326,192 @@
self.assertEqual(global_bloom._array.tostring(), global_bloom_bytes)
# We assume the other leaf nodes have been written correctly - layering FTW.
+ def test_spill_index_stress_1_1(self):
+ builder = btree_index.BTreeBuilder(key_elements=1, spill_at=2)
+ nodes = [node[0:2] for node in self.make_nodes(16, 1, 0)]
+ builder.add_node(*nodes[0])
+ # Test the parts of the index that take up memory are doing so
+ # predictably.
+ self.assertEqual(1, len(builder._nodes))
+ self.assertEqual(1, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ builder.add_node(*nodes[1])
+ self.assertEqual(0, len(builder._nodes))
+ self.assertEqual(0, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ self.assertEqual(1, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ # now back to memory
+ builder.add_node(*nodes[2])
+ self.assertEqual(1, len(builder._nodes))
+ self.assertEqual(1, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ # And spills to a second backing index combining all
+ builder.add_node(*nodes[3])
+ self.assertEqual(0, len(builder._nodes))
+ self.assertEqual(0, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ self.assertEqual(2, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ # The next spills to the 2-len slot
+ builder.add_node(*nodes[4])
+ builder.add_node(*nodes[5])
+ self.assertEqual(0, len(builder._nodes))
+ self.assertEqual(0, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ self.assertEqual(2, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ # Next spill combines
+ builder.add_node(*nodes[6])
+ builder.add_node(*nodes[7])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(None, builder._backing_indices[1])
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ # And so forth - counting up in binary.
+ builder.add_node(*nodes[8])
+ builder.add_node(*nodes[9])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ self.assertEqual(None, builder._backing_indices[1])
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ builder.add_node(*nodes[10])
+ builder.add_node(*nodes[11])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ builder.add_node(*nodes[12])
+ # Test that memory and disk are both used for query methods; and that
+ # None is skipped over happily.
+ self.assertEqual([(builder,) + node for node in sorted(nodes[:13])],
+ list(builder.iter_all_entries()))
+ # Two nodes - one memory one disk
+ self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+ set(builder.iter_entries([nodes[12][0], nodes[11][0]])))
+ self.assertEqual(13, builder.key_count())
+ self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+ set(builder.iter_entries_prefix([nodes[12][0], nodes[11][0]])))
+ builder.add_node(*nodes[13])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ builder.add_node(*nodes[14])
+ builder.add_node(*nodes[15])
+ self.assertEqual(4, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(None, builder._backing_indices[1])
+ self.assertEqual(None, builder._backing_indices[2])
+ self.assertEqual(16, builder._backing_indices[3].key_count())
+ # Now finish, and check we got a correctly ordered tree
+ transport = self.get_transport('')
+ size = transport.put_file('index', builder.finish())
+ index = btree_index.BTreeGraphIndex(transport, 'index', size)
+ nodes = list(index.iter_all_entries())
+ self.assertEqual(sorted(nodes), nodes)
+ self.assertEqual(16, len(nodes))
+
+ def test_spill_index_stress_2_2(self):
+ # test that references and longer keys don't confuse things.
+ builder = btree_index.BTreeBuilder(key_elements=2, reference_lists=2,
+ spill_at=2)
+ nodes = self.make_nodes(16, 2, 2)
+ builder.add_node(*nodes[0])
+ # Test the parts of the index that take up memory are doing so
+ # predictably.
+ self.assertEqual(1, len(builder._keys))
+ self.assertEqual(2, len(builder._nodes))
+ self.assertNotEqual({}, builder._nodes_by_key)
+ builder.add_node(*nodes[1])
+ self.assertEqual(0, len(builder._keys))
+ self.assertEqual(0, len(builder._nodes))
+ self.assertEqual({}, builder._nodes_by_key)
+ self.assertEqual(1, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ # now back to memory
+ builder.add_node(*nodes[2])
+ self.assertEqual(2, len(builder._nodes))
+ self.assertEqual(1, len(builder._keys))
+ self.assertNotEqual({}, builder._nodes_by_key)
+ # And spills to a second backing index combining all
+ builder.add_node(*nodes[3])
+ self.assertEqual(0, len(builder._nodes))
+ self.assertEqual(0, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ self.assertEqual(2, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ # The next spills to the 2-len slot
+ builder.add_node(*nodes[4])
+ builder.add_node(*nodes[5])
+ self.assertEqual(0, len(builder._nodes))
+ self.assertEqual(0, len(builder._keys))
+ self.assertEqual({}, builder._nodes_by_key)
+ self.assertEqual(2, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ # Next spill combines
+ builder.add_node(*nodes[6])
+ builder.add_node(*nodes[7])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(None, builder._backing_indices[1])
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ # And so forth - counting up in binary.
+ builder.add_node(*nodes[8])
+ builder.add_node(*nodes[9])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ self.assertEqual(None, builder._backing_indices[1])
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ builder.add_node(*nodes[10])
+ builder.add_node(*nodes[11])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ builder.add_node(*nodes[12])
+ # Test that memory and disk are both used for query methods; and that
+ # None is skipped over happily.
+ self.assertEqual([(builder,) + node for node in sorted(nodes[:13])],
+ list(builder.iter_all_entries()))
+ # Two nodes - one memory one disk
+ self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+ set(builder.iter_entries([nodes[12][0], nodes[11][0]])))
+ self.assertEqual(13, builder.key_count())
+ self.assertEqual(set([(builder,) + node for node in nodes[11:13]]),
+ set(builder.iter_entries_prefix([nodes[12][0], nodes[11][0]])))
+ builder.add_node(*nodes[13])
+ self.assertEqual(3, len(builder._backing_indices))
+ self.assertEqual(2, builder._backing_indices[0].key_count())
+ self.assertEqual(4, builder._backing_indices[1].key_count())
+ self.assertEqual(8, builder._backing_indices[2].key_count())
+ builder.add_node(*nodes[14])
+ builder.add_node(*nodes[15])
+ self.assertEqual(4, len(builder._backing_indices))
+ self.assertEqual(None, builder._backing_indices[0])
+ self.assertEqual(None, builder._backing_indices[1])
+ self.assertEqual(None, builder._backing_indices[2])
+ self.assertEqual(16, builder._backing_indices[3].key_count())
+ # Now finish, and check we got a correctly ordered tree
+ transport = self.get_transport('')
+ size = transport.put_file('index', builder.finish())
+ index = btree_index.BTreeGraphIndex(transport, 'index', size)
+ nodes = list(index.iter_all_entries())
+ self.assertEqual(sorted(nodes), nodes)
+ self.assertEqual(16, len(nodes))
+
+ def test_spill_index_duplicate_key_caught_on_finish(self):
+ builder = btree_index.BTreeBuilder(key_elements=1, spill_at=2)
+ nodes = [node[0:2] for node in self.make_nodes(16, 1, 0)]
+ builder.add_node(*nodes[0])
+ builder.add_node(*nodes[1])
+ builder.add_node(*nodes[0])
+ self.assertRaises(bzrerrors.BadIndexDuplicateKey, builder.finish)
+
class TestBTreeIndex(BTreeTestCase):
More information about the bazaar-commits
mailing list