Rev 120: Add the ability to create an Importer lock. in http://bazaar.launchpad.net/~bzr/bzr-history-db/trunk
John Arbash Meinel
john at arbash-meinel.com
Fri Apr 30 20:08:56 BST 2010
At http://bazaar.launchpad.net/~bzr/bzr-history-db/trunk
------------------------------------------------------------
revno: 120
revision-id: john at arbash-meinel.com-20100430190846-6a62sl3waibeejyh
parent: john at arbash-meinel.com-20100428015703-jerg5bkyuccfs1xy
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: trunk
timestamp: Fri 2010-04-30 14:08:46 -0500
message:
Add the ability to create an Importer lock.
It even has inter-thread tests for the behavior, which seem to be pretty useful.
-------------- next part --------------
=== modified file '__init__.py'
--- a/__init__.py 2010-04-22 21:27:21 +0000
+++ b/__init__.py 2010-04-30 19:08:46 +0000
@@ -438,7 +438,11 @@
start_revision_id=start_revision_id,
stop_revision_id=real_stop_revision_id)
t1 = time.clock()
- merge_sorted = self._filter_non_ancestors(iter(merge_sorted))
+ func = getattr(self, '_filter_start_non_ancestors', None)
+ if func is None:
+ func = getattr(self, '_filter_non_ancestors', None)
+ if func is not None:
+ merge_sorted = func(iter(merge_sorted))
t2 = time.clock()
if 'history_db' in debug.debug_flags:
import pprint
=== modified file 'history_db.py'
--- a/history_db.py 2010-04-28 01:57:03 +0000
+++ b/history_db.py 2010-04-30 19:08:46 +0000
@@ -1198,12 +1198,24 @@
self._db_path = db_path
self._db_conn = None
self._cursor = None
+ self._importer_lock = None
self._branch = a_branch
self._branch_tip_rev_id = a_branch.last_revision()
self._branch_tip_db_id = self._get_db_id(self._branch_tip_rev_id)
self._tip_is_imported = False
self._stats = defaultdict(lambda: 0)
+    def set_importer_lock(self, lock):
+        """Install the lock that serializes Importer runs for this Querier.
+
+        The history DB back-end is generally single-writer, so _import_tip
+        holds this lock while it builds and runs an Importer, preventing two
+        writers from using the database at the same time.
+
+        :param lock: An object with acquire() and release() methods (for
+            example a threading.Lock), or None to disable locking. It is
+            acquired without any timeout.
+        """
+        self._importer_lock = lock
+
def _get_cursor(self):
if self._cursor is not None:
return self._cursor
@@ -1233,20 +1245,26 @@
def _import_tip(self):
+        """Incrementally import the branch tip into the history DB.
+
+        If a cursor is open, close() is called first. If an importer lock has
+        been set (see set_importer_lock), it is held while the Importer is
+        built and run; afterwards this Querier adopts the Importer's
+        connection and cursor and marks the tip as imported.
+        """
if self._cursor is not None:
self.close()
- t = time.time()
- importer = Importer(self._db_path, self._branch,
- tip_revision_id=self._branch_tip_rev_id,
- incremental=True)
- importer.do_import()
- tdelta = time.time() - t
- if 'history_db' in debug.debug_flags:
- trace.note('imported %d nodes on-the-fly in %.3fs'
- % (importer._stats.get('total_nodes_inserted', 0),
- tdelta))
- self._db_conn = importer._db_conn
- self._cursor = importer._cursor
- self._branch_tip_db_id = self._get_db_id(self._branch_tip_rev_id)
- self._tip_is_imported = True
+        # Read the attribute once so acquire() and release() are always paired
+        # on the same object, even if set_importer_lock() is called while the
+        # import is running. Explicit acquire()/release() is used instead of
+        # 'with' because the lock only has to provide those two methods (the
+        # _InterLocker test helper has no __enter__/__exit__).
+        lock = self._importer_lock
+        if lock is not None:
+            lock.acquire()
+        try:
+            t = time.time()
+            importer = Importer(self._db_path, self._branch,
+                                tip_revision_id=self._branch_tip_rev_id,
+                                incremental=True)
+            importer.do_import()
+            tdelta = time.time() - t
+            if 'history_db' in debug.debug_flags:
+                trace.note('imported %d nodes on-the-fly in %.3fs'
+                           % (importer._stats.get('total_nodes_inserted', 0),
+                              tdelta))
+            self._db_conn = importer._db_conn
+            self._cursor = importer._cursor
+            self._branch_tip_db_id = self._get_db_id(self._branch_tip_rev_id)
+            self._tip_is_imported = True
+        finally:
+            if lock is not None:
+                lock.release()
def _is_imported_db_id(self, tip_db_id):
res = self._get_cursor().execute(
=== modified file 'test_importer.py'
--- a/test_importer.py 2010-04-09 20:17:00 +0000
+++ b/test_importer.py 2010-04-30 19:08:46 +0000
@@ -16,6 +16,11 @@
"""Test aspects of the importer code."""
+import os
+import tempfile
+import threading
+import time
+
from bzrlib import (
graph,
tests,
@@ -678,3 +683,133 @@
inc_merger = self.make_inc_merger(b, 'O', 'O')
inc_merger.topo_order()
self.assertScheduledStack(inc_merger, [])
+
+
+class _InterLocker(object):
+    """Wrapper around thread locks to help with testing.
+
+    The idea is that one thread will be wanting to acquire a lock. When it does
+    so, we block it, and signal the testing thread that the acquisition was
+    attempted, so now would be a good time to check that things are proceeding
+    properly.
+
+    Intended sequence for the test thread: hand this object to the code under
+    test as its lock, call wait_for_acquire() (returns once the other thread
+    is blocked inside acquire()), check whatever must hold before the lock is
+    taken, then call wait_for_release() to let the other thread through and
+    block until it calls release().
+
+    Only acquire() and release() are provided; there is no context-manager
+    support.
+    """
+
+    def __init__(self, max_wait_time=1.0):
+        """Create the wrapper with the underlying lock already held.
+
+        :param max_wait_time: Number of seconds any single wait (in acquire,
+            wait_for_acquire or wait_for_release) may take before
+            RuntimeError is raised. Defaults to 1.0.
+        """
+        self._monitored_lock = threading.Lock()
+        # We start the lock in blocked mode, so the other thread cannot get
+        # through acquire() until wait_for_release() lets it.
+        self._monitored_lock.acquire()
+        self._acquire_called = False
+        self._release_called = False
+        self._acquireCondition = threading.Condition()
+        self._releaseCondition = threading.Condition()
+        self._max_wait_time = max_wait_time
+
+    def acquire(self):
+        """Same as threading.Lock.acquire.
+
+        This is meant to be called by the thread you are testing / monitoring.
+        It signals the test thread that acquire was attempted, then polls the
+        underlying lock (every 0.1s) until the test thread releases it.
+
+        :raises RuntimeError: if the underlying lock is not obtained within
+            max_wait_time seconds.
+        """
+        self._acquireCondition.acquire()
+        try:
+            self._acquire_called = True
+            self._acquireCondition.notify()
+        finally:
+            self._acquireCondition.release()
+        # Acquire the actual lock that this is substituting for. Poll with a
+        # non-blocking acquire so a broken test gives up instead of hanging.
+        t_wait_start = time.time()
+        while not self._monitored_lock.acquire(False):
+            t_waited = time.time() - t_wait_start
+            if t_waited > self._max_wait_time:
+                raise RuntimeError('Acquire timed out after %.1fs'
+                                   % (t_waited,))
+            time.sleep(0.1)
+
+    def release(self):
+        """See threading.Lock.release.
+
+        Additionally records that release happened and wakes a thread blocked
+        in wait_for_release().
+        """
+        self._monitored_lock.release()
+        self._releaseCondition.acquire()
+        try:
+            self._release_called = True
+            self._releaseCondition.notify()
+        finally:
+            self._releaseCondition.release()
+
+    def _wait_for_condition(self, condition, evaluator, name, timeout):
+        """Wait for the given condition to trigger.
+
+        :param condition: A Condition variable
+        :param evaluator: A callback to indicate if the condition has actually
+            been fulfilled. Should return True if the condition is ready to go.
+        :param name: An associated name for the condition (used for error
+            messages)
+        :param timeout: If the condition hasn't triggered after timeout
+            seconds, raise an error.
+        :return: When this function returns, the condition lock will be held.
+            Callers are responsible for calling .release()
+        :raises RuntimeError: on timeout. The condition lock is released
+            before raising, so callers must not release it in that case.
+        """
+        t_wait_start = time.time()
+        condition.acquire()
+        while not evaluator():
+            t_waited = time.time() - t_wait_start
+            if t_waited > timeout:
+                # Never leave the condition held on failure: the monitored
+                # thread takes it in acquire()/release() and would otherwise
+                # block forever (hanging, e.g., a Thread.join cleanup).
+                condition.release()
+                raise RuntimeError('%s not triggered after %.1f seconds'
+                                   % (name, t_waited))
+            condition.wait(0.1)
+
+    def wait_for_acquire(self):
+        """Called by the test thread.
+
+        This will wait on a Condition until another thread calls 'acquire'.
+        Once that happens, that thread will be blocked, and this call will
+        return. Follow this up with 'wait_for_release' to let the other
+        thread continue, and block until release is called.
+
+        Note that the other thread's acquire() gives up after max_wait_time
+        seconds, so wait_for_release must be called before then.
+        """
+        self._wait_for_condition(self._acquireCondition,
+            lambda: self._acquire_called, 'acquire', self._max_wait_time)
+        # Deliberately keep self._monitored_lock held: the other thread must
+        # stay blocked in acquire() while the caller inspects state.
+        self._acquireCondition.release()
+
+    def wait_for_release(self):
+        """Let the other thread through acquire(), then block until release.
+
+        :raises RuntimeError: if release() isn't called within max_wait_time
+            seconds.
+        """
+        # Let the other thread start processing from the acquire.
+        self._monitored_lock.release()
+        self._wait_for_condition(self._releaseCondition,
+            lambda: self._release_called, 'release', self._max_wait_time)
+        self._releaseCondition.release()
+
+
+class TestQuerier(TestCaseWithGraphs):
+
+    def test_importer_lock(self):
+        """ensure_branch_tip() imports the new tip under the importer lock.
+
+        The import runs in a second thread. Once that thread has called
+        acquire() on the lock, the new tip must not be in the database yet;
+        after the lock has been released, it must be, and the thread must
+        have finished without error.
+        """
+        fd, temp = tempfile.mkstemp(prefix='test-bzr-history-db-', suffix='.db')
+        os.close(fd)
+        self.addCleanup(os.remove, temp)
+        b = self.make_interesting_branch()
+        b._tip_revision = 'I'
+        importer = history_db.Importer(temp, b, incremental=False)
+        importer.do_import()
+        del importer
+        b._tip_revision = 'O'
+        lock = _InterLocker()
+        query = history_db.Querier(temp, b)
+        query.set_importer_lock(lock)
+        query.close() # We will be doing the rest in another thread, and
+                      # sqlite connections can't be shared between threads
+        # Record a clean finish of the worker thread; an exception raised
+        # there would otherwise not make this test fail.
+        finished = []
+        def ensure_branch_tip():
+            query.ensure_branch_tip()
+            finished.append(True)
+        t = threading.Thread(target=ensure_branch_tip)
+        t.start()
+        # Reap the thread even if one of the assertions below fails.
+        self.addCleanup(t.join)
+        lock.wait_for_acquire()
+        # At this point, the database should not have been updated yet.
+        conn = history_db.dbapi2.connect(temp)
+        # Don't leak the sqlite connection (and its open file handle).
+        self.addCleanup(conn.close)
+        res = conn.cursor().execute("SELECT db_id FROM revision"
+                                    " WHERE revision_id = ?",
+                                    ('O',)).fetchone()
+        self.assertIs(None, res)
+        lock.wait_for_release()
+        t.join()
+        self.assertEqual([True], finished)
+        # Now that it has gotten the lock and finished, we should have the tip
+        # imported properly.
+        res = conn.cursor().execute("SELECT tip_revision"
+                                    " FROM revision, dotted_revno"
+                                    " WHERE tip_revision = merged_revision"
+                                    " AND revision_id = ?",
+                                    ('O',)).fetchone()
+        self.assertIsNot(None, res)
More information about the bazaar-commits
mailing list