Rev 4210: (robertc) Parallel test suite support (Robert Collins, in file:///home/pqm/archives/thelove/bzr/%2Btrunk/

Canonical.com Patch Queue Manager pqm at pqm.ubuntu.com
Fri Mar 27 16:55:44 GMT 2009


At file:///home/pqm/archives/thelove/bzr/%2Btrunk/

------------------------------------------------------------
revno: 4210
revision-id: pqm at pqm.ubuntu.com-20090327165538-lustxviiwnrfg2u2
parent: pqm at pqm.ubuntu.com-20090327051450-t7pdaegr0h6f40ul
parent: robertc at robertcollins.net-20090327160405-g8f40na68k2pofkm
committer: Canonical.com Patch Queue Manager <pqm at pqm.ubuntu.com>
branch nick: +trunk
timestamp: Fri 2009-03-27 16:55:38 +0000
message:
  (robertc) Parallel test suite support (Robert Collins,
  	Vincent Ladeuil)
modified:
  NEWS                           NEWS-20050323055033-4e00b5db738777ff
  bzrlib/builtins.py             builtins.py-20050830033751-fc01482b9ca23183
  bzrlib/tests/__init__.py       selftest.py-20050531073622-8d0e3c8845c97a64
    ------------------------------------------------------------
    revno: 4205.3.5
    revision-id: robertc at robertcollins.net-20090327160405-g8f40na68k2pofkm
    parent: robertc at robertcollins.net-20090327153356-1uzn0k9tjgo7oxp1
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: integration
    timestamp: Sat 2009-03-28 03:04:05 +1100
    message:
      Missing . on the parallel option.
    modified:
      bzrlib/builtins.py             builtins.py-20050830033751-fc01482b9ca23183
    ------------------------------------------------------------
    revno: 4205.3.4
    revision-id: robertc at robertcollins.net-20090327153356-1uzn0k9tjgo7oxp1
    parent: robertc at robertcollins.net-20090327152827-w226yu1mb0jw3ywd
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: integration
    timestamp: Sat 2009-03-28 02:33:56 +1100
    message:
      NEWS
    modified:
      NEWS                           NEWS-20050323055033-4e00b5db738777ff
    ------------------------------------------------------------
    revno: 4205.3.3
    revision-id: robertc at robertcollins.net-20090327152827-w226yu1mb0jw3ywd
    parent: robertc at robertcollins.net-20090327093143-gli7efprtwutx0p4
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: tests.parallel
    timestamp: Sat 2009-03-28 02:28:27 +1100
    message:
      Fixup comment.
    modified:
      bzrlib/tests/__init__.py       selftest.py-20050531073622-8d0e3c8845c97a64
    ------------------------------------------------------------
    revno: 4205.3.2
    revision-id: robertc at robertcollins.net-20090327093143-gli7efprtwutx0p4
    parent: robertc at robertcollins.net-20090326121545-t30g0qejzx7tknxl
    committer: Robert Collins <robertc at robertcollins.net>
    branch nick: tests.parallel
    timestamp: Fri 2009-03-27 20:31:43 +1100
    message:
      Create fork and reinvoke parallel testing support.
    modified:
      bzrlib/builtins.py             builtins.py-20050830033751-fc01482b9ca23183
      bzrlib/tests/__init__.py       selftest.py-20050531073622-8d0e3c8845c97a64
=== modified file 'NEWS'
--- a/NEWS	2009-03-26 13:18:16 +0000
+++ b/NEWS	2009-03-27 16:55:38 +0000
@@ -241,6 +241,11 @@
   building a working tree (file.writelines(str) is very inefficient). This
   can have a large effect on ``bzr checkout`` times. (John Arbash Meinel)
 
+* selftest now supports a --parallel option, with values of 'fork' or
+  'subprocess' to run the test suite in parallel. Currently only Linux
+  machines work; other platforms need patches submitted. (Robert Collins,
+  Vincent Ladeuil)
+
 * ``tests.run_suite`` has a new parameter ``suite_decorators``, a list of 
   callables to use to decorate the test suite. Such decorators can add or
   remove tests, or even remote the test suite to another machine if

=== modified file 'bzrlib/builtins.py'
--- a/bzrlib/builtins.py	2009-03-26 05:41:59 +0000
+++ b/bzrlib/builtins.py	2009-03-27 16:55:38 +0000
@@ -3187,6 +3187,11 @@
                             ),
                      Option('list-only',
                             help='List the tests instead of running them.'),
+                     RegistryOption('parallel',
+                        help="Run the test suite in parallel.",
+                        lazy_registry=('bzrlib.tests', 'parallel_registry'),
+                        value_switches=False,
+                        ),
                      Option('randomize', type=str, argname="SEED",
                             help='Randomize the order of tests using the given'
                                  ' seed or "now" for the current time.'),
@@ -3218,7 +3223,8 @@
             lsprof_timed=None, cache_dir=None,
             first=False, list_only=False,
             randomize=None, exclude=None, strict=False,
-            load_list=None, debugflag=None, starting_with=None, subunit=False):
+            load_list=None, debugflag=None, starting_with=None, subunit=False,
+            parallel=None):
         from bzrlib.tests import selftest
         import bzrlib.benchmarks as benchmarks
         from bzrlib.benchmarks import tree_creator
@@ -3247,6 +3253,9 @@
                 raise errors.BzrCommandError("subunit not available. subunit "
                     "needs to be installed to use --subunit.")
             self.additional_selftest_args['runner_class'] = SubUnitBzrRunner
+        if parallel:
+            self.additional_selftest_args.setdefault(
+                'suite_decorators', []).append(parallel)
         if benchmark:
             test_suite_factory = benchmarks.test_suite
             # Unless user explicitly asks for quiet, be verbose in benchmarks

=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py	2009-03-26 12:15:45 +0000
+++ b/bzrlib/tests/__init__.py	2009-03-27 15:28:27 +0000
@@ -33,13 +33,14 @@
 import doctest
 import errno
 import logging
+import math
 import os
 from pprint import pformat
 import random
 import re
 import shlex
 import stat
-from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE, STDOUT
 import sys
 import tempfile
 import threading
@@ -543,7 +544,15 @@
                 run += 1
             actionTaken = "Listed"
         else:
-            test.run(result)
+            try:
+                from testtools import ThreadsafeForwardingResult
+            except ImportError:
+                test.run(result)
+            else:
+                if type(result) == ThreadsafeForwardingResult:
+                    test.run(BZRTransformingResult(result))
+                else:
+                    test.run(result)
             run = result.testsRun
             actionTaken = "Ran"
         stopTime = time.time()
@@ -2227,7 +2236,8 @@
         For TestCaseInTempDir we create a temporary directory based on the test
         name and then create two subdirs - test and home under it.
         """
-        name_prefix = osutils.pathjoin(self.TEST_ROOT, self._getTestDirPrefix())
+        name_prefix = osutils.pathjoin(TestCaseWithMemoryTransport.TEST_ROOT,
+            self._getTestDirPrefix())
         name = name_prefix
         for i in range(100):
             if os.path.exists(name):
@@ -2251,7 +2261,7 @@
         self.addCleanup(self.deleteTestDir)
 
     def deleteTestDir(self):
-        os.chdir(self.TEST_ROOT)
+        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
         _rmtree_temp_dir(self.test_base_dir)
 
     def build_tree(self, shape, line_endings='binary', transport=None):
@@ -2671,6 +2681,24 @@
         return result.wasSuccessful()
 
 
+# A registry where get() returns a suite decorator.
+parallel_registry = registry.Registry()
+def fork_decorator(suite):
+    concurrency = local_concurrency()
+    if concurrency == 1:
+        return suite
+    from testtools import ConcurrentTestSuite
+    return ConcurrentTestSuite(suite, fork_for_tests)
+parallel_registry.register('fork', fork_decorator)
+def subprocess_decorator(suite):
+    concurrency = local_concurrency()
+    if concurrency == 1:
+        return suite
+    from testtools import ConcurrentTestSuite
+    return ConcurrentTestSuite(suite, reinvoke_for_tests)
+parallel_registry.register('subprocess', subprocess_decorator)
+
+
 def exclude_tests(exclude_pattern):
     """Return a test suite decorator that excludes tests."""
     if exclude_pattern is None:
@@ -2836,6 +2864,192 @@
         return iter(self._tests)
 
 
+def partition_tests(suite, count):
+    """Partition suite into count lists of tests."""
+    result = []
+    tests = list(iter_suite_tests(suite))
+    tests_per_process = int(math.ceil(float(len(tests)) / count))
+    for block in range(count):
+        low_test = block * tests_per_process
+        high_test = low_test + tests_per_process
+        process_tests = tests[low_test:high_test]
+        result.append(process_tests)
+    return result
+
+
+def fork_for_tests(suite):
+    """Take suite and start up one runner per CPU by forking()
+
+    :return: An iterable of TestCase-like objects which can each have
+        run(result) called on them to feed tests to result, and
+        cleanup() called on them to stop them/kill children/end threads.
+    """
+    concurrency = local_concurrency()
+    result = []
+    from subunit import TestProtocolClient, ProtocolTestCase
+    class TestInOtherProcess(ProtocolTestCase):
+        # Should be in subunit, I think. RBC.
+        def __init__(self, stream, pid):
+            ProtocolTestCase.__init__(self, stream)
+            self.pid = pid
+
+        def run(self, result):
+            try:
+                ProtocolTestCase.run(self, result)
+            finally:
+                os.waitpid(self.pid, os.WNOHANG)
+            # print "pid %d finished" % finished_process
+
+    test_blocks = partition_tests(suite, concurrency)
+    for process_tests in test_blocks:
+        process_suite = TestSuite()
+        process_suite.addTests(process_tests)
+        c2pread, c2pwrite = os.pipe()
+        pid = os.fork()
+        if pid == 0:
+            try:
+                os.close(c2pread)
+                # Leave stderr and stdout open so we can see test noise
+                # Close stdin so that the child goes away if it decides to
+                # read from stdin (otherwise it's a roulette to see which
+                # child actually gets keystrokes for pdb etc.).
+                sys.stdin.close()
+                sys.stdin = None
+                stream = os.fdopen(c2pwrite, 'wb', 0)
+                subunit_result = TestProtocolClient(stream)
+                process_suite.run(subunit_result)
+            finally:
+                os._exit(0)
+        else:
+            os.close(c2pwrite)
+            stream = os.fdopen(c2pread, 'rb', 1)
+            test = TestInOtherProcess(stream, pid)
+            result.append(test)
+    return result
+
+
+def reinvoke_for_tests(suite):
+    """Take suite and start up one runner per CPU using subprocess().
+
+    :return: An iterable of TestCase-like objects which can each have
+        run(result) called on them to feed tests to result, and
+        cleanup() called on them to stop them/kill children/end threads.
+    """
+    concurrency = local_concurrency()
+    result = []
+    from subunit import TestProtocolClient, ProtocolTestCase
+    class TestInSubprocess(ProtocolTestCase):
+        def __init__(self, process, name):
+            ProtocolTestCase.__init__(self, process.stdout)
+            self.process = process
+            self.process.stdin.close()
+            self.name = name
+
+        def run(self, result):
+            try:
+                ProtocolTestCase.run(self, result)
+            finally:
+                self.process.wait()
+                os.unlink(self.name)
+            # print "pid %d finished" % finished_process
+    test_blocks = partition_tests(suite, concurrency)
+    for process_tests in test_blocks:
+        # ugly; currently reimplements rather than reuses TestCase methods.
+        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
+        if not os.path.isfile(bzr_path):
+            # We are probably installed. Assume sys.argv is the right file
+            bzr_path = sys.argv[0]
+        fd, test_list_file_name = tempfile.mkstemp()
+        test_list_file = os.fdopen(fd, 'wb', 1)
+        for test in process_tests:
+            test_list_file.write(test.id() + '\n')
+        test_list_file.close()
+        try:
+            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
+                '--subunit']
+            if '--no-plugins' in sys.argv:
+                argv.append('--no-plugins')
+            # stderr=STDOUT would be ideal, but until we prevent noise on
+            # stderr it can interrupt the subunit protocol.
+            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
+                bufsize=1)
+            test = TestInSubprocess(process, test_list_file_name)
+            result.append(test)
+        except:
+            os.unlink(test_list_file_name)
+            raise
+    return result
+
+
+def cpucount(content):
+    lines = content.splitlines()
+    prefix = 'processor'
+    for line in lines:
+        if line.startswith(prefix):
+            concurrency = int(line[line.find(':')+1:]) + 1
+    return concurrency
+
+
+def local_concurrency():
+    try:
+        content = file('/proc/cpuinfo', 'rb').read()
+        concurrency = cpucount(content)
+    except Exception, e:
+        concurrency = 1
+    return concurrency
+
+
+class BZRTransformingResult(unittest.TestResult):
+
+    def __init__(self, target):
+        unittest.TestResult.__init__(self)
+        self.result = target
+
+    def startTest(self, test):
+        self.result.startTest(test)
+
+    def stopTest(self, test):
+        self.result.stopTest(test)
+
+    def addError(self, test, err):
+        feature = self._error_looks_like('UnavailableFeature: ', err)
+        if feature is not None:
+            self.result.addNotSupported(test, feature)
+        else:
+            self.result.addError(test, err)
+
+    def addFailure(self, test, err):
+        known = self._error_looks_like('KnownFailure: ', err)
+        if known is not None:
+            self.result._addKnownFailure(test, [KnownFailure,
+                                                KnownFailure(known), None])
+        else:
+            self.result.addFailure(test, err)
+
+    def addSkip(self, test, reason):
+        self.result.addSkip(test, reason)
+
+    def addSuccess(self, test):
+        self.result.addSuccess(test)
+
+    def _error_looks_like(self, prefix, err):
+        """Deserialize exception and returns the stringify value."""
+        import subunit
+        value = None
+        typ, exc, _ = err
+        if isinstance(exc, subunit.RemoteException):
+            # Stringifying the exception gives access to the remote traceback;
+            # we search its last line for 'prefix'.
+            lines = str(exc).split('\n')
+            if len(lines) > 1:
+                last = lines[-2] # -1 is empty, final \n
+            else:
+                last = lines[-1]
+            if last.startswith(prefix):
+                value = last[len(prefix):]
+        return value
+
+
 # Controlled by "bzr selftest -E=..." option
 selftest_debug_flags = set()
 
@@ -2854,6 +3068,7 @@
              debug_flags=None,
              starting_with=None,
              runner_class=None,
+             suite_decorators=None,
              ):
     """Run the whole test suite under the enhanced runner"""
     # XXX: Very ugly way to do this...
@@ -2891,6 +3106,7 @@
                      exclude_pattern=exclude_pattern,
                      strict=strict,
                      runner_class=runner_class,
+                     suite_decorators=suite_decorators,
                      )
     finally:
         default_transport = old_transport




More information about the bazaar-commits mailing list