Rev 2895: Create a content lookup function for bisection in GraphIndex. in http://people.ubuntu.com/~robertc/baz2.0/index

Robert Collins robertc at robertcollins.net
Sun Oct 7 23:05:01 BST 2007


At http://people.ubuntu.com/~robertc/baz2.0/index

------------------------------------------------------------
revno: 2895
revision-id: robertc at robertcollins.net-20071007220449-stt24xz9eaaj703l
parent: robertc at robertcollins.net-20071006073049-opssv3mahh830h6v
committer: Robert Collins <robertc at robertcollins.net>
branch nick: index
timestamp: Mon 2007-10-08 08:04:49 +1000
message:
  Create a content lookup function for bisection in GraphIndex.
modified:
  bzrlib/index.py                index.py-20070712131115-lolkarso50vjr64s-1
  bzrlib/tests/test_index.py     test_index.py-20070712131115-lolkarso50vjr64s-2
=== modified file 'bzrlib/index.py'
--- a/bzrlib/index.py	2007-10-05 04:57:03 +0000
+++ b/bzrlib/index.py	2007-10-07 22:04:49 +0000
@@ -24,6 +24,7 @@
     'InMemoryGraphIndex',
     ]
 
+from bisect import bisect_right
 from cStringIO import StringIO
 import re
 
@@ -245,7 +246,13 @@
         self._transport = transport
         self._name = name
         self._nodes = None
+        # a sorted list of slice-addresses for the parsed bytes of the file.
+        # e.g. (0,1) would mean that byte 0 is parsed.
         self._parsed_byte_map = []
+        # a sorted list of keys matching each slice address for parsed bytes
+        # e.g. (None, 'foo@bar') would mean that the first byte contained no
+        # key, and the end byte of the slice is the end of the data for 'foo@bar'
+        self._parsed_key_map = []
         self._key_count = None
         self._keys_by_offset = None
         self._nodes_by_key = None
@@ -367,6 +374,41 @@
         except ValueError:
             raise errors.BadIndexOptions(self)
 
+    def _parsed_byte_index(self, offset):
+        """Return the index of the entry immediately before offset.
+
+        e.g. if the parsed map has regions 0,10 and 11,12 parsed, meaning that
+        there is one unparsed byte (the 11th, addressed as [10]), then:
+        asking for 0 will return 0
+        asking for 10 will return 0
+        asking for 11 will return 1
+        asking for 12 will return 1
+        """
+        key = (offset, 0)
+        return bisect_right(self._parsed_byte_map, key) - 1
+
+    def _parsed_key_index(self, key):
+        """Return the index of the entry immediately before key.
+
+        e.g. if the parsed map has regions (None, 'a') and ('b','c') parsed,
+        meaning that keys from None to 'a' inclusive, and 'b' to 'c' inclusive
+        have been parsed, then:
+        asking for '' will return 0
+        asking for 'a' will return 0
+        asking for 'b' will return 1
+        asking for 'e' will return 1
+        """
+        search_key = (key, 0)
+        return bisect_right(self._parsed_key_map, search_key) - 1
+
+    def _is_parsed(self, offset):
+        """Returns True if offset has been parsed."""
+        index = self._parsed_byte_index(offset)
+        if index == len(self._parsed_byte_map):
+            return offset < self._parsed_byte_map[index - 1][1]
+        start, end = self._parsed_byte_map[index]
+        return offset >= start and offset < end
+
     def iter_entries(self, keys):
         """Iterate over keys within the index.
 
@@ -471,6 +513,311 @@
             self._buffer_all()
         return self._key_count
 
+    def lookup_keys_via_location(self, location_keys):
+        """Public interface for implementing bisection.
+
+        :param location_keys: A list of location, key tuples.
+        :return: A list of (location_key, result) tuples as expected by
+            bzrlib.bisect_multi.bisect_multi_bytes.
+        """
+        # Possible improvements:
+        #  - only bisect lookup each key once
+        #  - sort the keys first, and use that to reduce the bisection window
+        # ----- 
+        # this progresses in three parts:
+        # read data
+        # parse it
+        # attempt to answer the question from the now in memory data.
+        # build the readv request
+        # for each location, ask for 800 bytes - much more than rows we've seen
+        # anywhere.
+        readv_ranges = []
+        for location, key in location_keys:
+            # can we answer from cache?
+            index = self._parsed_key_index(key)
+            if (len(self._parsed_key_map) and 
+                self._parsed_key_map[index][0] <= key and
+                self._parsed_key_map[index][1] > key):
+                # the key has been parsed, so no lookup is needed
+                continue
+            length = 800
+            if location + length > self._size:
+                length = self._size - location
+            # todo, trim out parsed locations.
+            if length > 0:
+                readv_ranges.append((location, length))
+        # read the header if needed
+        if self._nodes is None:
+            readv_ranges.append((0, 200))
+        if readv_ranges:
+            readv_data = self._transport.readv(self._name, readv_ranges, True,
+                self._size)
+            # parse
+            for offset, data in readv_data:
+                if self._nodes is None:
+                    # this must be the start
+                    assert offset == 0
+                    offset, data = self._parse_header_from_bytes(data)
+                self._parse_region(offset, data)
+                # print offset, len(data), data
+        # generate results:
+        #  - figure out <, >, missing, present
+        #  - result present references so we can return them.
+        result = []
+        for location, key in location_keys:
+            # can we answer from cache?
+            index = self._parsed_key_index(key)
+            if (self._parsed_key_map[index][0] <= key and
+                (self._parsed_key_map[index][1] > key or
+                 # end of the file has been parsed
+                 self._parsed_byte_map[index][1] == self._size)):
+                # the key has been parsed, so no lookup is needed
+                if key in self._nodes:
+                    if self.node_ref_lists:
+                        value, refs = self._nodes[key]
+                        result.append(((location, key), (self, key, value, refs)))
+                    else:
+                        result.append(((location, key), (self, key, self._nodes[key])))
+                else:
+                    result.append(((location, key), False))
+                continue
+            # no, is the key above or below the probed location:
+            # get the range of the probed & parsed location
+            index = self._parsed_byte_index(location)
+            # if the key is below the start of the range, it's below
+            if key < self._parsed_key_map[index][0]:
+                direction = -1
+            else:
+                direction = +1
+            result.append(((location, key), direction))
+        return result
+
+    def _parse_header_from_bytes(self, bytes):
+        """Parse the header from a region of bytes.
+
+        :param bytes: The data to parse.
+        :return: An offset, data tuple such as readv yields, for the unparsed
+            data. (which may be of length 0).
+        """
+        signature = bytes[0:len(self._signature())]
+        if not signature == self._signature():
+            raise errors.BadIndexFormatSignature(self._name, GraphIndex)
+        lines = bytes[len(self._signature()):].splitlines()
+        options_line = lines[0]
+        if not options_line.startswith(_OPTION_NODE_REFS):
+            raise errors.BadIndexOptions(self)
+        try:
+            self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
+        except ValueError:
+            raise errors.BadIndexOptions(self)
+        options_line = lines[1]
+        if not options_line.startswith(_OPTION_KEY_ELEMENTS):
+            raise errors.BadIndexOptions(self)
+        try:
+            self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
+        except ValueError:
+            raise errors.BadIndexOptions(self)
+        options_line = lines[2]
+        if not options_line.startswith(_OPTION_LEN):
+            raise errors.BadIndexOptions(self)
+        try:
+            self._key_count = int(options_line[len(_OPTION_LEN):])
+        except ValueError:
+            raise errors.BadIndexOptions(self)
+        # calculate the bytes we have processed
+        header_end = (len(signature) + len(lines[0]) + len(lines[1]) +
+            len(lines[2]) + 3)
+        self._parsed_bytes(0, None, header_end, None)
+        # setup parsing state
+        self._expected_elements = 3 + self._key_length
+        # raw data keyed by offset
+        self._keys_by_offset = {}
+        # ready-to-return key:value or key:value, node_ref_lists
+        self._nodes = {}
+        self._nodes_by_key = {}
+        return header_end, bytes[header_end:]
+
+    def _parse_region(self, offset, data):
+        """Parse node data returned from a readv operation.
+
+        :param offset: The byte offset the data starts at.
+        :param data: The data to parse.
+        """
+        # trim the data.
+        # end first:
+        end = offset + len(data)
+        index = self._parsed_byte_index(offset)
+        # default is to use all data
+        trim_end = None
+        # trivial check for entirely parsed data:
+        if end < self._parsed_byte_map[index][1]:
+            return
+        # accommodate overlap with data before this.
+        if offset < self._parsed_byte_map[index][1]:
+            # overlaps the lower parsed region
+            # skip the parsed data
+            trim_start = self._parsed_byte_map[index][1] - offset
+            # don't trim the start for \n
+            start_adjacent = True
+        elif offset == self._parsed_byte_map[index][1]:
+            # abuts the lower parsed region
+            # use all data
+            trim_start = None
+            # do not trim anything
+            start_adjacent = True
+        else:
+            # does not overlap the lower parsed region
+            # use all data
+            trim_start = None
+            # but trim the leading \n
+            start_adjacent = False
+        if end == self._size:
+            # lines up to the end of all data:
+            # use it all
+            trim_end = None
+            # do not strip to the last \n
+            end_adjacent = True
+        elif index + 1 == len(self._parsed_byte_map):
+            # at the end of the parsed data
+            # use it all
+            trim_end = None
+            # but strip to the last \n
+            end_adjacent = False
+        elif end == self._parsed_byte_map[index + 1][0]:
+            # butts up against the next parsed region
+            # use it all
+            trim_end = None
+            # do not strip to the last \n
+            end_adjacent = True
+        elif end > self._parsed_byte_map[index + 1][0]:
+            # overlaps into the next parsed region
+            # only consider the unparsed data
+            trim_end = self._parsed_byte_map[index + 1][0] - offset
+            # do not strip to the last \n as we know it's an entire record
+            end_adjacent = True
+        else:
+            # does not overlap into the next region
+            # use it all
+            trim_end = None
+            # but strip to the last \n
+            end_adjacent = False
+        # now find bytes to discard if needed
+        if not start_adjacent:
+            # work around python bug in rfind
+            if trim_start is None:
+                trim_start = data.find('\n') + 1
+            else:
+                trim_start = data.find('\n', trim_start) + 1
+            assert trim_start != 0, 'no \n was present'
+            # print 'removing start', offset, trim_start, repr(data[:trim_start])
+        if not end_adjacent:
+            # work around python bug in rfind
+            if trim_end is None:
+                trim_end = data.rfind('\n') + 1
+            else:
+                trim_end = data.rfind('\n', None, trim_end) + 1
+            assert trim_end != 0, 'no \n was present'
+            # print 'removing end', offset, trim_end, repr(data[trim_end:])
+        # adjust offset and data to the parseable data.
+        data = data[trim_start:trim_end]
+        if trim_start:
+            offset += trim_start
+        # print "parsing", repr(data)
+        lines = data.splitlines()
+        pos = offset
+        first_key = None
+        key = None
+        for line in lines:
+            if line == '':
+                # must be at the end
+                assert self._size == pos + 1, "%s %s" % (self._size, pos)
+                continue
+            elements = line.split('\0')
+            if len(elements) != self._expected_elements:
+                raise errors.BadIndexData(self)
+            # keys are tuples
+            key = tuple(elements[:self._key_length])
+            if first_key is None:
+                first_key = key
+            absent, references, value = elements[-3:]
+            ref_lists = []
+            for ref_string in references.split('\t'):
+                ref_lists.append(tuple([
+                    int(ref) for ref in ref_string.split('\r') if ref
+                    ]))
+            ref_lists = tuple(ref_lists)
+            self._keys_by_offset[pos] = (key, absent, ref_lists, value)
+            pos += len(line) + 1 # +1 for the \n
+        self._parsed_bytes(offset, first_key, offset + len(data), key)
+        # XXXXXX repeated work here.
+        for key, absent, references, value in self._keys_by_offset.itervalues():
+            if absent:
+                continue
+            # resolve references:
+            if self.node_ref_lists:
+                node_refs = []
+                for ref_list in references:
+                    node_refs.append(tuple([self._keys_by_offset[ref][0] for ref in ref_list]))
+                node_value = (value, tuple(node_refs))
+            else:
+                node_value = value
+            self._nodes[key] = node_value
+            if self._key_length > 1:
+                subkey = list(reversed(key[:-1]))
+                key_dict = self._nodes_by_key
+                if self.node_ref_lists:
+                    key_value = key, node_value[0], node_value[1]
+                else:
+                    key_value = key, node_value
+                # possibly should do this on-demand, but it seems likely it is 
+                # always wanted
+                # For a key of (foo, bar, baz) create
+                # _nodes_by_key[foo][bar][baz] = key_value
+                for subkey in key[:-1]:
+                    key_dict = key_dict.setdefault(subkey, {})
+                key_dict[key[-1]] = key_value
+
+    def _parsed_bytes(self, start, start_key, end, end_key):
+        """Mark the bytes from start to end as parsed.
+
+        Calling self._parsed_bytes(1, None, 2, None) will mark one byte (the
+        one at offset 1) as parsed.
+
+        :param start: The start of the parsed region.
+        :param end: The end of the parsed region.
+        """
+        key = (start, 0)
+        index = bisect_right(self._parsed_byte_map, key)
+        new_value = (start, end)
+        new_key = (start_key, end_key)
+        if index == 0:
+            # first range parsed is always the beginning.
+            self._parsed_byte_map.insert(index, new_value)
+            self._parsed_key_map.insert(index, new_key)
+        elif index == len(self._parsed_byte_map):
+            # may be adjacent to the prior entry
+            if start == self._parsed_byte_map[index - 1][1]:
+                self._parsed_byte_map[index - 1] = (
+                    self._parsed_byte_map[index - 1][0], end)
+                self._parsed_key_map[index - 1] = (
+                    self._parsed_key_map[index - 1][0], end_key)
+            else:
+                #no, new entry
+                self._parsed_byte_map.insert(index, new_value)
+                self._parsed_key_map.insert(index, new_key)
+        else:
+            # may be adjacent to the next entry
+            if end == self._parsed_byte_map[index][0]:
+                # move the next entry down to this range
+                self._parsed_byte_map[index] = (
+                    start, self._parsed_byte_map[index][1])
+                self._parsed_key_map[index] = (
+                    start_key, self._parsed_key_map[index][1])
+            else:
+                # no, new entry
+                self._parsed_byte_map.insert(index, new_value)
+                self._parsed_key_map.insert(index, new_key)
+
     def _signature(self):
         """The file signature for this index type."""
         return _SIGNATURE

=== modified file 'bzrlib/tests/test_index.py'
--- a/bzrlib/tests/test_index.py	2007-10-05 04:57:03 +0000
+++ b/bzrlib/tests/test_index.py	2007-10-07 22:04:49 +0000
@@ -19,6 +19,7 @@
 from bzrlib import errors
 from bzrlib.index import *
 from bzrlib.tests import TestCaseWithMemoryTransport
+from bzrlib.transport import get_transport
 
 
 class TestGraphIndexBuilder(TestCaseWithMemoryTransport):
@@ -357,7 +358,7 @@
         for key, value, references in nodes:
             builder.add_node(key, value, references)
         stream = builder.finish()
-        trans = self.get_transport()
+        trans = get_transport('trace+' + self.get_url())
         size = trans.put_file('index', stream)
         return GraphIndex(trans, 'index', size)
 
@@ -369,6 +370,181 @@
     def test_open_sets_parsed_map_empty(self):
         index = self.make_index()
         self.assertEqual([], index._parsed_byte_map)
+        self.assertEqual([], index._parsed_key_map)
+
+    def test_first_lookup_key_via_location(self):
+        index = self.make_index()
+        # reset the transport log
+        del index._transport._activity[:]
+        # do a lookup_keys_via_location call for the middle of the file, which
+        # is what bisection uses.
+        result = index.lookup_keys_via_location(
+            [(index._size // 2, ('missing', ))])
+        # this should have asked for a readv request, with adjust_for_latency,
+        # and two regions: the header, and half-way into the file.
+        self.assertEqual([
+            ('readv', 'index', [(30, 30), (0, 200)], True, 60),
+            ],
+            index._transport._activity)
+        # and the result should be that the key cannot be present, because this
+        # is a trivial index.
+        self.assertEqual([((index._size // 2, ('missing', )), False)],
+            result)
+        # And the regions of the file that have been parsed - in this case the
+        # entire file - should be in the parsed region map.
+        self.assertEqual([(0, 60)], index._parsed_byte_map)
+        self.assertEqual([(None, None)], index._parsed_key_map)
+
+    def test_parsing_parses_data_adjacent_to_parsed_regions(self):
+        # we trim data we receive to remove the first and trailing
+        # partial lines, except when they start at the end/finish at the start
+        # of a region we've already parsed/ the end of the file. The trivial
+        # test for this is an index with 1 key.
+        index = self.make_index(nodes=[(('name', ), 'data', ())])
+        # reset the transport log
+        del index._transport._activity[:]
+        result = index.lookup_keys_via_location(
+            [(index._size // 2, ('missing', ))])
+        # this should have asked for a readv request, with adjust_for_latency,
+        # and two regions: the header, and half-way into the file.
+        self.assertEqual([
+            ('readv', 'index', [(36, 36), (0, 200)], True, 72),
+            ],
+            index._transport._activity)
+        # and the result should be that the key cannot be present, because this
+        # is a trivial index and we should not have to do more round trips.
+        self.assertEqual([((index._size // 2, ('missing', )), False)],
+            result)
+        # The whole file should be parsed at this point.
+        self.assertEqual([(0, 72)], index._parsed_byte_map)
+        self.assertEqual([(None, ('name',))], index._parsed_key_map)
+
+    ### - tests:
+    # data with a reference that won't be accessed by the default readv request
+    # change the _nodes dict to a bisectable list, or perhaps an adjacent list.
+    # on result generation,
+
+    def test_parsing_non_adjacent_data_trims(self):
+        # generate a big enough index that we only read some of it on a typical
+        # bisection lookup.
+        nodes = []
+        def make_key(number):
+            return (str(number) + 'X'*100,)
+        for counter in range(64):
+            nodes.append((make_key(counter), 'Y'*100, ()))
+        index = self.make_index(nodes=nodes)
+        result = index.lookup_keys_via_location(
+            [(index._size // 2, ('40', ))])
+        # and the result should be that the key cannot be present, because key is
+        # in the middle of the observed data from a 4K read - the smallest transport
+        # will do today with this api.
+        self.assertEqual([((index._size // 2, ('40', )), False)],
+            result)
+        # and we should have a parse map that includes the header and the
+        # region that was parsed after trimming.
+        self.assertEqual([(0, 3972), (5001, 8914)], index._parsed_byte_map)
+        self.assertEqual([(None, make_key(26)), (make_key(31), make_key(48))],
+            index._parsed_key_map)
+
+    def test_lookup_missing_key_answers_without_io_when_map_permits(self):
+        # generate a big enough index that we only read some of it on a typical
+        # bisection lookup.
+        nodes = []
+        def make_key(number):
+            return (str(number) + 'X'*100,)
+        for counter in range(64):
+            nodes.append((make_key(counter), 'Y'*100, ()))
+        index = self.make_index(nodes=nodes)
+        # lookup the keys in the middle of the file
+        result =index.lookup_keys_via_location(
+            [(index._size // 2, ('40', ))])
+        # check the parse map, this determines the test validity
+        self.assertEqual([(0, 3972), (5001, 8914)], index._parsed_byte_map)
+        self.assertEqual([(None, make_key(26)), (make_key(31), make_key(48))],
+            index._parsed_key_map)
+        # reset the transport log
+        del index._transport._activity[:]
+        # now looking up a key in the portion of the file already parsed should
+        # not create a new transport request, and should return False (cannot
+        # be in the index) - even when the byte location we ask for is outside
+        # the parsed region
+        # 
+        result = index.lookup_keys_via_location(
+            [(4000, ('40', ))])
+        self.assertEqual([((4000, ('40', )), False)],
+            result)
+        self.assertEqual([], index._transport._activity)
+
+    def test_lookup_present_key_answers_without_io_when_map_permits(self):
+        # generate a big enough index that we only read some of it on a typical
+        # bisection lookup.
+        nodes = []
+        def make_key(number):
+            return (str(number) + 'X'*100,)
+        def make_value(number):
+            return str(number) + 'Y'*100
+        for counter in range(64):
+            nodes.append((make_key(counter), make_value(counter), ()))
+        index = self.make_index(nodes=nodes)
+        # lookup the keys in the middle of the file
+        result =index.lookup_keys_via_location(
+            [(index._size // 2, ('40', ))])
+        # check the parse map, this determines the test validity
+        self.assertEqual([(0, 4008), (5046, 8996)], index._parsed_byte_map)
+        self.assertEqual([(None, make_key(26)), (make_key(31), make_key(48))],
+            index._parsed_key_map)
+        # reset the transport log
+        del index._transport._activity[:]
+        # now looking up a key in the portion of the file already parsed should
+        # not create a new transport request, and should return the node for
+        # the key (it is present in the index) - even when the byte location we
+        # ask for is outside the parsed region
+        # 
+        result = index.lookup_keys_via_location([(4000, make_key(40))])
+        self.assertEqual(
+            [((4000, make_key(40)), (index, make_key(40), make_value(40)))],
+            result)
+        self.assertEqual([], index._transport._activity)
+
+    def test_lookup_key_below_probed_area(self):
+        # generate a big enough index that we only read some of it on a typical
+        # bisection lookup.
+        nodes = []
+        def make_key(number):
+            return (str(number) + 'X'*100,)
+        for counter in range(64):
+            nodes.append((make_key(counter), 'Y'*100, ()))
+        index = self.make_index(nodes=nodes)
+        # ask for the key in the middle, but a key that is located in the
+        # unparsed region before the middle.
+        result =index.lookup_keys_via_location(
+            [(index._size // 2, ('30', ))])
+        # check the parse map, this determines the test validity
+        self.assertEqual([(0, 3972), (5001, 8914)], index._parsed_byte_map)
+        self.assertEqual([(None, make_key(26)), (make_key(31), make_key(48))],
+            index._parsed_key_map)
+        self.assertEqual([((index._size // 2, ('30', )), -1)],
+            result)
+
+    def test_lookup_key_above_probed_area(self):
+        # generate a big enough index that we only read some of it on a typical
+        # bisection lookup.
+        nodes = []
+        def make_key(number):
+            return (str(number) + 'X'*100,)
+        for counter in range(64):
+            nodes.append((make_key(counter), 'Y'*100, ()))
+        index = self.make_index(nodes=nodes)
+        # ask for the key in the middle, but a key that is located in the
+        # unparsed region after the middle.
+        result =index.lookup_keys_via_location(
+            [(index._size // 2, ('50', ))])
+        # check the parse map, this determines the test validity
+        self.assertEqual([(0, 3972), (5001, 8914)], index._parsed_byte_map)
+        self.assertEqual([(None, make_key(26)), (make_key(31), make_key(48))],
+            index._parsed_key_map)
+        self.assertEqual([((index._size // 2, ('50', )), +1)],
+            result)
 
     def test_iter_all_entries_empty(self):
         index = self.make_index()



More information about the bazaar-commits mailing list