[apparmor] [patch 05/13] parser - rewrite caching tests in python unittest

Steve Beattie steve at nxnw.org
Tue Oct 15 19:46:08 UTC 2013


On Fri, Oct 11, 2013 at 10:08:51PM +0200, Christian Boltz wrote:
> We'll see if you still like this in a few months...

While I reserve the right to flake out^W^W change my mind, I help
maintain and improve other codebases that don't get reviews before
commits... and they sure could use it.

> (and I wonder if Kshitij smiled now that I have a new victim for my
> reviews ;-)

Heh, at least he knows now you weren't just picking on him. :)

> argparse already works with python 2.7 (Kshitij's code is using it, and 
> I regularly test it with py 2.7 and 3.3), so there's no reason to stay 
> with something that is deprecated ;-)

Doh, I missed that it was also added in 2.7. Fixed for both
valgrind_simple.py and caching.py.
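
For reference, the swap is small; both scripts now set up their
options along these lines (this matches the main() in caching.py
below):

    import testlib
    from argparse import ArgumentParser

    p = ArgumentParser()
    p.add_argument('-p', '--parser', default=testlib.DEFAULT_PARSER,
                   action="store", dest='parser')
    p.add_argument('-v', '--verbose', action="store_true", dest="verbose")
    p.add_argument('-d', '--debug', action="store_true", dest="debug")
    config = p.parse_args()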

> > > After reading the second half of the patch, I noticed that you also
> > > add monkeys to other files, so maybe (untested!)
> > > 
> > >     def add_monkey(filename):
> > >         banana = os.path.join(self.cache_dir, filename)
> > >         
> > >         with open(banana, 'w+') as f:
> > >             f.write('monkey')
> > > 
> > > would be another good solution.
> > 
> > Yeah, I'll think about this a bit, to determine what helper
> > function(s) I want to add to simplify it a bit.
> 
> I lean toward the add_monkey function, because that makes the testcase 
> easier to understand than adding a "true" parameter to 
> _generate_cache_file.
> 
> Maybe it would be even easier with "break_file" instead of "add_monkey" 
> as function name. OTOH, I'm quite sure people _will_ check what 
> add_monkey() does, but nobody will read the code of break_file() ;-)

The thing is, I often need the full path for the subsequent
verification check as well, so pushing the os.path.join(self.cache_dir,
...) call into the helper function both limits the helper's generality
(in some cases, that's okay) and doesn't save me much, because I need
to build the path again later. So in this case, I created a
write_file() function that takes a path and a string and writes that
string to the path. It's more general, but it means the path join
happens in the test function.
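
Concretely, the helper is just (this is exactly the write_file() the
patch adds to testlib.py below; the commented usage mirrors the
features tests):

    def write_file(path, contents):
        '''write contents to path'''
        with open(path, 'w+') as f:
            f.write(contents)

    # the test builds the full path once and reuses it for the
    # later verification step, e.g.:
    #
    #   features_file = os.path.join(self.cache_dir, '.features')
    #   testlib.write_file(features_file, 'monkey\n')
    #   self.compare_features_file(features_file, expected=False)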

> > Though, what I'd really like is to somehow set self.do_cleanup to
> > False when any test fails, so that for test cases that fail, the
> > temporary directory is left behind, to make diagnosing why it failed
> > easier to do. I'll think about whether there's a reasonable way to do
> > that.
> 
> Looks like it isn't really nice or easy, but at least possible:
> 
> http://stackoverflow.com/questions/4414234/getting-pythons-unittest-results-in-a-teardown-method
> 
> http://www.piware.de/2012/10/python-unittest-show-log-on-test-failure/ 
> (the comments are also interesting)

Thanks. A lot of those solutions are specific to adding information
to the logged output, which is fine, but not what I'm after; I want
to be able to potentially re-run the test manually to see why it's
failing with a minimum of effort.

That said, I was able to make the decorator function approach work:
a wrapper that catches the test failure, sets do_cleanup to False so
that tearDown() leaves the temporary directory behind, and then
re-raises the exception.
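
In sketch form, the decorator traps the failure, flips the cleanup
flag, and re-raises so unittest still records the result (this is
essentially the keep_on_fail() helper the patch adds to testlib.py;
ExampleTest is just a toy to show the usage):

    import os
    import shutil
    import tempfile
    import unittest

    def keep_on_fail(unittest_func):
        # on any test failure/error, flag the instance so that
        # tearDown() skips cleanup, then re-raise the exception
        def new_unittest_func(self):
            try:
                unittest_func(self)
            except Exception:
                self.do_cleanup = False
                raise
        return new_unittest_func

    class ExampleTest(unittest.TestCase):
        do_cleanup = True

        def setUp(self):
            self.tmp_dir = tempfile.mkdtemp(prefix='aa-example-')

        def tearDown(self):
            if self.do_cleanup:
                shutil.rmtree(self.tmp_dir)
            else:
                print("leaving test files behind in '%s'" % self.tmp_dir)

        @keep_on_fail
        def test_tmpdir_exists(self):
            self.assertTrue(os.path.isdir(self.tmp_dir))

    if __name__ == '__main__':
        unittest.main()

Here's the updated patch: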


Subject: parser - rewrite caching tests in python unittest

This patch rewrites the caching tests in python, using python's
unittest framework. It has been tested with python 2.7 and python 3.3;
python 2.6 may have issues. It covers the tests in the existing
caching.sh test script (with the exception of the test that checks
whether the parser in $PATH is newer), and adds tests that more
extensively cover using a cache in an alternate location from the
basedir. It also adds simple tests for the --create-cache-dir option
(along with that option's interaction with the alt-cache option).

(Some further work to be done is listed under TODO.)

Patch history:
  v1: - initial version
  v2: - create template base class
      - add keep_on_fail() decorator to keep temporary test files
        around after a test fails
      - don't dump raw cache file to failure output in
        test_cache_writing_updates_cache_file()
      - push run_cmd into the template class
      - create run_cmd_check, a wrapper around run_cmd that adds an
        assertion that the return code matches the expected rc
        (the valgrind tests only want to verify that the rc is not in
        a specific set of values, hence the separate wrapper function)
      - similarly, add a check to run_cmd_check for verifying that the
        output contains a specific string, also simplifying many of
        the caching tests
      - create testlib.write_file() to simplify writing files

Signed-off-by: Steve Beattie <steve at nxnw.org>
---
 parser/tst/caching.py         |  460 ++++++++++++++++++++++++++++++++++++++++++
 parser/tst/testlib.py         |  155 +++++++++++---
 parser/tst/valgrind_simple.py |    4 
 3 files changed, 593 insertions(+), 26 deletions(-)

Index: b/parser/tst/caching.py
===================================================================
--- /dev/null
+++ b/parser/tst/caching.py
@@ -0,0 +1,460 @@
+#!/usr/bin/env python
+# ------------------------------------------------------------------
+#
+#   Copyright (C) 2013 Canonical Ltd.
+#   Author: Steve Beattie <steve at nxnw.org>
+#
+#   This program is free software; you can redistribute it and/or
+#   modify it under the terms of version 2 of the GNU General Public
+#   License published by the Free Software Foundation.
+#
+# ------------------------------------------------------------------
+
+# TODO
+# - check cache not used if parser in $PATH is newer
+# - check cache used/not used if includes are newer/older
+# - check cache used for force-complain, disable symlink, etc.
+
+from argparse import ArgumentParser
+import os
+import shutil
+import time
+import tempfile
+import unittest
+
+import testlib
+
+
+PROFILE_CONTENTS = '''
+# Simple example profile for caching tests
+
+/bin/pingy {
+  capability net_raw,
+  capability setuid,
+  network inet raw,
+
+  /bin/ping mixr,
+  /etc/modules.conf r,
+}
+'''
+PROFILE = 'sbin.pingy'
+config = None
+
+
+class AAParserCachingCommon(testlib.AATestTemplate):
+    do_cleanup = True
+
+    def setUp(self):
+        '''setup for each test'''
+        global config
+
+        # REPORT ALL THE OUTPUT
+        self.maxDiff = None
+
+        self.tmp_dir = tempfile.mkdtemp(prefix='aa-caching-')
+        os.chmod(self.tmp_dir, 0o755)
+
+        # create directory for cached blobs
+        self.cache_dir = os.path.join(self.tmp_dir, 'cache')
+        os.mkdir(self.cache_dir)
+
+        # write our sample profile out
+        self.profile = os.path.join(self.tmp_dir, PROFILE)
+        with open(self.profile, 'w+') as p:
+            p.write(PROFILE_CONTENTS)
+
+        if config.debug:
+            self.do_cleanup = False
+            self.debug = True
+
+        self.cmd_prefix = [config.parser, '--base', self.tmp_dir, '--skip-kernel-load']
+
+    def tearDown(self):
+        '''teardown for each test'''
+
+        if not self.do_cleanup:
+            print("===> Skipping cleanup, leaving testfiles behind in '%s'" % (self.tmp_dir))
+        else:
+            if os.path.exists(self.tmp_dir):
+                shutil.rmtree(self.tmp_dir)
+
+    def assert_path_exists(self, path, expected=True):
+        if expected is True:
+            self.assertTrue(os.path.exists(path),
+                            'test did not create file %s, when it was expected to do so' % path)
+        else:
+            self.assertFalse(os.path.exists(path),
+                             'test created file %s, when it was not expected to do so' % path)
+
+    def compare_features_file(self, features_path, expected=True):
+        # compare features contents
+        expected_output = testlib.read_features_dir('/sys/kernel/security/apparmor/features')
+        with open(features_path) as f:
+            features = f.read()
+        if expected:
+            self.assertEqual(expected_output, features,
+                             "features contents differ, expected:\n%s\nresult:\n%s" % (expected_output, features))
+        else:
+            self.assertNotEqual(expected_output, features,
+                                "features contents equal, expected:\n%s\nresult:\n%s" % (expected_output, features))
+
+
+class AAParserBasicCachingTests(AAParserCachingCommon):
+
+    def setUp(self):
+        super(AAParserBasicCachingTests, self).setUp()
+
+    @testlib.keep_on_fail
+    def test_no_cache_by_default(self):
+        '''test profiles are not cached by default'''
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-q', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+
+    @testlib.keep_on_fail
+    def test_no_cache_w_skip_cache(self):
+        '''test profiles are not cached with --skip-cache'''
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-q', '--write-cache', '--skip-cache', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+
+    @testlib.keep_on_fail
+    def test_cache_when_requested(self):
+        '''test profiles are cached when requested'''
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-q', '--write-cache', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+
+    @testlib.keep_on_fail
+    def test_write_features_when_caching(self):
+        '''test features file is written when caching'''
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-q', '--write-cache', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+        self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
+
+    @testlib.keep_on_fail
+    def test_features_match_when_caching(self):
+        '''test features file written during caching matches kernel features'''
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-q', '--write-cache', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+        self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
+
+        self.compare_features_file(os.path.join(self.cache_dir, '.features'))
+
+
+class AAParserAltCacheBasicTests(AAParserBasicCachingTests):
+    '''Same tests as above, but with an alternate cache location specified on the command line'''
+
+    def setUp(self):
+        super(AAParserAltCacheBasicTests, self).setUp()
+
+        alt_cache_dir = tempfile.mkdtemp(prefix='aa-alt-cache', dir=self.tmp_dir)
+        os.chmod(alt_cache_dir, 0o755)
+
+        self.unused_cache_dir = self.cache_dir
+        self.cache_dir = alt_cache_dir
+        self.cmd_prefix.extend(['--cache-loc', alt_cache_dir])
+
+    def tearDown(self):
+        if len(os.listdir(self.unused_cache_dir)) > 0:
+            self.fail('original cache dir \'%s\' not empty' % self.unused_cache_dir)
+        super(AAParserAltCacheBasicTests, self).tearDown()
+
+
+class AAParserCreateCacheBasicTestsCacheExists(AAParserBasicCachingTests):
+    '''Same tests as above, but with create cache option on the command line and the cache already exists'''
+
+    def setUp(self):
+        super(AAParserCreateCacheBasicTestsCacheExists, self).setUp()
+        self.cmd_prefix.append('--create-cache-dir')
+
+
+class AAParserCreateCacheBasicTestsCacheNotExist(AAParserBasicCachingTests):
+    '''Same tests as above, but with create cache option on the command line and cache dir removed'''
+
+    def setUp(self):
+        super(AAParserCreateCacheBasicTestsCacheNotExist, self).setUp()
+        shutil.rmtree(self.cache_dir)
+        self.cmd_prefix.append('--create-cache-dir')
+
+
+class AAParserCreateCacheAltCacheTestsCacheNotExist(AAParserBasicCachingTests):
+    '''Same tests as above, but with create cache option on the command line,
+       alt cache specified, and cache dir removed'''
+
+    def setUp(self):
+        super(AAParserCreateCacheAltCacheTestsCacheNotExist, self).setUp()
+        shutil.rmtree(self.cache_dir)
+        self.cmd_prefix.append('--create-cache-dir')
+
+
+class AAParserCachingTests(AAParserCachingCommon):
+
+    def setUp(self):
+        super(AAParserCachingTests, self).setUp()
+
+        # need a timeout's worth of separation between generating the
+        # profile and generating the cache entry, because the parser
+        # compares timestamps using ctime, not mtime.
+        if 'timeout' not in dir(config):
+            r = testlib.filesystem_time_resolution()
+            config.timeout = r[1]
+
+        time.sleep(config.timeout)
+
+    def _generate_cache_file(self):
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-q', '--write-cache', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+
+    @testlib.keep_on_fail
+    def test_cache_loaded_when_exists(self):
+        '''test cache is loaded when it exists, is newer than the profile, and features match'''
+
+        self._generate_cache_file()
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Cached reload succeeded')
+
+    @testlib.keep_on_fail
+    def test_cache_not_loaded_when_skip_arg(self):
+        '''test cache is not loaded when --skip-cache is passed'''
+
+        self._generate_cache_file()
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--skip-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+    @testlib.keep_on_fail
+    def test_cache_not_loaded_when_skip_read_arg(self):
+        '''test cache is not loaded when --skip-read-cache is passed'''
+
+        self._generate_cache_file()
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--skip-read-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+    @testlib.keep_on_fail
+    def test_cache_not_loaded_when_features_differ(self):
+        '''test cache is not loaded when features file differs'''
+
+        self._generate_cache_file()
+
+        testlib.write_file(os.path.join(self.cache_dir, '.features'), 'monkey\n')
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+    @testlib.keep_on_fail
+    def test_cache_writing_does_not_overwrite_features_when_features_differ(self):
+        '''test cache writing does not overwrite the features file when it differs and --skip-bad-cache is given'''
+
+        features_file = os.path.join(self.cache_dir, '.features')
+        testlib.write_file(features_file, 'monkey\n')
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+        self.assert_path_exists(features_file)
+        # ensure that the features file does *not* match the current features set
+        self.compare_features_file(features_file, expected=False)
+
+    @testlib.keep_on_fail
+    def test_cache_writing_skipped_when_features_differ(self):
+        '''test cache writing is skipped when features file differs'''
+
+        testlib.write_file(os.path.join(self.cache_dir, '.features'), 'monkey\n')
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+        self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+
+    @testlib.keep_on_fail
+    def test_cache_writing_updates_features(self):
+        '''test cache writing updates features'''
+
+        features_file = os.path.join(self.cache_dir, '.features')
+        testlib.write_file(features_file, 'monkey\n')
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--write-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+        self.assert_path_exists(features_file)
+        self.compare_features_file(features_file)
+
+    @testlib.keep_on_fail
+    def test_cache_writing_updates_cache_file(self):
+        '''test cache writing updates cache file'''
+
+        cache_file = os.path.join(self.cache_dir, PROFILE)
+        testlib.write_file(cache_file, 'monkey\n')
+        orig_size = os.stat(cache_file).st_size
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--write-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+        self.assert_path_exists(cache_file)
+        with open(cache_file, 'rb') as f:
+            new_size = os.fstat(f.fileno()).st_size
+        # We check sizes here rather than looking for the string
+        # 'monkey' in the cache file contents, because of the
+        # difficulty of coercing cache file bytes into strings in python3
+        self.assertNotEqual(orig_size, new_size, 'Expected cache file to be updated, but its size is unchanged.')
+
+    @testlib.keep_on_fail
+    def test_cache_writing_clears_all_files(self):
+        '''test cache writing clears all cache files'''
+
+        check_file = os.path.join(self.cache_dir, 'monkey')
+        testlib.write_file(check_file, 'monkey\n')
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--write-cache', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+        self.assert_path_exists(check_file, expected=False)
+
+    @testlib.keep_on_fail
+    def test_profile_newer_skips_cache(self):
+        '''test cache is skipped if profile is newer'''
+
+        self._generate_cache_file()
+        time.sleep(config.timeout)
+        testlib.touch(self.profile)
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+    @testlib.keep_on_fail
+    def test_parser_newer_skips_cache(self):
+        '''test cache is skipped if parser is newer'''
+
+        self._generate_cache_file()
+        time.sleep(config.timeout)
+
+        # copy parser
+        os.mkdir(os.path.join(self.tmp_dir, 'parser'))
+        new_parser = os.path.join(self.tmp_dir, 'parser', 'apparmor_parser')
+        shutil.copy(config.parser, new_parser)
+
+        cmd = list(self.cmd_prefix)
+        cmd[0] = new_parser
+        cmd.extend(['-v', '-r', self.profile])
+        report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+    def _purge_cache_test(self, location):
+
+        cache_file = os.path.join(self.cache_dir, location)
+        testlib.write_file(cache_file, 'monkey\n')
+
+        cmd = list(self.cmd_prefix)
+        cmd.extend(['-v', '--purge-cache', '-r', self.profile])
+        self.run_cmd_check(cmd)
+        # no message is output
+        self.assert_path_exists(cache_file, expected=False)
+
+    @testlib.keep_on_fail
+    def test_cache_purge_removes_features_file(self):
+        '''test --purge-cache removes the .features file'''
+        self._purge_cache_test('.features')
+
+    @testlib.keep_on_fail
+    def test_cache_purge_removes_cache_file(self):
+        '''test --purge-cache removes the profile cache file'''
+        self._purge_cache_test(PROFILE)
+
+    @testlib.keep_on_fail
+    def test_cache_purge_removes_other_cache_files(self):
+        '''test --purge-cache removes other cache files'''
+        self._purge_cache_test('monkey')
+
+
+class AAParserAltCacheTests(AAParserCachingTests):
+    '''Same tests as above, but with an alternate cache location specified on the command line'''
+    check_orig_cache = True
+
+    def setUp(self):
+        super(AAParserAltCacheTests, self).setUp()
+
+        alt_cache_dir = tempfile.mkdtemp(prefix='aa-alt-cache', dir=self.tmp_dir)
+        os.chmod(alt_cache_dir, 0o755)
+
+        self.orig_cache_dir = self.cache_dir
+        self.cache_dir = alt_cache_dir
+        self.cmd_prefix.extend(['--cache-loc', alt_cache_dir])
+
+    def tearDown(self):
+        if self.check_orig_cache and len(os.listdir(self.orig_cache_dir)) > 0:
+            self.fail('original cache dir \'%s\' not empty' % self.orig_cache_dir)
+        super(AAParserAltCacheTests, self).tearDown()
+
+    @testlib.keep_on_fail
+    def test_cache_purge_leaves_original_cache_alone(self):
+        '''test cache purging only touches alt cache'''
+
+        # disable the tearDown check that the original cache dir is empty
+        self.check_orig_cache = False
+        filelist = [PROFILE, '.features', 'monkey']
+
+        for f in filelist:
+            testlib.write_file(os.path.join(self.orig_cache_dir, f), 'monkey\n')
+
+        self._purge_cache_test(PROFILE)
+
+        for f in filelist:
+            if not os.path.exists(os.path.join(self.orig_cache_dir, f)):
+                self.fail('cache purge removed %s, which it was not supposed to' % (os.path.join(self.orig_cache_dir, f)))
+
+
+def main():
+    global config
+    p = ArgumentParser()
+    p.add_argument('-p', '--parser', default=testlib.DEFAULT_PARSER, action="store", dest='parser')
+    p.add_argument('-v', '--verbose', action="store_true", dest="verbose")
+    p.add_argument('-d', '--debug', action="store_true", dest="debug")
+    config = p.parse_args()
+
+    verbosity = 1
+    if config.verbose:
+        verbosity = 2
+
+    test_suite = unittest.TestSuite()
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserBasicCachingTests))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserAltCacheBasicTests))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCreateCacheBasicTestsCacheExists))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCreateCacheBasicTestsCacheNotExist))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCreateCacheAltCacheTestsCacheNotExist))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCachingTests))
+    test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserAltCacheTests))
+    rc = 0
+    try:
+        result = unittest.TextTestRunner(verbosity=verbosity).run(test_suite)
+        if not result.wasSuccessful():
+            rc = 1
+    except:
+        rc = 1
+
+    return rc
+
+if __name__ == "__main__":
+    rc = main()
+    exit(rc)
Index: b/parser/tst/testlib.py
===================================================================
--- a/parser/tst/testlib.py
+++ b/parser/tst/testlib.py
@@ -10,8 +10,13 @@
 #
 # ------------------------------------------------------------------
 
+import os
+import shutil
 import signal
 import subprocess
+import tempfile
+import time
+import unittest
 
 TIMEOUT_ERROR_CODE = 152
 DEFAULT_PARSER = '../apparmor_parser'
@@ -26,32 +31,58 @@ def subprocess_setup():
     signal.signal(signal.SIGPIPE, signal.SIG_DFL)
 
 
-def run_cmd(command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stdin=None, timeout=120):
-    '''Try to execute given command (array) and return its stdout, or
-    return a textual error if it failed.'''
+class AATestTemplate(unittest.TestCase):
+    '''Template base class for use by test scripts'''
+    debug = False
+    do_cleanup = True
+
+    def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
+                      stdin=None, timeout=120, expected_rc=0, expected_string=None):
+        '''Wrapper around run_cmd that checks the rc code against
+           expected_rc and for expected strings in the output if
+           passed. The valgrind tests generally don't care what the
+           rc is as long as it's not in a specific set of return codes,
+           so the check can't be pushed directly into run_cmd().'''
+        rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
+        self.assertEqual(rc, expected_rc, "Got return code %d, expected %d\nCommand run: %s\nOutput: %s" % (rc, expected_rc, (' '.join(command)), report))
+        if expected_string:
+            self.assertIn(expected_string, report, 'Expected message "%s", got: \n%s' % (expected_string, report))
+        return report
+
+    def run_cmd(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
+                stdin=None, timeout=120):
+        '''Try to execute given command (array) and return its stdout, or
+           return a textual error if it failed.'''
 
-    try:
-        sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
-    except OSError as e:
-        return [127, str(e)]
+        if self.debug:
+            print('\n===> Running command: \'%s\'' % (' '.join(command)))
 
-    timeout_communicate = TimeoutFunction(sp.communicate, timeout)
-    out, outerr = (None, None)
-    try:
-        out, outerr = timeout_communicate(input)
-        rc = sp.returncode
-    except TimeoutFunctionException as e:
-        sp.terminate()
-        outerr = b'test timed out, killed'
-        rc = TIMEOUT_ERROR_CODE
-
-    # Handle redirection of stdout
-    if out is None:
-        out = b''
-    # Handle redirection of stderr
-    if outerr is None:
-        outerr = b''
-    return [rc, out.decode('utf-8') + outerr.decode('utf-8')]
+        try:
+            sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
+                                  close_fds=True, preexec_fn=subprocess_setup)
+        except OSError as e:
+            return [127, str(e)]
+
+        timeout_communicate = TimeoutFunction(sp.communicate, timeout)
+        out, outerr = (None, None)
+        try:
+            out, outerr = timeout_communicate(input)
+            rc = sp.returncode
+        except TimeoutFunctionException as e:
+            sp.terminate()
+            outerr = b'test timed out, killed'
+            rc = TIMEOUT_ERROR_CODE
+
+        # Handle redirection of stdout
+        if out is None:
+            out = b''
+        # Handle redirection of stderr
+        if outerr is None:
+            outerr = b''
+
+        report = out.decode('utf-8') + outerr.decode('utf-8')
+
+        return [rc, report]
 
 
 # Timeout handler using alarm() from John P. Speno's Pythonic Avocado
@@ -77,3 +108,79 @@ class TimeoutFunction:
             signal.signal(signal.SIGALRM, old)
         signal.alarm(0)
         return result
+
+
+def filesystem_time_resolution():
+    '''detect whether the filesystem stores sub-second timestamps'''
+
+    default_diff = 0.1
+    result = (True, default_diff)
+
+    tmp_dir = tempfile.mkdtemp(prefix='aa-caching-nanostamp-')
+    try:
+        last_stamp = None
+        for i in range(10):
+            s = None
+
+            with open(os.path.join(tmp_dir, 'test.%d' % i), 'w+') as f:
+                s = os.fstat(f.fileno())
+
+            if s.st_mtime == last_stamp:
+                print('\n===> WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
+                result = (False, 1.0)
+                break
+
+            last_stamp = s.st_mtime
+            time.sleep(default_diff)
+    except:
+        pass
+    finally:
+        if os.path.exists(tmp_dir):
+            shutil.rmtree(tmp_dir)
+
+    return result
+
+
+def read_features_dir(path):
+
+    result = ''
+    if not os.path.exists(path) or not os.path.isdir(path):
+        return result
+
+    for name in os.listdir(path):
+        entry = os.path.join(path, name)
+        result += '%s {' % name
+        if os.path.isfile(entry):
+            with open(entry, 'r') as f:
+                # don't need extra '\n' here as features file contains it
+                result += '%s' % (f.read())
+        elif os.path.isdir(entry):
+            result += '%s' % (read_features_dir(entry))
+        result += '}\n'
+
+    return result
+
+
+def touch(path):
+    return os.utime(path, None)
+
+
+def write_file(path, contents):
+    '''write contents to path'''
+    with open(path, 'w+') as f:
+        f.write(contents)
+
+
+def keep_on_fail(unittest_func):
+    '''wrapping function for unittest testcases to detect failure
+       and leave behind test files in tearDown(); to be used as a
+       decorator'''
+
+    def new_unittest_func(self):
+        try:
+            unittest_func(self)
+        except Exception:
+            self.do_cleanup = False
+            raise
+
+    return new_unittest_func
Index: b/parser/tst/valgrind_simple.py
===================================================================
--- a/parser/tst/valgrind_simple.py
+++ b/parser/tst/valgrind_simple.py
@@ -57,7 +57,7 @@ VALGRIND_SUPPRESSIONS = '''
 }'''
 
 
-class AAParserValgrindTests(unittest.TestCase):
+class AAParserValgrindTests(testlib.AATestTemplate):
     def setUp(self):
         # REPORT ALL THE OUTPUT
         self.maxDiff = None
@@ -70,7 +70,7 @@ class AAParserValgrindTests(unittest.Tes
         command.append(config.parser)
         command.extend(parser_args)
         command.append(testname)
-        rc, output = testlib.run_cmd(command, timeout=120)
+        rc, output = self.run_cmd(command, timeout=120)
         self.assertNotIn(rc, failure_rc,
                     "valgrind returned error code %d, gave the following output\n%s" % (rc, output))
 

-- 
Steve Beattie
<sbeattie at ubuntu.com>
http://NxNW.org/~steve/