Rev 2953: Partial refactoring of pack_repo to create a Packer object for packing. in http://people.ubuntu.com/~robertc/baz2.0/reconcile
Robert Collins
robertc at robertcollins.net
Wed Oct 31 20:53:28 GMT 2007
At http://people.ubuntu.com/~robertc/baz2.0/reconcile
------------------------------------------------------------
revno: 2953
revision-id:robertc at robertcollins.net-20071031205323-pbtan7ltoci9slpq
parent: robertc at robertcollins.net-20071031165438-lc0o11v5vaf3p1ta
committer: Robert Collins <robertc at robertcollins.net>
branch nick: reconcile
timestamp: Thu 2007-11-01 07:53:23 +1100
message:
Partial refactoring of pack_repo to create a Packer object for packing.
modified:
bzrlib/reconcile.py reweave_inventory.py-20051108164726-1e5e0934febac06e
bzrlib/repofmt/pack_repo.py pack_repo.py-20070813041115-gjv5ma7ktfqwsjgn-1
bzrlib/repository.py rev_storage.py-20051111201905-119e9401e46257e3
bzrlib/tests/test_repository.py test_repository.py-20060131075918-65c555b881612f4d
=== modified file 'bzrlib/reconcile.py'
--- a/bzrlib/reconcile.py 2007-10-26 06:58:43 +0000
+++ b/bzrlib/reconcile.py 2007-10-31 20:53:23 +0000
@@ -467,3 +467,30 @@
def _reconcile_steps(self):
"""Perform the steps to reconcile this repository."""
+ if not self.thorough:
+ return
+ self.repo.lock_write()
+ try:
+ self.repo._pack_collection.ensure_loaded()
+ self.repo._pack_collection.lock_names()
+ try:
+ self.repo.start_write_group()
+ try:
+ self._new_pack = self.repo._pack_collection._new_pack
+ self._copy_revisions()
+ except:
+ self.repo.abort_write_group()
+ raise
+ else:
+ self.repo.commit_write_group()
+ finally:
+ self.repo._pack_collection._unlock_names()
+ finally:
+ self.repo.unlock()
+
+ def _copy_revisions(self):
+ """Copy revisions, regenerating the index as we go."""
+
+ def _pack_correcting_data(self):
+ """Perform a total pack, regenerating as much data as possible."""
+ revisions = self.repo.all_revision_ids()
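
For reference, _reconcile_steps above nests three guards: the repository write
lock, the pack-names lock, and a write group that is committed on success and
aborted on any error. Leaving the pack-names lock aside, this is the usual
write-group discipline; here is a standalone sketch of that shape, with a fake
recording object standing in for the repository (nothing below is bzrlib API):

class _FakeRepo(object):
    # Records the calls made on it; a stand-in, not the real Repository.
    def __init__(self):
        self.calls = []
    def lock_write(self): self.calls.append('lock_write')
    def unlock(self): self.calls.append('unlock')
    def start_write_group(self): self.calls.append('start_write_group')
    def commit_write_group(self): self.calls.append('commit_write_group')
    def abort_write_group(self): self.calls.append('abort_write_group')

def guarded_rewrite(repo, work):
    """Run work() inside a write group: commit on success, abort on any
    exception, and always release the write lock."""
    repo.lock_write()
    try:
        repo.start_write_group()
        try:
            work()
        except:
            # bare except mirrors the patch: abort, then re-raise
            repo.abort_write_group()
            raise
        else:
            repo.commit_write_group()
    finally:
        repo.unlock()

repo = _FakeRepo()
guarded_rewrite(repo, lambda: None)
assert repo.calls == ['lock_write', 'start_write_group',
                      'commit_write_group', 'unlock']

On an exception inside work() the abort/unlock pair still runs, which is the
point of the nesting used by _reconcile_steps.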
=== modified file 'bzrlib/repofmt/pack_repo.py'
--- a/bzrlib/repofmt/pack_repo.py 2007-10-31 16:54:38 +0000
+++ b/bzrlib/repofmt/pack_repo.py 2007-10-31 20:53:23 +0000
@@ -475,126 +475,16 @@
self.knit_access.set_writer(None, None, (None, None))
-class RepositoryPackCollection(object):
- """Management of packs within a repository."""
-
- def __init__(self, repo, transport, index_transport, upload_transport,
- pack_transport):
- """Create a new RepositoryPackCollection.
-
- :param transport: Addresses the repository base directory
- (typically .bzr/repository/).
- :param index_transport: Addresses the directory containing indices.
- :param upload_transport: Addresses the directory into which packs are written
- while they're being created.
- :param pack_transport: Addresses the directory of existing complete packs.
- """
- self.repo = repo
- self.transport = transport
- self._index_transport = index_transport
- self._upload_transport = upload_transport
- self._pack_transport = pack_transport
- self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
- self.packs = []
- # name:Pack mapping
- self._packs_by_name = {}
- # the previous pack-names content
- self._packs_at_load = None
- # when a pack is being created by this object, the state of that pack.
- self._new_pack = None
- # aggregated revision index data
- self.revision_index = AggregateIndex()
- self.inventory_index = AggregateIndex()
- self.text_index = AggregateIndex()
- self.signature_index = AggregateIndex()
-
- def add_pack_to_memory(self, pack):
- """Make a Pack object available to the repository to satisfy queries.
-
- :param pack: A Pack object.
- """
- assert pack.name not in self._packs_by_name
- self.packs.append(pack)
- self._packs_by_name[pack.name] = pack
- self.revision_index.add_index(pack.revision_index, pack)
- self.inventory_index.add_index(pack.inventory_index, pack)
- self.text_index.add_index(pack.text_index, pack)
- self.signature_index.add_index(pack.signature_index, pack)
-
- def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
- nostore_sha, random_revid):
- file_id_index = GraphIndexPrefixAdapter(
- self.text_index.combined_index,
- (file_id, ), 1,
- add_nodes_callback=self.text_index.add_callback)
- self.repo._text_knit._index._graph_index = file_id_index
- self.repo._text_knit._index._add_callback = file_id_index.add_nodes
- return self.repo._text_knit.add_lines_with_ghosts(
- revision_id, parents, new_lines, nostore_sha=nostore_sha,
- random_id=random_revid, check_content=False)[0:2]
-
- def all_packs(self):
- """Return a list of all the Pack objects this repository has.
-
- Note that an in-progress pack being created is not returned.
-
- :return: A list of Pack objects for all the packs in the repository.
- """
- result = []
- for name in self.names():
- result.append(self.get_pack_by_name(name))
- return result
-
- def autopack(self):
- """Pack the pack collection incrementally.
-
- This will not attempt global reorganisation or recompression,
- rather it will just ensure that the total number of packs does
- not grow without bound. It uses the _max_pack_count method to
- determine if autopacking is needed, and the pack_distribution
- method to determine the number of revisions in each pack.
-
- If autopacking takes place then the packs name collection will have
- been flushed to disk - packing requires updating the name collection
- in synchronisation with certain steps. Otherwise the names collection
- is not flushed.
-
- :return: True if packing took place.
- """
- # XXX: Should not be needed when the management of indices is sane.
- total_revisions = self.revision_index.combined_index.key_count()
- total_packs = len(self._names)
- if self._max_pack_count(total_revisions) >= total_packs:
- return False
- # XXX: the following may want to be a class, to pack with a given
- # policy.
- mutter('Auto-packing repository %s, which has %d pack files, '
- 'containing %d revisions into %d packs.', self, total_packs,
- total_revisions, self._max_pack_count(total_revisions))
- # determine which packs need changing
- pack_distribution = self.pack_distribution(total_revisions)
- existing_packs = []
- for pack in self.all_packs():
- revision_count = pack.get_revision_count()
- if revision_count == 0:
- # revisionless packs are not generated by normal operation,
- # only by operations like sign-my-commits, and thus will not
- tend to grow rapidly or without bound like commit containing
- # packs do - leave them alone as packing them really should
- # group their data with the relevant commit, and that may
- # involve rewriting ancient history - which autopack tries to
- # avoid. Alternatively we could not group the data but treat
- # each of these as having a single revision, and thus add
- # one revision for each to the total revision count, to get
- # a matching distribution.
- continue
- existing_packs.append((revision_count, pack))
- pack_operations = self.plan_autopack_combinations(
- existing_packs, pack_distribution)
- self._execute_pack_operations(pack_operations)
- return True
-
- def create_pack_from_packs(self, packs, suffix, revision_ids=None):
+class Packer(object):
+ """Create a pack from packs."""
+
+ def __init__(self, pack_collection, packs, suffix, revision_ids=None):
+ self.packs = packs
+ self.suffix = suffix
+ self.revision_ids = revision_ids
+ self._pack_collection = pack_collection
+
+ def pack(self):
"""Create a new pack by reading data from other packs.
This does little more than a bulk copy of data. One key difference
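
Both existing call sites are updated later in this mail to build a Packer and
call pack() instead of going through create_pack_from_packs. Roughly, with
collection, packs, source_packs and wanted_revision_ids standing in for the
values those call sites already have:

from bzrlib.repofmt.pack_repo import Packer

# autopack-style call: combine some existing packs into one '.autopack' pack
new_pack = Packer(collection, packs, '.autopack').pack()

# fetch-style call: copy only the requested revisions into a '.fetch' pack
new_pack = Packer(collection, source_packs, '.fetch',
                  revision_ids=wanted_revision_ids).pack()

pack() keeps the old return contract: the NewPack on success, None when there
was nothing to copy, so the existing None checks in the callers are unchanged.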
@@ -614,57 +504,61 @@
# - which has already been flushed, so it's safe.
# XXX: - duplicate code warning with start_write_group; fix before
# considering 'done'.
- if self._new_pack is not None:
+ if self._pack_collection._new_pack is not None:
raise errors.BzrError('call to create_pack_from_packs while '
'another pack is being written.')
- if revision_ids is not None:
- if len(revision_ids) == 0:
+ if self.revision_ids is not None:
+ if len(self.revision_ids) == 0:
# silly fetch request.
return None
else:
- revision_ids = frozenset(revision_ids)
- pb = ui.ui_factory.nested_progress_bar()
+ self.revision_ids = frozenset(self.revision_ids)
+ self.pb = ui.ui_factory.nested_progress_bar()
try:
- return self._create_pack_from_packs(packs, suffix, revision_ids,
- pb)
+ return self._create_pack_from_packs()
finally:
- pb.finished()
-
- def _create_pack_from_packs(self, packs, suffix, revision_ids, pb):
- pb.update("Opening pack", 0, 5)
- new_pack = NewPack(self._upload_transport, self._index_transport,
- self._pack_transport, upload_suffix=suffix)
+ self.pb.finished()
+
+ def open_pack(self):
+ """Open a pack for the pack we are creating."""
+ return NewPack(self._pack_collection._upload_transport,
+ self._pack_collection._index_transport,
+ self._pack_collection._pack_transport, upload_suffix=self.suffix)
+
+ def _create_pack_from_packs(self):
+ self.pb.update("Opening pack", 0, 5)
+ new_pack = self.open_pack()
# buffer data - we won't be reading-back during the pack creation and
# this makes a significant difference on sftp pushes.
new_pack.set_write_cache_size(1024*1024)
if 'pack' in debug.debug_flags:
plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
- for a_pack in packs]
- if revision_ids is not None:
- rev_count = len(revision_ids)
+ for a_pack in self.packs]
+ if self.revision_ids is not None:
+ rev_count = len(self.revision_ids)
else:
rev_count = 'all'
mutter('%s: create_pack: creating pack from source packs: '
'%s%s %s revisions wanted %s t=0',
- time.ctime(), self._upload_transport.base, new_pack.random_name,
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
plain_pack_list, rev_count)
# select revisions
- if revision_ids:
- revision_keys = [(revision_id,) for revision_id in revision_ids]
+ if self.revision_ids:
+ revision_keys = [(revision_id,) for revision_id in self.revision_ids]
else:
revision_keys = None
# select revision keys
- revision_index_map = self._packs_list_to_pack_map_and_index_list(
- packs, 'revision_index')[0]
- revision_nodes = self._index_contents(revision_index_map, revision_keys)
+ revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+ self.packs, 'revision_index')[0]
+ revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
# copy revision keys and adjust values
- pb.update("Copying revision texts", 1)
+ self.pb.update("Copying revision texts", 1)
list(self._copy_nodes_graph(revision_nodes, revision_index_map,
new_pack._writer, new_pack.revision_index))
if 'pack' in debug.debug_flags:
mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
- time.ctime(), self._upload_transport.base, new_pack.random_name,
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
new_pack.revision_index.key_count(),
time.time() - new_pack.start_time)
# select inventory keys
@@ -672,18 +566,18 @@
# querying for keys here could introduce a bug where an inventory item
# is missed, so do not change it to query separately without cross
# checking like the text key check below.
- inventory_index_map = self._packs_list_to_pack_map_and_index_list(
- packs, 'inventory_index')[0]
- inv_nodes = self._index_contents(inventory_index_map, inv_keys)
+ inventory_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+ self.packs, 'inventory_index')[0]
+ inv_nodes = self._pack_collection._index_contents(inventory_index_map, inv_keys)
# copy inventory keys and adjust values
# XXX: Should be a helper function to allow different inv representation
# at this point.
- pb.update("Copying inventory texts", 2)
+ self.pb.update("Copying inventory texts", 2)
inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
new_pack._writer, new_pack.inventory_index, output_lines=True)
- if revision_ids:
- fileid_revisions = self.repo._find_file_ids_from_xml_inventory_lines(
- inv_lines, revision_ids)
+ if self.revision_ids:
+ fileid_revisions = self._pack_collection.repo._find_file_ids_from_xml_inventory_lines(
+ inv_lines, self.revision_ids)
text_filter = []
for fileid, file_revids in fileid_revisions.iteritems():
text_filter.extend(
@@ -694,13 +588,13 @@
text_filter = None
if 'pack' in debug.debug_flags:
mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
- time.ctime(), self._upload_transport.base, new_pack.random_name,
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
new_pack.inventory_index.key_count(),
time.time() - new_pack.start_time)
# select text keys
- text_index_map = self._packs_list_to_pack_map_and_index_list(
- packs, 'text_index')[0]
- text_nodes = self._index_contents(text_index_map, text_filter)
+ text_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+ self.packs, 'text_index')[0]
+ text_nodes = self._pack_collection._index_contents(text_index_map, text_filter)
if text_filter is not None:
# We could return the keys copied as part of the return value from
# _copy_nodes_graph but this doesn't work all that well with the
@@ -718,37 +612,277 @@
raise errors.RevisionNotPresent(a_missing_key[1],
a_missing_key[0])
# copy text keys and adjust values
- pb.update("Copying content texts", 3)
+ self.pb.update("Copying content texts", 3)
list(self._copy_nodes_graph(text_nodes, text_index_map,
new_pack._writer, new_pack.text_index))
if 'pack' in debug.debug_flags:
mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
- time.ctime(), self._upload_transport.base, new_pack.random_name,
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
new_pack.text_index.key_count(),
time.time() - new_pack.start_time)
# select signature keys
signature_filter = revision_keys # same keyspace
- signature_index_map = self._packs_list_to_pack_map_and_index_list(
- packs, 'signature_index')[0]
- signature_nodes = self._index_contents(signature_index_map,
+ signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
+ self.packs, 'signature_index')[0]
+ signature_nodes = self._pack_collection._index_contents(signature_index_map,
signature_filter)
# copy signature keys and adjust values
- pb.update("Copying signature texts", 4)
+ self.pb.update("Copying signature texts", 4)
self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
new_pack.signature_index)
if 'pack' in debug.debug_flags:
mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
- time.ctime(), self._upload_transport.base, new_pack.random_name,
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
new_pack.signature_index.key_count(),
time.time() - new_pack.start_time)
if not new_pack.data_inserted():
new_pack.abort()
return None
- pb.update("Finishing pack", 5)
+ self.pb.update("Finishing pack", 5)
new_pack.finish()
- self.allocate(new_pack)
+ self._pack_collection.allocate(new_pack)
return new_pack
+ def _copy_nodes(self, nodes, index_map, writer, write_index):
+ """Copy knit nodes between packs with no graph references."""
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ return self._do_copy_nodes(nodes, index_map, writer,
+ write_index, pb)
+ finally:
+ pb.finished()
+
+ def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
+ # for record verification
+ knit_data = _KnitData(None)
+ # plan a readv on each source pack:
+ # group by pack
+ nodes = sorted(nodes)
+ # how to map this into knit.py - or knit.py into this?
+ # we don't want the typical knit logic, we want grouping by pack
+ # at this point - perhaps a helper library for the following code
+ # duplication points?
+ request_groups = {}
+ for index, key, value in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value))
+ record_index = 0
+ pb.update("Copied record", record_index, len(nodes))
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append((offset, length, (key, value[0])))
+ # linear scan up the pack
+ pack_readv_requests.sort()
+ # copy the data
+ transport, path = index_map[index]
+ reader = pack.make_readv_reader(transport, path,
+ [offset[0:2] for offset in pack_readv_requests])
+ for (names, read_func), (_1, _2, (key, eol_flag)) in \
+ izip(reader.iter_records(), pack_readv_requests):
+ raw_data = read_func(None)
+ # check the header only
+ df, _ = knit_data._parse_record_header(key[-1], raw_data)
+ df.close()
+ pos, size = writer.add_bytes_record(raw_data, names)
+ write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
+ pb.update("Copied record", record_index)
+ record_index += 1
+
+ def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
+ output_lines=False):
+ """Copy knit nodes between packs.
+
+ :param output_lines: Return lines present in the copied data as
+ an iterator.
+ """
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ return self._do_copy_nodes_graph(nodes, index_map, writer,
+ write_index, output_lines, pb)
+ finally:
+ pb.finished()
+
+ def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
+ output_lines, pb):
+ # for record verification
+ knit_data = _KnitData(None)
+ # for line extraction when requested (inventories only)
+ if output_lines:
+ factory = knit.KnitPlainFactory()
+ # plan a readv on each source pack:
+ # group by pack
+ nodes = sorted(nodes)
+ # how to map this into knit.py - or knit.py into this?
+ # we don't want the typical knit logic, we want grouping by pack
+ # at this point - perhaps a helper library for the following code
+ # duplication points?
+ request_groups = {}
+ record_index = 0
+ pb.update("Copied record", record_index, len(nodes))
+ for index, key, value, references in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value, references))
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value, references in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append((offset, length, (key, value[0], references)))
+ # linear scan up the pack
+ pack_readv_requests.sort()
+ # copy the data
+ transport, path = index_map[index]
+ reader = pack.make_readv_reader(transport, path,
+ [offset[0:2] for offset in pack_readv_requests])
+ for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
+ izip(reader.iter_records(), pack_readv_requests):
+ raw_data = read_func(None)
+ if output_lines:
+ # read the entire thing
+ content, _ = knit_data._parse_record(key[-1], raw_data)
+ if len(references[-1]) == 0:
+ line_iterator = factory.get_fulltext_content(content)
+ else:
+ line_iterator = factory.get_linedelta_content(content)
+ for line in line_iterator:
+ yield line
+ else:
+ # check the header only
+ df, _ = knit_data._parse_record_header(key[-1], raw_data)
+ df.close()
+ pos, size = writer.add_bytes_record(raw_data, names)
+ write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
+ pb.update("Copied record", record_index)
+ record_index += 1
+
+
+
+class RepositoryPackCollection(object):
+ """Management of packs within a repository."""
+
+ def __init__(self, repo, transport, index_transport, upload_transport,
+ pack_transport):
+ """Create a new RepositoryPackCollection.
+
+ :param transport: Addresses the repository base directory
+ (typically .bzr/repository/).
+ :param index_transport: Addresses the directory containing indices.
+ :param upload_transport: Addresses the directory into which packs are written
+ while they're being created.
+ :param pack_transport: Addresses the directory of existing complete packs.
+ """
+ self.repo = repo
+ self.transport = transport
+ self._index_transport = index_transport
+ self._upload_transport = upload_transport
+ self._pack_transport = pack_transport
+ self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
+ self.packs = []
+ # name:Pack mapping
+ self._packs_by_name = {}
+ # the previous pack-names content
+ self._packs_at_load = None
+ # when a pack is being created by this object, the state of that pack.
+ self._new_pack = None
+ # aggregated revision index data
+ self.revision_index = AggregateIndex()
+ self.inventory_index = AggregateIndex()
+ self.text_index = AggregateIndex()
+ self.signature_index = AggregateIndex()
+
+ def add_pack_to_memory(self, pack):
+ """Make a Pack object available to the repository to satisfy queries.
+
+ :param pack: A Pack object.
+ """
+ assert pack.name not in self._packs_by_name
+ self.packs.append(pack)
+ self._packs_by_name[pack.name] = pack
+ self.revision_index.add_index(pack.revision_index, pack)
+ self.inventory_index.add_index(pack.inventory_index, pack)
+ self.text_index.add_index(pack.text_index, pack)
+ self.signature_index.add_index(pack.signature_index, pack)
+
+ def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
+ nostore_sha, random_revid):
+ file_id_index = GraphIndexPrefixAdapter(
+ self.text_index.combined_index,
+ (file_id, ), 1,
+ add_nodes_callback=self.text_index.add_callback)
+ self.repo._text_knit._index._graph_index = file_id_index
+ self.repo._text_knit._index._add_callback = file_id_index.add_nodes
+ return self.repo._text_knit.add_lines_with_ghosts(
+ revision_id, parents, new_lines, nostore_sha=nostore_sha,
+ random_id=random_revid, check_content=False)[0:2]
+
+ def all_packs(self):
+ """Return a list of all the Pack objects this repository has.
+
+ Note that an in-progress pack being created is not returned.
+
+ :return: A list of Pack objects for all the packs in the repository.
+ """
+ result = []
+ for name in self.names():
+ result.append(self.get_pack_by_name(name))
+ return result
+
+ def autopack(self):
+ """Pack the pack collection incrementally.
+
+ This will not attempt global reorganisation or recompression,
+ rather it will just ensure that the total number of packs does
+ not grow without bound. It uses the _max_pack_count method to
+ determine if autopacking is needed, and the pack_distribution
+ method to determine the number of revisions in each pack.
+
+ If autopacking takes place then the packs name collection will have
+ been flushed to disk - packing requires updating the name collection
+ in synchronisation with certain steps. Otherwise the names collection
+ is not flushed.
+
+ :return: True if packing took place.
+ """
+ # XXX: Should not be needed when the management of indices is sane.
+ total_revisions = self.revision_index.combined_index.key_count()
+ total_packs = len(self._names)
+ if self._max_pack_count(total_revisions) >= total_packs:
+ return False
+ # XXX: the following may want to be a class, to pack with a given
+ # policy.
+ mutter('Auto-packing repository %s, which has %d pack files, '
+ 'containing %d revisions into %d packs.', self, total_packs,
+ total_revisions, self._max_pack_count(total_revisions))
+ # determine which packs need changing
+ pack_distribution = self.pack_distribution(total_revisions)
+ existing_packs = []
+ for pack in self.all_packs():
+ revision_count = pack.get_revision_count()
+ if revision_count == 0:
+ # revisionless packs are not generated by normal operation,
+ # only by operations like sign-my-commits, and thus will not
+ tend to grow rapidly or without bound like commit containing
+ # packs do - leave them alone as packing them really should
+ # group their data with the relevant commit, and that may
+ # involve rewriting ancient history - which autopack tries to
+ # avoid. Alternatively we could not group the data but treat
+ # each of these as having a single revision, and thus add
+ # one revision for each to the total revision count, to get
+ # a matching distribution.
+ continue
+ existing_packs.append((revision_count, pack))
+ pack_operations = self.plan_autopack_combinations(
+ existing_packs, pack_distribution)
+ self._execute_pack_operations(pack_operations)
+ return True
+
def _execute_pack_operations(self, pack_operations):
"""Execute a series of pack operations.
@@ -759,8 +893,7 @@
# we may have no-ops from the setup logic
if len(packs) == 0:
continue
- # have a progress bar?
- self.create_pack_from_packs(packs, '.autopack')
+ Packer(self, packs, '.autopack').pack()
for pack in packs:
self._remove_pack_from_memory(pack)
# record the newly available packs and stop advertising the old
@@ -841,125 +974,6 @@
return pack_operations
- def _copy_nodes(self, nodes, index_map, writer, write_index):
- """Copy knit nodes between packs with no graph references."""
- pb = ui.ui_factory.nested_progress_bar()
- try:
- return self._do_copy_nodes(nodes, index_map, writer,
- write_index, pb)
- finally:
- pb.finished()
-
- def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
- # for record verification
- knit_data = _KnitData(None)
- # plan a readv on each source pack:
- # group by pack
- nodes = sorted(nodes)
- # how to map this into knit.py - or knit.py into this?
- # we don't want the typical knit logic, we want grouping by pack
- # at this point - perhaps a helper library for the following code
- # duplication points?
- request_groups = {}
- for index, key, value in nodes:
- if index not in request_groups:
- request_groups[index] = []
- request_groups[index].append((key, value))
- record_index = 0
- pb.update("Copied record", record_index, len(nodes))
- for index, items in request_groups.iteritems():
- pack_readv_requests = []
- for key, value in items:
- # ---- KnitGraphIndex.get_position
- bits = value[1:].split(' ')
- offset, length = int(bits[0]), int(bits[1])
- pack_readv_requests.append((offset, length, (key, value[0])))
- # linear scan up the pack
- pack_readv_requests.sort()
- # copy the data
- transport, path = index_map[index]
- reader = pack.make_readv_reader(transport, path,
- [offset[0:2] for offset in pack_readv_requests])
- for (names, read_func), (_1, _2, (key, eol_flag)) in \
- izip(reader.iter_records(), pack_readv_requests):
- raw_data = read_func(None)
- # check the header only
- df, _ = knit_data._parse_record_header(key[-1], raw_data)
- df.close()
- pos, size = writer.add_bytes_record(raw_data, names)
- write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
- pb.update("Copied record", record_index)
- record_index += 1
-
- def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
- output_lines=False):
- """Copy knit nodes between packs.
-
- :param output_lines: Return lines present in the copied data as
- an iterator.
- """
- pb = ui.ui_factory.nested_progress_bar()
- try:
- return self._do_copy_nodes_graph(nodes, index_map, writer,
- write_index, output_lines, pb)
- finally:
- pb.finished()
-
- def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
- output_lines, pb):
- # for record verification
- knit_data = _KnitData(None)
- # for line extraction when requested (inventories only)
- if output_lines:
- factory = knit.KnitPlainFactory()
- # plan a readv on each source pack:
- # group by pack
- nodes = sorted(nodes)
- # how to map this into knit.py - or knit.py into this?
- # we don't want the typical knit logic, we want grouping by pack
- # at this point - perhaps a helper library for the following code
- # duplication points?
- request_groups = {}
- record_index = 0
- pb.update("Copied record", record_index, len(nodes))
- for index, key, value, references in nodes:
- if index not in request_groups:
- request_groups[index] = []
- request_groups[index].append((key, value, references))
- for index, items in request_groups.iteritems():
- pack_readv_requests = []
- for key, value, references in items:
- # ---- KnitGraphIndex.get_position
- bits = value[1:].split(' ')
- offset, length = int(bits[0]), int(bits[1])
- pack_readv_requests.append((offset, length, (key, value[0], references)))
- # linear scan up the pack
- pack_readv_requests.sort()
- # copy the data
- transport, path = index_map[index]
- reader = pack.make_readv_reader(transport, path,
- [offset[0:2] for offset in pack_readv_requests])
- for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
- izip(reader.iter_records(), pack_readv_requests):
- raw_data = read_func(None)
- if output_lines:
- # read the entire thing
- content, _ = knit_data._parse_record(key[-1], raw_data)
- if len(references[-1]) == 0:
- line_iterator = factory.get_fulltext_content(content)
- else:
- line_iterator = factory.get_linedelta_content(content)
- for line in line_iterator:
- yield line
- else:
- # check the header only
- df, _ = knit_data._parse_record_header(key[-1], raw_data)
- df.close()
- pos, size = writer.add_bytes_record(raw_data, names)
- write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
- pb.update("Copied record", record_index)
- record_index += 1
-
def ensure_loaded(self):
# NB: if you see an assertion error here, it's probably access against
# an unlocked repo. Naughty.
@@ -1483,6 +1497,42 @@
return 'w'
return 'r'
+ def _find_inconsistent_revision_parents(self):
+ """Find revisions with incorrectly cached parents.
+
+ :returns: an iterator yielding tuples of (revision-id, parents-in-index,
+ parents-in-revision).
+ """
+ assert self.is_locked()
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ revision_nodes = self._pack_collection.revision_index \
+ .combined_index.iter_all_entries()
+ index_positions = []
+ # Get the cached index values for all revisions, and also the location
+ # in each index of the revision text so we can perform linear IO.
+ for index, key, value, refs in revision_nodes:
+ pos, length = value[1:].split(' ')
+ index_positions.append((index, int(pos), key[0],
+ tuple(parent[0] for parent in refs[0])))
+ pb.update("Reading revision index.", 0, 0)
+ index_positions.sort()
+ total = len(index_positions) / 1000 + 1
+ for offset in xrange(total):
+ pb.update("Checking cached revision graph.", offset)
+ to_query = index_positions[offset * 1000:(offset + 1) * 1000]
+ if not to_query:
+ break
+ rev_ids = [item[2] for item in to_query]
+ revs = self.get_revisions(rev_ids)
+ for revision, item in zip(revs, to_query):
+ index_parents = item[3]
+ rev_parents = tuple(revision.parent_ids)
+ if index_parents != rev_parents:
+ yield (revision.revision_id, index_parents, rev_parents)
+ finally:
+ pb.finished()
+
def get_parents(self, revision_ids):
"""See StackedParentsProvider.get_parents.
=== modified file 'bzrlib/repository.py'
--- a/bzrlib/repository.py 2007-10-30 19:42:31 +0000
+++ b/bzrlib/repository.py 2007-10-31 20:53:23 +0000
@@ -2331,6 +2331,7 @@
@needs_write_lock
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""See InterRepository.fetch()."""
+ from bzrlib.repofmt.pack_repo import Packer
mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
self.source, self.source._format, self.target, self.target._format)
self.count_copied = 0
@@ -2358,9 +2359,8 @@
except errors.NoSuchRevision:
raise errors.InstallFailed([revision_id])
packs = self.source._pack_collection.all_packs()
- pack = self.target._pack_collection.create_pack_from_packs(
- packs, '.fetch', revision_ids,
- )
+ pack = Packer(self.target._pack_collection, packs, '.fetch',
+ revision_ids).pack()
if pack is not None:
self.target._pack_collection._save_pack_names()
# Trigger an autopack. This may duplicate effort as we've just done
=== modified file 'bzrlib/tests/test_repository.py'
--- a/bzrlib/tests/test_repository.py 2007-10-30 17:39:11 +0000
+++ b/bzrlib/tests/test_repository.py 2007-10-31 20:53:23 +0000
@@ -1387,3 +1387,7 @@
self.assertEqual(20, len(pack.random_name))
self.assertIsInstance(pack.random_name, str)
self.assertIsInstance(pack.start_time, float)
+
+
+class TestPacker(TestCaseWithTransport):
+ """Tests for the packs repository Packer class."""