[apparmor] [patch 05/13] parser - rewrite caching tests in python unittest

Steve Beattie steve at nxnw.org
Tue Oct 15 21:23:31 UTC 2013


On Tue, Oct 15, 2013 at 12:46:08PM -0700, Steve Beattie wrote:
> That said, I was able to make the decorator function approach work.
> Here's the updated patch:
> 
> 
> Subject: parser - rewrite caching tests in python unittest

Christian asked on IRC for a diff of the changes between the two
versions of the patch, so here it is. Note that the whitespace changes
were mostly to satisfy some of pep8's gripes.

diff -u b/parser/tst/caching.py b/parser/tst/caching.py
--- b/parser/tst/caching.py
+++ b/parser/tst/caching.py
@@ -15,7 +15,7 @@
 # - check cache used/not used if includes are newer/older
 # - check cache used for force-complain, disable symlink, etc.
 
-from optparse import OptionParser    # deprecated, should move to argparse eventually
+from argparse import ArgumentParser
 import os
 import shutil
 import time
@@ -41,7 +41,7 @@
 config = None
 
 
-class AAParserCachingCommon(unittest.TestCase):
+class AAParserCachingCommon(testlib.AATestTemplate):
     do_cleanup = True
 
     def setUp(self):
@@ -58,15 +58,14 @@
         self.cache_dir = os.path.join(self.tmp_dir, 'cache')
         os.mkdir(self.cache_dir)
 
-        # NOTE: need to do this as a file copy, as writing the file out
-        # for each test doesn't give enough time between the ctime of
-        # the profile and the apparmor_parser's generated cache file.
+        # write our sample profile out
         self.profile = os.path.join(self.tmp_dir, PROFILE)
         with open(self.profile, 'w+') as p:
             p.write(PROFILE_CONTENTS)
 
         if config.debug:
             self.do_cleanup = False
+            self.debug = True
 
         self.cmd_prefix = [config.parser, '--base', self.tmp_dir, '--skip-kernel-load']
 
@@ -74,7 +73,7 @@
         '''teardown for each test'''
 
         if not self.do_cleanup:
-            print("skipping cleanup, leaving testfiles behind in '%s'" % (self.tmp_dir))
+            print("===> Skipping cleanup, leaving testfiles behind in '%s'" % (self.tmp_dir))
         else:
             if os.path.exists(self.tmp_dir):
                 shutil.rmtree(self.tmp_dir)
@@ -82,10 +81,10 @@
     def assert_path_exists(self, path, expected=True):
         if expected is True:
             self.assertTrue(os.path.exists(path),
-                    'test did not create file %s, when it was expected to do so' % path)
+                            'test did not create file %s, when it was expected to do so' % path)
         else:
             self.assertFalse(os.path.exists(path),
-                'test created file %s, when it was not expected to do so' % path)
+                             'test created file %s, when it was not expected to do so' % path)
 
     def compare_features_file(self, features_path, expected=True):
         # compare features contents
@@ -94,65 +93,67 @@
             features = f.read()
         if expected:
             self.assertEquals(expected_output, features,
-                        "features contents differ, expected:\n%s\nresult:\n%s" % (expected_output, features))
+                              "features contents differ, expected:\n%s\nresult:\n%s" % (expected_output, features))
         else:
             self.assertNotEquals(expected_output, features,
-                        "features contents equal, expected:\n%s\nresult:\n%s" % (expected_output, features))
+                                 "features contents equal, expected:\n%s\nresult:\n%s" % (expected_output, features))
+
 
 class AAParserBasicCachingTests(AAParserCachingCommon):
 
     def setUp(self):
         super(AAParserBasicCachingTests, self).setUp()
 
+    @testlib.keep_on_fail
     def test_no_cache_by_default(self):
         '''test profiles are not cached by default'''
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
 
+    @testlib.keep_on_fail
     def test_no_cache_w_skip_cache(self):
         '''test profiles are not cached with --skip-cache'''
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '--write-cache', '--skip-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
 
+    @testlib.keep_on_fail
     def test_cache_when_requested(self):
         '''test profiles are cached when requested'''
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
 
+    @testlib.keep_on_fail
     def test_write_features_when_caching(self):
         '''test features file is written when caching'''
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
         self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
 
+    @testlib.keep_on_fail
     def test_features_match_when_caching(self):
         '''test features file contents match expected features when caching'''
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
         self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
 
         self.compare_features_file(os.path.join(self.cache_dir, '.features'))
 
+
 class AAParserAltCacheBasicTests(AAParserBasicCachingTests):
     '''Same tests as above, but with an alternate cache location specified on the command line'''
 
@@ -171,6 +172,7 @@
             self.fail('original cache dir \'%s\' not empty' % self.unused_cache_dir)
         super(AAParserAltCacheBasicTests, self).tearDown()
 
+
 class AAParserCreateCacheBasicTestsCacheExists(AAParserBasicCachingTests):
     '''Same tests as above, but with create cache option on the command line and the cache already exists'''
 
@@ -178,6 +180,7 @@
         super(AAParserCreateCacheBasicTestsCacheExists, self).setUp()
         self.cmd_prefix.append('--create-cache-dir')
 
+
 class AAParserCreateCacheBasicTestsCacheNotExist(AAParserBasicCachingTests):
     '''Same tests as above, but with create cache option on the command line and cache dir removed'''
 
@@ -186,14 +189,17 @@
         shutil.rmtree(self.cache_dir)
         self.cmd_prefix.append('--create-cache-dir')
 
+
 class AAParserCreateCacheAltCacheTestsCacheNotExist(AAParserBasicCachingTests):
-    '''Same tests as above, but with create cache option on the command line, alt cache specified, and cache dir removed'''
+    '''Same tests as above, but with create cache option on the command line,
+       alt cache specified, and cache dir removed'''
 
     def setUp(self):
         super(AAParserCreateCacheAltCacheTestsCacheNotExist, self).setUp()
         shutil.rmtree(self.cache_dir)
         self.cmd_prefix.append('--create-cache-dir')
 
+
 class AAParserCachingTests(AAParserCachingCommon):
 
     def setUp(self):
@@ -212,10 +218,10 @@
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-q', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
 
+    @testlib.keep_on_fail
     def test_cache_loaded_when_exists(self):
         '''test cache is loaded when it exists, is newer than profile, and features match'''
 
@@ -223,10 +229,9 @@
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Cached reload succeeded', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Cached reload succeeded')
 
+    @testlib.keep_on_fail
     def test_cache_not_loaded_when_skip_arg(self):
         '''test cache is not loaded when --skip-cache is passed'''
 
@@ -234,10 +239,9 @@
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--skip-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
 
+    @testlib.keep_on_fail
     def test_cache_not_loaded_when_skip_read_arg(self):
         '''test cache is not loaded when --skip-read-cache is passed'''
 
@@ -245,103 +249,90 @@
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--skip-read-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
 
+    @testlib.keep_on_fail
     def test_cache_not_loaded_when_features_differ(self):
         '''test cache is not loaded when features file differs'''
 
         self._generate_cache_file()
 
-        with open(os.path.join(self.cache_dir, '.features'), 'w+') as f:
-            f.write('monkey\n')
+        testlib.write_file(os.path.join(self.cache_dir, '.features'), 'monkey\n')
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
 
+    @testlib.keep_on_fail
     def test_cache_writing_does_not_overwrite_features_when_features_differ(self):
         '''test cache writing does not overwrite the features files when it differs and --skip-bad-cache is given'''
 
         features_file = os.path.join(self.cache_dir, '.features')
-        with open(features_file, 'w+') as f:
-            f.write('monkey\n')
+        testlib.write_file(features_file, 'monkey\n')
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
         self.assert_path_exists(features_file)
         # ensure that the features does *not* match the current features set
         self.compare_features_file(features_file, expected=False)
 
+    @testlib.keep_on_fail
     def test_cache_writing_skipped_when_features_differ(self):
         '''test cache writing is skipped when features file differs'''
 
-        with open(os.path.join(self.cache_dir, '.features'), 'w+') as f:
-            f.write('monkey\n')
+        testlib.write_file(os.path.join(self.cache_dir, '.features'), 'monkey\n')
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
         self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
 
+    @testlib.keep_on_fail
     def test_cache_writing_updates_features(self):
         '''test cache writing updates features'''
 
-        with open(os.path.join(self.cache_dir, '.features'), 'w+') as f:
-            f.write('monkey\n')
+        features_file = os.path.join(self.cache_dir, '.features')
+        testlib.write_file(features_file, 'monkey\n')
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected replacement succeeded message, got: \n%s' % report)
-        self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
-        self.compare_features_file(os.path.join(self.cache_dir, '.features'))
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+        self.assert_path_exists(features_file)
+        self.compare_features_file(features_file)
 
+    @testlib.keep_on_fail
     def test_cache_writing_updates_cache_file(self):
         '''test cache writing updates cache file'''
 
         cache_file = os.path.join(self.cache_dir, PROFILE)
-        with open(cache_file, 'w+') as f:
-            f.write('monkey\n')
-            orig_size = os.fstat(f.fileno()).st_size
+        testlib.write_file(cache_file, 'monkey\n')
+        orig_size = os.stat(cache_file).st_size
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
         self.assert_path_exists(cache_file)
         with open(cache_file, 'rb') as f:
-            cache_contents = f.read()
             new_size = os.fstat(f.fileno()).st_size
         # We check sizes here rather than whether the string monkey is
         # in cache_contents because of the difficulty coercing cache
         # file bytes into strings in python3
-        self.assertNotEquals(orig_size, new_size, 'Expected cache file to be updated, got: \n%s' % cache_contents)
+        self.assertNotEquals(orig_size, new_size, 'Expected cache file to be updated, size is not changed.')
 
+    @testlib.keep_on_fail
     def test_cache_writing_clears_all_files(self):
         '''test cache writing clears all cache files'''
 
         check_file = os.path.join(self.cache_dir, 'monkey')
-        with open(check_file, 'w+') as f:
-            f.write('monkey')
+        testlib.write_file(check_file, 'monkey\n')
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--write-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected replacement succeeded (no cache) message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
         self.assert_path_exists(check_file, expected=False)
 
+    @testlib.keep_on_fail
     def test_profile_newer_skips_cache(self):
         '''test cache is skipped if profile is newer'''
 
@@ -351,10 +342,9 @@
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected replacement succeeded (no cache) message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
 
+    @testlib.keep_on_fail
     def test_parser_newer_skips_cache(self):
         '''test cache is skipped if parser is newer'''
 
@@ -369,35 +359,35 @@
         cmd = list(self.cmd_prefix)
         cmd[0] = new_parser
         cmd.extend(['-v', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
-        self.assertIn('Replacement succeeded for', report, 'Expected cache reload message, got: \n%s' % report)
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
 
     def _purge_cache_test(self, location):
 
         cache_file = os.path.join(self.cache_dir, location)
-        with open(cache_file, 'w+') as f:
-            f.write('monkey\n')
+        testlib.write_file(cache_file, 'monkey\n')
 
         cmd = list(self.cmd_prefix)
         cmd.extend(['-v', '--purge-cache', '-r', self.profile])
-        rc, report = testlib.run_cmd(cmd)
-        self.assertEquals(rc, 0, "Got return code %d, expected 0\nOutput: %s" % (rc, report))
+        self.run_cmd_check(cmd)
         # no message is output
         self.assert_path_exists(cache_file, expected=False)
 
+    @testlib.keep_on_fail
     def test_cache_purge_removes_features_file(self):
         '''test cache --purge-cache removes .features file'''
         self._purge_cache_test('.features')
 
+    @testlib.keep_on_fail
     def test_cache_purge_removes_cache_file(self):
         '''test cache --purge-cache removes profile cache file'''
         self._purge_cache_test(PROFILE)
 
+    @testlib.keep_on_fail
     def test_cache_purge_removes_other_cache_files(self):
         '''test cache --purge-cache removes other cache files'''
         self._purge_cache_test('monkey')
 
+
 class AAParserAltCacheTests(AAParserCachingTests):
     '''Same tests as above, but with an alternate cache location specified on the command line'''
     check_orig_cache = True
@@ -417,31 +407,31 @@
             self.fail('original cache dir \'%s\' not empty' % self.orig_cache_dir)
         super(AAParserAltCacheTests, self).tearDown()
 
+    @testlib.keep_on_fail
     def test_cache_purge_leaves_original_cache_alone(self):
         '''test cache purging only touches alt cache'''
 
         # skip tearDown check to ensure non-alt cache is empty
         self.check_orig_cache = False
+        filelist = [PROFILE, '.features', 'monkey']
 
-        for f in [PROFILE, '.features', 'monkey']:
-            with open(os.path.join(self.orig_cache_dir, f), 'w+') as f:
-                f.write('monkey\n')
+        for f in filelist:
+            testlib.write_file(os.path.join(self.orig_cache_dir, f), 'monkey\n')
 
         self._purge_cache_test(PROFILE)
 
-        for f in [PROFILE, '.features', 'monkey']:
+        for f in filelist:
             if not os.path.exists(os.path.join(self.orig_cache_dir, f)):
                 self.fail('cache purge removed %s, was not supposed to' % (os.path.join(self.orig_cache_dir, f)))
 
 
 def main():
     global config
-    usage = "usage: %prog [options] [test_directory]"
-    p = OptionParser(usage=usage)
-    p.add_option('-p', '--parser', default=testlib.DEFAULT_PARSER, action="store", type="string", dest='parser')
-    p.add_option('-v', '--verbose', action="store_true", dest="verbose")
-    p.add_option('-d', '--debug', action="store_true", dest="debug")
-    config, args = p.parse_args()
+    p = ArgumentParser()
+    p.add_argument('-p', '--parser', default=testlib.DEFAULT_PARSER, action="store", dest='parser')
+    p.add_argument('-v', '--verbose', action="store_true", dest="verbose")
+    p.add_argument('-d', '--debug', action="store_true", dest="debug")
+    config = p.parse_args()
 
     verbosity = 1
     if config.verbose:
@@ -471 +460,0 @@
-
diff -u b/parser/tst/testlib.py b/parser/tst/testlib.py
--- b/parser/tst/testlib.py
+++ b/parser/tst/testlib.py
@@ -16,6 +16,7 @@
 import subprocess
 import tempfile
 import time
+import unittest
 
 TIMEOUT_ERROR_CODE = 152
 DEFAULT_PARSER = '../apparmor_parser'
@@ -30,32 +31,58 @@
     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
 
 
-def run_cmd(command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stdin=None, timeout=120):
-    '''Try to execute given command (array) and return its stdout, or
-    return a textual error if it failed.'''
+class AATestTemplate(unittest.TestCase):
+    '''Stub class for use by test scripts'''
+    debug = False
+    do_cleanup = True
+
+    def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
+                      stdin=None, timeout=120, expected_rc=0, expected_string=None):
+        '''Wrapper around run_cmd that checks the rc code against
+           expected_rc and for expected strings in the output if
+           passed. The valgrind tests generally don't care what the
+           rc is as long as it's not a specific set of return codes,
+           so can't push the check directly into run_cmd().'''
+        rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
+        self.assertEqual(rc, expected_rc, "Got return code %d, expected %d\nCommand run: %s\nOutput: %s" % (rc, expected_rc, (' '.join(command)), report))
+        if expected_string:
+            self.assertIn(expected_string, report, 'Expected message "%s", got: \n%s' % (expected_string, report))
+        return report
+
+    def run_cmd(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
+                stdin=None, timeout=120):
+        '''Try to execute given command (array) and return its stdout, or
+           return a textual error if it failed.'''
+
+        if self.debug:
+            print('\n===> Running command: \'%s\'' % (' '.join(command)))
+
+        try:
+            sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
+                                  close_fds=True, preexec_fn=subprocess_setup)
+        except OSError as e:
+            return [127, str(e)]
+
+        timeout_communicate = TimeoutFunction(sp.communicate, timeout)
+        out, outerr = (None, None)
+        try:
+            out, outerr = timeout_communicate(input)
+            rc = sp.returncode
+        except TimeoutFunctionException as e:
+            sp.terminate()
+            outerr = b'test timed out, killed'
+            rc = TIMEOUT_ERROR_CODE
+
+        # Handle redirection of stdout
+        if out is None:
+            out = b''
+        # Handle redirection of stderr
+        if outerr is None:
+            outerr = b''
 
-    try:
-        sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
-    except OSError as e:
-        return [127, str(e)]
+        report = out.decode('utf-8') + outerr.decode('utf-8')
 
-    timeout_communicate = TimeoutFunction(sp.communicate, timeout)
-    out, outerr = (None, None)
-    try:
-        out, outerr = timeout_communicate(input)
-        rc = sp.returncode
-    except TimeoutFunctionException as e:
-        sp.terminate()
-        outerr = b'test timed out, killed'
-        rc = TIMEOUT_ERROR_CODE
-
-    # Handle redirection of stdout
-    if out is None:
-        out = b''
-    # Handle redirection of stderr
-    if outerr is None:
-        outerr = b''
-    return [rc, out.decode('utf-8') + outerr.decode('utf-8')]
+        return [rc, report]
 
 
 # Timeout handler using alarm() from John P. Speno's Pythonic Avocado
@@ -99,7 +126,7 @@
                 s = os.fstat(f.fileno())
 
             if (s.st_mtime == last_stamp):
-                print('WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
+                print('\n===> WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
                 result = (False, 1.0)
                 break
 
@@ -113,6 +140,7 @@
 
     return result
 
+
 def read_features_dir(path):
 
     result = ''
@@ -135,2 +163,24 @@
+
 def touch(path):
     return os.utime(path, None)
+
+
+def write_file(path, contents):
+    '''write contents to path'''
+    with open(path, 'w+') as f:
+        f.write(contents)
+
+
+def keep_on_fail(unittest_func):
+    '''wrapping function for unittest testcases to detect failure
+       and leave behind test files in tearDown(); to be used as a
+       decorator'''
+
+    def new_unittest_func(self):
+        try:
+            unittest_func(self)
+        except Exception:
+            self.do_cleanup = False
+            raise
+
+    return new_unittest_func
only in patch2:
unchanged:
--- a/parser/tst/valgrind_simple.py
+++ b/parser/tst/valgrind_simple.py
@@ -57,7 +57,7 @@ VALGRIND_SUPPRESSIONS = '''
 }'''
 
 
-class AAParserValgrindTests(unittest.TestCase):
+class AAParserValgrindTests(testlib.AATestTemplate):
     def setUp(self):
         # REPORT ALL THE OUTPUT
         self.maxDiff = None
@@ -70,7 +70,7 @@ class AAParserValgrindTests(unittest.Tes
         command.append(config.parser)
         command.extend(parser_args)
         command.append(testname)
-        rc, output = testlib.run_cmd(command, timeout=120)
+        rc, output = self.run_cmd(command, timeout=120)
         self.assertNotIn(rc, failure_rc,
                     "valgrind returned error code %d, gave the following output\n%s" % (rc, output))
 
-- 
Steve Beattie
<sbeattie at ubuntu.com>
http://NxNW.org/~steve/
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 836 bytes
Desc: Digital signature
URL: <https://lists.ubuntu.com/archives/apparmor/attachments/20131015/7a783999/attachment-0001.pgp>


More information about the AppArmor mailing list