Rev 120: Add the ability to create an Importer lock. in http://bazaar.launchpad.net/~bzr/bzr-history-db/trunk

John Arbash Meinel john at arbash-meinel.com
Fri Apr 30 20:08:56 BST 2010


At http://bazaar.launchpad.net/~bzr/bzr-history-db/trunk

------------------------------------------------------------
revno: 120
revision-id: john at arbash-meinel.com-20100430190846-6a62sl3waibeejyh
parent: john at arbash-meinel.com-20100428015703-jerg5bkyuccfs1xy
committer: John Arbash Meinel <john at arbash-meinel.com>
branch nick: trunk
timestamp: Fri 2010-04-30 14:08:46 -0500
message:
  Add the ability to create an Importer lock.
  
  It even has inter-thread tests for the behavior, which seem to be pretty useful.
-------------- next part --------------
=== modified file '__init__.py'
--- a/__init__.py	2010-04-22 21:27:21 +0000
+++ b/__init__.py	2010-04-30 19:08:46 +0000
@@ -438,7 +438,11 @@
                     start_revision_id=start_revision_id,
                     stop_revision_id=real_stop_revision_id)
     t1 = time.clock()
-    merge_sorted = self._filter_non_ancestors(iter(merge_sorted))
+    func = getattr(self, '_filter_start_non_ancestors', None)
+    if func is None:
+        func = getattr(self, '_filter_non_ancestors', None)
+    if func is not None:  # otherwise merge_sorted is left unfiltered
+        merge_sorted = func(iter(merge_sorted))
     t2 = time.clock()
     if 'history_db' in debug.debug_flags:
         import pprint

=== modified file 'history_db.py'
--- a/history_db.py	2010-04-28 01:57:03 +0000
+++ b/history_db.py	2010-04-30 19:08:46 +0000
@@ -1198,12 +1198,24 @@
         self._db_path = db_path
         self._db_conn = None
         self._cursor = None
+        self._importer_lock = None
         self._branch = a_branch
         self._branch_tip_rev_id = a_branch.last_revision()
         self._branch_tip_db_id = self._get_db_id(self._branch_tip_rev_id)
         self._tip_is_imported = False
         self._stats = defaultdict(lambda: 0)
 
+    def set_importer_lock(self, lock):
+        """Set a lock that serializes Importer runs inside _import_tip.
+
+        The DB back-end is generally single-writer, so this lock keeps two
+        threads from running an Importer against it at the same time. Any
+        object with acquire() and release() methods works; None disables
+        locking. acquire() is called with no arguments, so there is no
+        timeout support (yet?).
+        """
+        self._importer_lock = lock
+
     def _get_cursor(self):
         if self._cursor is not None:
             return self._cursor
@@ -1233,20 +1245,35 @@
     def _import_tip(self):
+        """Import the branch tip into the DB and adopt the Importer's cursor.
+
+        If set_importer_lock() registered a lock, it is held for the whole
+        import, so only one thread at a time runs an Importer through here.
+        """
         if self._cursor is not None:
             self.close()
-        t = time.time()
-        importer = Importer(self._db_path, self._branch,
-                            tip_revision_id=self._branch_tip_rev_id,
-                            incremental=True)
-        importer.do_import()
-        tdelta = time.time() - t
-        if 'history_db' in debug.debug_flags:
-            trace.note('imported %d nodes on-the-fly in %.3fs'
-                       % (importer._stats.get('total_nodes_inserted', 0),
-                          tdelta))
-        self._db_conn = importer._db_conn
-        self._cursor = importer._cursor
-        self._branch_tip_db_id = self._get_db_id(self._branch_tip_rev_id)
-        self._tip_is_imported = True
+        # Read the attribute once, so the finally clause always releases the
+        # lock that was actually acquired, even if set_importer_lock() is
+        # called again while this import is running.
+        lock = self._importer_lock
+        if lock is not None:
+            lock.acquire()
+        try:
+            t = time.time()
+            importer = Importer(self._db_path, self._branch,
+                                tip_revision_id=self._branch_tip_rev_id,
+                                incremental=True)
+            importer.do_import()
+            tdelta = time.time() - t
+            if 'history_db' in debug.debug_flags:
+                trace.note('imported %d nodes on-the-fly in %.3fs'
+                           % (importer._stats.get('total_nodes_inserted', 0),
+                              tdelta))
+            self._db_conn = importer._db_conn
+            self._cursor = importer._cursor
+            self._branch_tip_db_id = self._get_db_id(self._branch_tip_rev_id)
+            self._tip_is_imported = True
+        finally:
+            if lock is not None:
+                lock.release()
 
     def _is_imported_db_id(self, tip_db_id):
         res = self._get_cursor().execute(

=== modified file 'test_importer.py'
--- a/test_importer.py	2010-04-09 20:17:00 +0000
+++ b/test_importer.py	2010-04-30 19:08:46 +0000
@@ -16,6 +16,11 @@
 
 """Test aspects of the importer code."""
 
+import os
+import tempfile
+import threading
+import time
+
 from bzrlib import (
     graph,
     tests,
@@ -678,3 +683,133 @@
         inc_merger = self.make_inc_merger(b, 'O', 'O')
         inc_merger.topo_order()
         self.assertScheduledStack(inc_merger, [])
+
+
+class _InterLocker(object):
+    """Wrapper around thread locks to help with testing.
+
+    The idea is that one thread will be wanting to acquire a lock. When it does
+    so, we block it, and signal the testing thread that the acquisition was
+    attempted, so now would be a good time to check that things are proceeding
+    properly.
+    """
+
+    def __init__(self):
+        self._monitored_lock = threading.Lock()
+        # We start the lock in blocked mode, so that we have to call
+        # self.wait_for_acquire before we will let the other thread through.
+        self._monitored_lock.acquire()
+        self._acquire_called = False
+        self._release_called = False
+        self._acquireCondition = threading.Condition()
+        self._releaseCondition = threading.Condition()
+        self._max_wait_time = 1.0
+
+    def acquire(self):
+        """Same as threading.Lock.acquire.
+        
+        This is meant to be called by the thread you are testing / monitoring.
+        """
+        self._acquireCondition.acquire()
+        self._acquire_called = True
+        self._acquireCondition.notify()
+        self._acquireCondition.release()
+        # Acquire the actual lock that this is substituting for
+        t_wait_start = time.time()
+        while not self._monitored_lock.acquire(False):
+            t_waited = time.time() - t_wait_start
+            if t_waited > self._max_wait_time:
+                raise RuntimeError('Acquire timed out after %.1fs'
+                                   % (t_waited,))
+            time.sleep(0.1)
+
+    def release(self):
+        """See threading.Lock.release."""
+        self._monitored_lock.release()
+        self._releaseCondition.acquire()
+        self._release_called = True
+        self._releaseCondition.notify()
+        self._releaseCondition.release()
+
+    def _wait_for_condition(self, condition, evaluator, name, timeout):
+        """Wait for the given condition to trigger.
+
+        :param condition: A Condition variable
+        :param evaluator: A callback to indicate if the condition has actually
+            been fulfilled. Should return True if the condition is ready to go.
+        :param name: An associated name for the condition (used for error
+            messages)
+        :param timeout: If the condition hasn't triggered after timeout
+            seconds, raise an error.
+        :return: When this function returns, the condition lock will be held.
+            Callers are responsible for calling .release()
+        """
+        t_wait_start = time.time()
+        condition.acquire()
+        while not evaluator():
+            t_waited = time.time() - t_wait_start
+            if t_waited > timeout:
+                raise RuntimeError('%s not triggered after %.1f seconds'
+                                   % (name, t_waited))
+            condition.wait(0.1)
+
+    def wait_for_acquire(self):
+        """Called by the test thread.
+
+        This will wait on a Condition until another thread calls 'acquire'.
+        Once that happens, that thread will be blocked, and this call will
+        return. Follow this up with 'wait_for_release' to let the other
+        thread continue, and block until release is called.
+        """
+        self._wait_for_condition(self._acquireCondition, 
+            lambda: self._acquire_called, 'acquire', self._max_wait_time)
+        # Let the other thread start processing from the acquire.
+        self._monitored_lock.release()
+        self._acquireCondition.release()
+
+    def wait_for_release(self):
+        """Block this thread until self.release() is called."""
+        self._wait_for_condition(self._releaseCondition,
+            lambda: self._release_called, 'release', self._max_wait_time)
+        self._releaseCondition.release()
+
+
+class TestQuerier(TestCaseWithGraphs):
+
+    def test_importer_lock(self):
+        fn, temp = tempfile.mkstemp(prefix='test-bzr-history-db-', suffix='.db')
+        os.close(fn)
+        self.addCleanup(os.remove, temp)
+        b = self.make_interesting_branch()
+        b._tip_revision = 'I'
+        importer = history_db.Importer(temp, b, incremental=False)
+        importer.do_import()
+        del importer
+        b._tip_revision = 'O'
+        lock = _InterLocker()
+        query = history_db.Querier(temp, b)
+        query.set_importer_lock(lock)
+        query.close() # We will be doing the rest in another thread, and
+                      # sqlite connections can't be shared between threads
+        t = threading.Thread(target=query.ensure_branch_tip)
+        t.start()
+        # TODO: The only bit we don't handle is that we'd really like the test
+        #       to fail if the other thread didn't exit cleanly. However, this
+        #       is good enough for now.
+        self.addCleanup(t.join)
+        lock.wait_for_acquire()
+        # At this point, the database should not have been updated yet.
+        conn = history_db.dbapi2.connect(temp)
+        res = conn.cursor().execute("SELECT db_id FROM revision"
+                                    " WHERE revision_id = ?",
+                                    ('O',)).fetchone()
+        self.assertIs(None, res)
+        lock.wait_for_release()
+        # Now that it has gotten the lock and finished, we should have the tip
+        # imported properly.
+        res = conn.cursor().execute("SELECT tip_revision"
+                                    "  FROM revision, dotted_revno"
+                                    " WHERE tip_revision = merged_revision"
+                                    "   AND revision_id = ?",
+                                    ('O',)).fetchone()
+        self.assertIsNot(None, res)



More information about the bazaar-commits mailing list