[apparmor] [patch 05/13] parser - rewrite caching tests in python unittest
Steve Beattie
steve at nxnw.org
Tue Oct 15 19:46:08 UTC 2013
On Fri, Oct 11, 2013 at 10:08:51PM +0200, Christian Boltz wrote:
> We'll see if you still like this in some months...
While I reserve the right to flake out^W^W change my mind, I help
maintain and improve other codebases that don't get reviews before
commits... and they sure could use it.
> (and I wonder if Kshitij smiled now that I have a new victim for my
> reviews ;-)
Heh, at least he knows now you weren't just picking on him. :)
> argparse already works with python 2.7 (Kshitij's code is using it, and
> I regularly test it with py 2.7 and 3.3), so there's no reason to stay
> with something that is deprecated ;-)
Doh, I missed that it was also added in 2.7. Fixed for both
valgrind_simple.py and caching.py.
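For reference, a minimal sketch of the argparse version of the option handling. The helper name build_config is made up for illustration, and passing an explicit argv list is only there so the sketch runs standalone; the option names mirror the ones the test scripts in this patch take.

```python
from argparse import ArgumentParser


def build_config(argv=None):
    # build_config is a hypothetical helper name used only in this sketch;
    # the options mirror the ones defined in caching.py's main().
    p = ArgumentParser()
    p.add_argument('-p', '--parser', default='../apparmor_parser', dest='parser')
    p.add_argument('-v', '--verbose', action='store_true', dest='verbose')
    p.add_argument('-d', '--debug', action='store_true', dest='debug')
    # argv=None makes argparse fall back to sys.argv[1:]
    return p.parse_args(argv)


config = build_config(['-v'])
print(config.parser, config.verbose, config.debug)
```

The returned namespace can be used the same way the old optparse options object was, via attribute access.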
> > > After reading the second half of the patch, I noticed that you also
> > > add monkeys to other files, so maybe (untested!)
> > >
> > > def add_monkey(filename):
> > >     banana = os.path.join(self.cache_dir, filename)
> > >
> > >     with open(banana, 'w+') as f:
> > >         f.write('monkey')
> > >
> > > would be another good solution.
> >
> > Yeah, I'll think about this a bit, to determine what helper
> > function(s) I want to add to simplify it a bit.
>
> I tend to the add_monkey function, because that makes the testcase
> easier to understand than adding a "true" parameter to
> _generate_cache_file.
>
> Maybe it would be even easier with "break_file" instead of "add_monkey"
> as function name. OTOH, I'm quite sure people _will_ check what
> add_monkey() does, but nobody will read the code of break_file() ;-)
The thing is, I often need the full path for the subsequent
verification check as well, so pushing the os.path.join(self.cache_dir,
...) call into the helper function both limits the generality of
the helper function (in some cases, that's okay) and doesn't save
me very much because I need to do it again later. So in this case,
I created a write_file() function that takes a path and a string and
writes that string to the path. It's more general but means the path
join occurs in the test function.
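Roughly, the trade-off looks like this in use. This is a sketch only: cache_dir here is a throwaway temporary directory standing in for self.cache_dir in a test case, and the read-back at the end stands in for the real verification check.

```python
import os
import shutil
import tempfile


def write_file(path, contents):
    '''write contents to path'''
    with open(path, 'w+') as f:
        f.write(contents)


# Stand-in for self.cache_dir in a test case (an assumption of this sketch).
cache_dir = tempfile.mkdtemp(prefix='aa-sketch-')
try:
    # The join happens in the caller, so the full path is still on hand
    # for the verification step that follows.
    features_file = os.path.join(cache_dir, '.features')
    write_file(features_file, 'monkey\n')

    with open(features_file) as f:
        contents = f.read()
finally:
    shutil.rmtree(cache_dir)
```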
> > Though, what I'd really like is to somehow set self.do_cleanup to
> > False when any test fails, so that for test cases that fail, the
> > temporary directory is left behind, to make diagnosing why it failed
> > easier to do. I'll think about whether there's a reasonable way to do
> > that.
>
> Looks like it isn't really nice or easy, but at least possible:
>
> http://stackoverflow.com/questions/4414234/getting-pythons-unittest-results-in-a-teardown-method
>
> http://www.piware.de/2012/10/python-unittest-show-log-on-test-failure/
> (the comments are also interesting)
Thanks. A lot of those solutions are specific to adding information
to the logged output, which is fine, but not what I'm after; I want
to be able to potentially re-run the test manually to see why it's
failing with a minimum of effort.
That said, I was able to make the decorator function approach work.
Here's the updated patch:
Subject: parser - rewrite caching tests in python unittest
This patch rewrites the caching test in python, using python's unittest
framework. It has been used with python 2.7 and python 3.3; python2.6
may have issues. It covers the tests in the existing caching.sh
test script (with the exception of the test that checks for when the
parser in $PATH is newer), as well as adding additional tests that
more extensively cover using a cache in an alternate location from
basedir. It also adds simple tests for the --create-cache-dir option
(along with that option's interaction with the alt-cache option).
(Some further work to be done is listed under TODO.)
Patch history:
v1: - initial version
v2: - create template base class
- add keep_on_fail() decorator to keep temporary test files
around after a test fails
- don't dump raw cache file to failure output in
test_cache_writing_updates_cache_file()
- push run_cmd into template class
- create run_cmd_check wrapper to run_cmd that adds an assertion
check based on whether return code matches the expected rc
(the valgrind tests only want to verify that the rc is not a
specific set of values, hence the separate wrapper function)
- similarly, add a check to run_cmd_check for verifying the output
contains a specific string, also simplifying many of the caching
tests.
- create testlib.write_file() to simplify writing files
Signed-off-by: Steve Beattie <steve at nxnw.org>
---
parser/tst/caching.py | 460 ++++++++++++++++++++++++++++++++++++++++++
parser/tst/testlib.py | 155 +++++++++++---
parser/tst/valgrind_simple.py | 4
3 files changed, 593 insertions(+), 26 deletions(-)
Index: b/parser/tst/caching.py
===================================================================
--- /dev/null
+++ b/parser/tst/caching.py
@@ -0,0 +1,460 @@
+#!/usr/bin/env python
+# ------------------------------------------------------------------
+#
+# Copyright (C) 2013 Canonical Ltd.
+# Author: Steve Beattie <steve at nxnw.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of version 2 of the GNU General Public
+# License published by the Free Software Foundation.
+#
+# ------------------------------------------------------------------
+
+# TODO
+# - check cache not used if parser in $PATH is newer
+# - check cache used/not used if includes are newer/older
+# - check cache used for force-complain, disable symlink, etc.
+
+from argparse import ArgumentParser
+import os
+import shutil
+import time
+import tempfile
+import unittest
+
+import testlib
+
+
+PROFILE_CONTENTS = '''
+# Simple example profile for caching tests
+
+/bin/pingy {
+ capability net_raw,
+ capability setuid,
+ network inet raw,
+
+ /bin/ping mixr,
+ /etc/modules.conf r,
+}
+'''
+PROFILE = 'sbin.pingy'
+config = None
+
+
+class AAParserCachingCommon(testlib.AATestTemplate):
+ do_cleanup = True
+
+ def setUp(self):
+ '''setup for each test'''
+ global config
+
+ # REPORT ALL THE OUTPUT
+ self.maxDiff = None
+
+ self.tmp_dir = tempfile.mkdtemp(prefix='aa-caching-')
+ os.chmod(self.tmp_dir, 0o755)
+
+ # create directory for cached blobs
+ self.cache_dir = os.path.join(self.tmp_dir, 'cache')
+ os.mkdir(self.cache_dir)
+
+ # write our sample profile out
+ self.profile = os.path.join(self.tmp_dir, PROFILE)
+ with open(self.profile, 'w+') as p:
+ p.write(PROFILE_CONTENTS)
+
+ if config.debug:
+ self.do_cleanup = False
+ self.debug = True
+
+ self.cmd_prefix = [config.parser, '--base', self.tmp_dir, '--skip-kernel-load']
+
+ def tearDown(self):
+ '''teardown for each test'''
+
+ if not self.do_cleanup:
+ print("===> Skipping cleanup, leaving testfiles behind in '%s'" % (self.tmp_dir))
+ else:
+ if os.path.exists(self.tmp_dir):
+ shutil.rmtree(self.tmp_dir)
+
+ def assert_path_exists(self, path, expected=True):
+ if expected is True:
+ self.assertTrue(os.path.exists(path),
+ 'test did not create file %s, when it was expected to do so' % path)
+ else:
+ self.assertFalse(os.path.exists(path),
+ 'test created file %s, when it was not expected to do so' % path)
+
+ def compare_features_file(self, features_path, expected=True):
+ # compare features contents
+ expected_output = testlib.read_features_dir('/sys/kernel/security/apparmor/features')
+ with open(features_path) as f:
+ features = f.read()
+ if expected:
+ self.assertEqual(expected_output, features,
+ "features contents differ, expected:\n%s\nresult:\n%s" % (expected_output, features))
+ else:
+ self.assertNotEqual(expected_output, features,
+ "features contents equal, expected:\n%s\nresult:\n%s" % (expected_output, features))
+
+
+class AAParserBasicCachingTests(AAParserCachingCommon):
+
+ def setUp(self):
+ super(AAParserBasicCachingTests, self).setUp()
+
+ @testlib.keep_on_fail
+ def test_no_cache_by_default(self):
+ '''test profiles are not cached by default'''
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-q', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+
+ @testlib.keep_on_fail
+ def test_no_cache_w_skip_cache(self):
+ '''test profiles are not cached with --skip-cache'''
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-q', '--write-cache', '--skip-cache', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+
+ @testlib.keep_on_fail
+ def test_cache_when_requested(self):
+ '''test profiles are cached when requested'''
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-q', '--write-cache', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+
+ @testlib.keep_on_fail
+ def test_write_features_when_caching(self):
+ '''test features file is written when caching'''
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-q', '--write-cache', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+ self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
+
+ @testlib.keep_on_fail
+ def test_features_match_when_caching(self):
+ '''test features file matches the kernel features when caching'''
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-q', '--write-cache', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+ self.assert_path_exists(os.path.join(self.cache_dir, '.features'))
+
+ self.compare_features_file(os.path.join(self.cache_dir, '.features'))
+
+
+class AAParserAltCacheBasicTests(AAParserBasicCachingTests):
+ '''Same tests as above, but with an alternate cache location specified on the command line'''
+
+ def setUp(self):
+ super(AAParserAltCacheBasicTests, self).setUp()
+
+ alt_cache_dir = tempfile.mkdtemp(prefix='aa-alt-cache', dir=self.tmp_dir)
+ os.chmod(alt_cache_dir, 0o755)
+
+ self.unused_cache_dir = self.cache_dir
+ self.cache_dir = alt_cache_dir
+ self.cmd_prefix.extend(['--cache-loc', alt_cache_dir])
+
+ def tearDown(self):
+ if len(os.listdir(self.unused_cache_dir)) > 0:
+ self.fail('original cache dir \'%s\' not empty' % self.unused_cache_dir)
+ super(AAParserAltCacheBasicTests, self).tearDown()
+
+
+class AAParserCreateCacheBasicTestsCacheExists(AAParserBasicCachingTests):
+ '''Same tests as above, but with create cache option on the command line and the cache already exists'''
+
+ def setUp(self):
+ super(AAParserCreateCacheBasicTestsCacheExists, self).setUp()
+ self.cmd_prefix.append('--create-cache-dir')
+
+
+class AAParserCreateCacheBasicTestsCacheNotExist(AAParserBasicCachingTests):
+ '''Same tests as above, but with create cache option on the command line and cache dir removed'''
+
+ def setUp(self):
+ super(AAParserCreateCacheBasicTestsCacheNotExist, self).setUp()
+ shutil.rmtree(self.cache_dir)
+ self.cmd_prefix.append('--create-cache-dir')
+
+
+class AAParserCreateCacheAltCacheTestsCacheNotExist(AAParserAltCacheBasicTests):
+ '''Same tests as above, but with create cache option on the command line,
+ alt cache specified, and cache dir removed'''
+
+ def setUp(self):
+ super(AAParserCreateCacheAltCacheTestsCacheNotExist, self).setUp()
+ shutil.rmtree(self.cache_dir)
+ self.cmd_prefix.append('--create-cache-dir')
+
+
+class AAParserCachingTests(AAParserCachingCommon):
+
+ def setUp(self):
+ super(AAParserCachingTests, self).setUp()
+
+ # need a gap of at least config.timeout seconds between generating
+ # the profile and generating the cache entry, as the parser compares
+ # ctime, not mtime.
+ if not hasattr(config, 'timeout'):
+ r = testlib.filesystem_time_resolution()
+ config.timeout = r[1]
+
+ time.sleep(config.timeout)
+
+ def _generate_cache_file(self):
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-q', '--write-cache', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE))
+
+ @testlib.keep_on_fail
+ def test_cache_loaded_when_exists(self):
+ '''test cache is loaded when it exists, is newer than profile, and features match'''
+
+ self._generate_cache_file()
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Cached reload succeeded')
+
+ @testlib.keep_on_fail
+ def test_cache_not_loaded_when_skip_arg(self):
+ '''test cache is not loaded when --skip-cache is passed'''
+
+ self._generate_cache_file()
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--skip-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+ @testlib.keep_on_fail
+ def test_cache_not_loaded_when_skip_read_arg(self):
+ '''test cache is not loaded when --skip-read-cache is passed'''
+
+ self._generate_cache_file()
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--skip-read-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+ @testlib.keep_on_fail
+ def test_cache_not_loaded_when_features_differ(self):
+ '''test cache is not loaded when features file differs'''
+
+ self._generate_cache_file()
+
+ testlib.write_file(os.path.join(self.cache_dir, '.features'), 'monkey\n')
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+ @testlib.keep_on_fail
+ def test_cache_writing_does_not_overwrite_features_when_features_differ(self):
+ '''test cache writing does not overwrite the features file when it differs and --skip-bad-cache is given'''
+
+ features_file = os.path.join(self.cache_dir, '.features')
+ testlib.write_file(features_file, 'monkey\n')
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+ self.assert_path_exists(features_file)
+ # ensure that the features file does *not* match the current features set
+ self.compare_features_file(features_file, expected=False)
+
+ @testlib.keep_on_fail
+ def test_cache_writing_skipped_when_features_differ(self):
+ '''test cache writing is skipped when features file differs'''
+
+ testlib.write_file(os.path.join(self.cache_dir, '.features'), 'monkey\n')
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--write-cache', '--skip-bad-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+ self.assert_path_exists(os.path.join(self.cache_dir, PROFILE), expected=False)
+
+ @testlib.keep_on_fail
+ def test_cache_writing_updates_features(self):
+ '''test cache writing updates features'''
+
+ features_file = os.path.join(self.cache_dir, '.features')
+ testlib.write_file(features_file, 'monkey\n')
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--write-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+ self.assert_path_exists(features_file)
+ self.compare_features_file(features_file)
+
+ @testlib.keep_on_fail
+ def test_cache_writing_updates_cache_file(self):
+ '''test cache writing updates cache file'''
+
+ cache_file = os.path.join(self.cache_dir, PROFILE)
+ testlib.write_file(cache_file, 'monkey\n')
+ orig_size = os.stat(cache_file).st_size
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--write-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+ self.assert_path_exists(cache_file)
+ with open(cache_file, 'rb') as f:
+ new_size = os.fstat(f.fileno()).st_size
+ # We check sizes here rather than whether the string monkey is
+ # in cache_contents because of the difficulty coercing cache
+ # file bytes into strings in python3
+ self.assertNotEqual(orig_size, new_size, 'Expected cache file to be updated, size is not changed.')
+
+ @testlib.keep_on_fail
+ def test_cache_writing_clears_all_files(self):
+ '''test cache writing clears all cache files'''
+
+ check_file = os.path.join(self.cache_dir, 'monkey')
+ testlib.write_file(check_file, 'monkey\n')
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--write-cache', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+ self.assert_path_exists(check_file, expected=False)
+
+ @testlib.keep_on_fail
+ def test_profile_newer_skips_cache(self):
+ '''test cache is skipped if profile is newer'''
+
+ self._generate_cache_file()
+ time.sleep(config.timeout)
+ testlib.touch(self.profile)
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+ @testlib.keep_on_fail
+ def test_parser_newer_skips_cache(self):
+ '''test cache is skipped if parser is newer'''
+
+ self._generate_cache_file()
+ time.sleep(config.timeout)
+
+ # copy parser
+ os.mkdir(os.path.join(self.tmp_dir, 'parser'))
+ new_parser = os.path.join(self.tmp_dir, 'parser', 'apparmor_parser')
+ shutil.copy(config.parser, new_parser)
+
+ cmd = list(self.cmd_prefix)
+ cmd[0] = new_parser
+ cmd.extend(['-v', '-r', self.profile])
+ report = self.run_cmd_check(cmd, expected_string='Replacement succeeded for')
+
+ def _purge_cache_test(self, location):
+
+ cache_file = os.path.join(self.cache_dir, location)
+ testlib.write_file(cache_file, 'monkey\n')
+
+ cmd = list(self.cmd_prefix)
+ cmd.extend(['-v', '--purge-cache', '-r', self.profile])
+ self.run_cmd_check(cmd)
+ # no message is output
+ self.assert_path_exists(cache_file, expected=False)
+
+ @testlib.keep_on_fail
+ def test_cache_purge_removes_features_file(self):
+ '''test cache --purge-cache removes .features file'''
+ self._purge_cache_test('.features')
+
+ @testlib.keep_on_fail
+ def test_cache_purge_removes_cache_file(self):
+ '''test cache --purge-cache removes profile cache file'''
+ self._purge_cache_test(PROFILE)
+
+ @testlib.keep_on_fail
+ def test_cache_purge_removes_other_cache_files(self):
+ '''test cache --purge-cache removes other cache files'''
+ self._purge_cache_test('monkey')
+
+
+class AAParserAltCacheTests(AAParserCachingTests):
+ '''Same tests as above, but with an alternate cache location specified on the command line'''
+ check_orig_cache = True
+
+ def setUp(self):
+ super(AAParserAltCacheTests, self).setUp()
+
+ alt_cache_dir = tempfile.mkdtemp(prefix='aa-alt-cache', dir=self.tmp_dir)
+ os.chmod(alt_cache_dir, 0o755)
+
+ self.orig_cache_dir = self.cache_dir
+ self.cache_dir = alt_cache_dir
+ self.cmd_prefix.extend(['--cache-loc', alt_cache_dir])
+
+ def tearDown(self):
+ if self.check_orig_cache and len(os.listdir(self.orig_cache_dir)) > 0:
+ self.fail('original cache dir \'%s\' not empty' % self.orig_cache_dir)
+ super(AAParserAltCacheTests, self).tearDown()
+
+ @testlib.keep_on_fail
+ def test_cache_purge_leaves_original_cache_alone(self):
+ '''test cache purging only touches alt cache'''
+
+ # skip tearDown check to ensure non-alt cache is empty
+ self.check_orig_cache = False
+ filelist = [PROFILE, '.features', 'monkey']
+
+ for f in filelist:
+ testlib.write_file(os.path.join(self.orig_cache_dir, f), 'monkey\n')
+
+ self._purge_cache_test(PROFILE)
+
+ for f in filelist:
+ if not os.path.exists(os.path.join(self.orig_cache_dir, f)):
+ self.fail('cache purge removed %s, was not supposed to' % (os.path.join(self.orig_cache_dir, f)))
+
+
+def main():
+ global config
+ p = ArgumentParser()
+ p.add_argument('-p', '--parser', default=testlib.DEFAULT_PARSER, action="store", dest='parser')
+ p.add_argument('-v', '--verbose', action="store_true", dest="verbose")
+ p.add_argument('-d', '--debug', action="store_true", dest="debug")
+ config = p.parse_args()
+
+ verbosity = 1
+ if config.verbose:
+ verbosity = 2
+
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserBasicCachingTests))
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserAltCacheBasicTests))
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCreateCacheBasicTestsCacheExists))
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCreateCacheBasicTestsCacheNotExist))
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCreateCacheAltCacheTestsCacheNotExist))
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserCachingTests))
+ test_suite.addTest(unittest.TestLoader().loadTestsFromTestCase(AAParserAltCacheTests))
+ rc = 0
+ try:
+ result = unittest.TextTestRunner(verbosity=verbosity).run(test_suite)
+ if not result.wasSuccessful():
+ rc = 1
+ except:
+ rc = 1
+
+ return rc
+
+if __name__ == "__main__":
+ rc = main()
+ exit(rc)
Index: b/parser/tst/testlib.py
===================================================================
--- a/parser/tst/testlib.py
+++ b/parser/tst/testlib.py
@@ -10,8 +10,13 @@
#
# ------------------------------------------------------------------
+import os
+import shutil
import signal
import subprocess
+import tempfile
+import time
+import unittest
TIMEOUT_ERROR_CODE = 152
DEFAULT_PARSER = '../apparmor_parser'
@@ -26,32 +31,58 @@ def subprocess_setup():
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
-def run_cmd(command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, stdin=None, timeout=120):
- '''Try to execute given command (array) and return its stdout, or
- return a textual error if it failed.'''
+class AATestTemplate(unittest.TestCase):
+ '''Stub class for use by test scripts'''
+ debug = False
+ do_cleanup = True
+
+ def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
+ stdin=None, timeout=120, expected_rc=0, expected_string=None):
+ '''Wrapper around run_cmd that checks the rc code against
+ expected_rc and for expected strings in the output if
+ passed. The valgrind tests generally don't care what the
+ rc is as long as it's not a specific set of return codes,
+ so can't push the check directly into run_cmd().'''
+ rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
+ self.assertEqual(rc, expected_rc, "Got return code %d, expected %d\nCommand run: %s\nOutput: %s" % (rc, expected_rc, (' '.join(command)), report))
+ if expected_string:
+ self.assertIn(expected_string, report, 'Expected message "%s", got: \n%s' % (expected_string, report))
+ return report
+
+ def run_cmd(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
+ stdin=None, timeout=120):
+ '''Try to execute given command (array) and return [rc, output];
+ output holds a textual error if the command could not be run.'''
- try:
- sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
- except OSError as e:
- return [127, str(e)]
+ if self.debug:
+ print('\n===> Running command: \'%s\'' % (' '.join(command)))
- timeout_communicate = TimeoutFunction(sp.communicate, timeout)
- out, outerr = (None, None)
- try:
- out, outerr = timeout_communicate(input)
- rc = sp.returncode
- except TimeoutFunctionException as e:
- sp.terminate()
- outerr = b'test timed out, killed'
- rc = TIMEOUT_ERROR_CODE
-
- # Handle redirection of stdout
- if out is None:
- out = b''
- # Handle redirection of stderr
- if outerr is None:
- outerr = b''
- return [rc, out.decode('utf-8') + outerr.decode('utf-8')]
+ try:
+ sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
+ close_fds=True, preexec_fn=subprocess_setup)
+ except OSError as e:
+ return [127, str(e)]
+
+ timeout_communicate = TimeoutFunction(sp.communicate, timeout)
+ out, outerr = (None, None)
+ try:
+ out, outerr = timeout_communicate(input)
+ rc = sp.returncode
+ except TimeoutFunctionException as e:
+ sp.terminate()
+ outerr = b'test timed out, killed'
+ rc = TIMEOUT_ERROR_CODE
+
+ # Handle redirection of stdout
+ if out is None:
+ out = b''
+ # Handle redirection of stderr
+ if outerr is None:
+ outerr = b''
+
+ report = out.decode('utf-8') + outerr.decode('utf-8')
+
+ return [rc, report]
# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
@@ -77,3 +108,79 @@ class TimeoutFunction:
signal.signal(signal.SIGALRM, old)
signal.alarm(0)
return result
+
+
+def filesystem_time_resolution():
+ '''detect whether the filesystem stores sub 1 second timestamps'''
+
+ default_diff = 0.1
+ result = (True, default_diff)
+
+ tmp_dir = tempfile.mkdtemp(prefix='aa-caching-nanostamp-')
+ try:
+ last_stamp = None
+ for i in range(10):
+ s = None
+
+ with open(os.path.join(tmp_dir, 'test.%d' % i), 'w+') as f:
+ s = os.fstat(f.fileno())
+
+ if (s.st_mtime == last_stamp):
+ print('\n===> WARNING: TMPDIR lacks nanosecond timestamp resolution, falling back to slower test')
+ result = (False, 1.0)
+ break
+
+ last_stamp = s.st_mtime
+ time.sleep(default_diff)
+ except:
+ pass
+ finally:
+ if os.path.exists(tmp_dir):
+ shutil.rmtree(tmp_dir)
+
+ return result
+
+
+def read_features_dir(path):
+
+ result = ''
+ if not os.path.exists(path) or not os.path.isdir(path):
+ return result
+
+ for name in os.listdir(path):
+ entry = os.path.join(path, name)
+ result += '%s {' % name
+ if os.path.isfile(entry):
+ with open(entry, 'r') as f:
+ # don't need extra '\n' here as features file contains it
+ result += '%s' % (f.read())
+ elif os.path.isdir(entry):
+ result += '%s' % (read_features_dir(entry))
+ result += '}\n'
+
+ return result
+
+
+def touch(path):
+ return os.utime(path, None)
+
+
+def write_file(path, contents):
+ '''write contents to path'''
+ with open(path, 'w+') as f:
+ f.write(contents)
+
+
+def keep_on_fail(unittest_func):
+ '''wrapping function for unittest testcases to detect failure
+ and leave behind test files in tearDown(); to be used as a
+ decorator'''
+
+ def new_unittest_func(self):
+ try:
+ unittest_func(self)
+ except Exception:
+ self.do_cleanup = False
+ raise
+
+ return new_unittest_func
Index: b/parser/tst/valgrind_simple.py
===================================================================
--- a/parser/tst/valgrind_simple.py
+++ b/parser/tst/valgrind_simple.py
@@ -57,7 +57,7 @@ VALGRIND_SUPPRESSIONS = '''
}'''
-class AAParserValgrindTests(unittest.TestCase):
+class AAParserValgrindTests(testlib.AATestTemplate):
def setUp(self):
# REPORT ALL THE OUTPUT
self.maxDiff = None
@@ -70,7 +70,7 @@ class AAParserValgrindTests(unittest.Tes
command.append(config.parser)
command.extend(parser_args)
command.append(testname)
- rc, output = testlib.run_cmd(command, timeout=120)
+ rc, output = self.run_cmd(command, timeout=120)
self.assertNotIn(rc, failure_rc,
"valgrind returned error code %d, gave the following output\n%s" % (rc, output))
--
Steve Beattie
<sbeattie at ubuntu.com>
http://NxNW.org/~steve/