[apparmor] [patch 3/3] utils: use capability rule class in aa.py and cleanprof.py
Steve Beattie
steve at nxnw.org
Wed Dec 3 18:23:10 UTC 2014
This patch integrates the new capability rule class into aa.py and cleanprof.py.
Patch changes:
v5:
- merge my changes into Christian's original patches
- use CapabilityRule.parse() for parsing raw capability rules and
getting a CapabilityRule instance back
- cope with move of parse_modifiers back into rule/__init__.py.
---
utils/apparmor/aa.py | 188 ++++++++++++++---------------------------
utils/apparmor/cleanprofile.py | 15 ---
2 files changed, 68 insertions(+), 135 deletions(-)
Index: b/utils/apparmor/aa.py
===================================================================
--- a/utils/apparmor/aa.py
+++ b/utils/apparmor/aa.py
@@ -52,6 +52,9 @@ from apparmor.regex import (RE_PROFILE_S
import apparmor.rules as aarules
+from apparmor.rule.capability import CapabilityRuleset, CapabilityRule
+from apparmor.rule import parse_modifiers
+
from apparmor.yasti import SendDataToYast, GetDataFromYast, shutdown_yast
# setup module translations
@@ -89,13 +92,19 @@ seen_events = 0 # was our
# To store the globs entered by users so they can be provided again
user_globs = []
-# The key for representing bare rules such as "capability," or "file,"
+# The key for representing bare "file," rules
ALL = '\0ALL'
## Variables used under logprof
### Were our
t = hasher() # dict()
transitions = hasher()
+
+# keys used in aa[profile][hat]:
+# a) rules (as dict): alias, change_profile, include, lvar, rlimit
+# b) rules (as hasher): allow, deny
+# c) one for each rule class
+# d) other: declared, external, flags, name, profile
aa = hasher() # Profiles originally in sd, replace by aa
original_aa = hasher()
extras = hasher() # Inactive profiles from extras
@@ -1546,13 +1555,14 @@ def ask_the_questions():
for hat in hats:
for capability in sorted(log_dict[aamode][profile][hat]['capability'].keys()):
# skip if capability already in profile
- if profile_known_capability(aa[profile][hat], capability):
+ capability_obj = CapabilityRule(capability)
+ if is_known_rule(aa[profile][hat], 'capability', capability_obj):
continue
# Load variables? Don't think so.
severity = sev_db.rank('CAP_%s' % capability)
default_option = 1
options = []
- newincludes = match_cap_includes(aa[profile][hat], capability)
+ newincludes = match_includes(aa[profile][hat], 'capability', capability_obj)
q = aaui.PromptQuestion()
if newincludes:
@@ -1625,16 +1635,18 @@ def ask_the_questions():
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
- aa[profile][hat]['allow']['capability'][capability]['set'] = True
- aa[profile][hat]['allow']['capability'][capability]['audit'] = audit_toggle
+ else:
+ capability_obj = CapabilityRule(capability, audit=audit)
+ aa[profile][hat]['capability'].add(capability_obj)
+ aaui.UI_Info(_('Adding capability %s to profile.') % capability)
changed[profile] = True
- aaui.UI_Info(_('Adding capability %s to profile.') % capability)
done = True
elif ans == 'CMD_DENY':
- aa[profile][hat]['deny']['capability'][capability]['set'] = True
+ capability_obj = CapabilityRule(capability, audit=audit, deny=True)
+ aa[profile][hat]['capability'].add(capability_obj)
changed[profile] = True
aaui.UI_Info(_('Denying capability %s to profile.') % capability)
@@ -2119,20 +2131,6 @@ def delete_net_duplicates(netrules, incn
deleted += 1
return deleted
-def delete_cap_duplicates(profilecaps, inccaps):
- deleted = []
- if profilecaps and inccaps:
- for capname in profilecaps.keys():
- # XXX The presence of a bare capability rule ("capability,") should
- # cause more specific capability rules
- # ("capability audit_control,") to be deleted
- if inccaps[capname].get('set', False) == 1:
- deleted.append(capname)
- for capname in deleted:
- profilecaps.pop(capname)
-
- return len(deleted)
-
def delete_path_duplicates(profile, incname, allow):
deleted = []
for entry in profile[allow]['path'].keys():
@@ -2159,9 +2157,7 @@ def delete_duplicates(profile, incname):
deleted += delete_net_duplicates(profile['deny']['netdomain'], include[incname][incname]['deny']['netdomain'])
- deleted += delete_cap_duplicates(profile['allow']['capability'], include[incname][incname]['allow']['capability'])
-
- deleted += delete_cap_duplicates(profile['deny']['capability'], include[incname][incname]['deny']['capability'])
+ deleted += profile['capability'].delete_duplicates(include[incname][incname]['capability'])
deleted += delete_path_duplicates(profile, incname, 'allow')
deleted += delete_path_duplicates(profile, incname, 'deny')
@@ -2171,9 +2167,7 @@ def delete_duplicates(profile, incname):
deleted += delete_net_duplicates(profile['deny']['netdomain'], filelist[incname][incname]['deny']['netdomain'])
- deleted += delete_cap_duplicates(profile['allow']['capability'], filelist[incname][incname]['allow']['capability'])
-
- deleted += delete_cap_duplicates(profile['deny']['capability'], filelist[incname][incname]['deny']['capability'])
+ deleted += profile['capability'].delete_duplicates(filelist[incname][incname]['capability'])
deleted += delete_path_duplicates(profile, incname, 'allow')
deleted += delete_path_duplicates(profile, incname, 'deny')
@@ -2201,10 +2195,16 @@ def match_net_include(incname, family, t
return False
-def match_cap_includes(profile, cap):
+def match_cap_includes(profile, capability):
+ # still used by aa-mergeprof
+ capability_obj = CapabilityRule(capability)
+ return match_includes(profile, 'capability', capability_obj)
+
+def match_includes(profile, rule_type, rule_obj):
newincludes = []
for incname in include.keys():
- if valid_include(profile, incname) and include[incname][incname]['allow']['capability'][cap].get('set', False) == 1:
+ # XXX type check should go away once we init all profiles correctly
+ if valid_include(profile, incname) and include[incname][incname].get(rule_type, False) and include[incname][incname][rule_type].is_covered(rule_obj):
newincludes.append(incname)
return newincludes
@@ -2512,10 +2512,11 @@ def collapse_log():
log_dict[aamode][profile][hat]['path'][path] = mode
- for capability in prelog[aamode][profile][hat]['capability'].keys():
+ for cap in prelog[aamode][profile][hat]['capability'].keys():
# If capability not already in profile
- if not aa[profile][hat]['allow']['capability'][capability].get('set', False):
- log_dict[aamode][profile][hat]['capability'][capability] = True
+ # XXX remove first check when we have proper profile initialisation
+ if aa[profile][hat].get('capability', False) and not aa[profile][hat]['capability'].is_covered(CapabilityRule(cap)):
+ log_dict[aamode][profile][hat]['capability'][cap] = True
nd = prelog[aamode][profile][hat]['netdomain']
for family in nd.keys():
@@ -2702,6 +2703,10 @@ def parse_profile_data(data, file, do_in
profile_data[profile][profile]['repo']['url'] = repo_data['url']
profile_data[profile][profile]['repo']['user'] = repo_data['user']
+ # init rule classes (if not done yet)
+ if not profile_data[profile][hat].get('capability', False):
+ profile_data[profile][hat]['capability'] = CapabilityRuleset()
+
elif RE_PROFILE_END.search(line):
# If profile ends and we're not in one
if not profile:
@@ -2717,21 +2722,14 @@ def parse_profile_data(data, file, do_in
initial_comment = ''
elif RE_PROFILE_CAP.search(line):
- matches = RE_PROFILE_CAP.search(line)
-
if not profile:
raise AppArmorException(_('Syntax Error: Unexpected capability entry found in file: %(file)s line: %(line)s') % { 'file': file, 'line': lineno + 1 })
- audit, allow, allow_keyword, comment = parse_audit_allow(matches)
- # TODO: honor allow_keyword and comment
+ # init rule class (if not done yet)
+ if not profile_data[profile][hat].get('capability', False):
+ profile_data[profile][hat]['capability'] = CapabilityRuleset()
- capability = ALL
- if matches.group('capability'):
- capability = matches.group('capability').strip()
- # TODO: can contain more than one capability- split it?
-
- profile_data[profile][hat][allow]['capability'][capability]['set'] = True
- profile_data[profile][hat][allow]['capability'][capability]['audit'] = audit
+ profile_data[profile][hat]['capability'].add(CapabilityRule.parse(line))
elif RE_PROFILE_LINK.search(line):
matches = RE_PROFILE_LINK.search(line).groups()
@@ -2840,7 +2838,7 @@ def parse_profile_data(data, file, do_in
if not profile:
raise AppArmorException(_('Syntax Error: Unexpected bare file rule found in file: %(file)s line: %(line)s') % { 'file': file, 'line': lineno + 1 })
- audit, allow, allow_keyword, comment = parse_audit_allow(matches)
+ audit, allow, allow_keyword, comment = parse_modifiers(matches)
# TODO: honor allow_keyword and comment
mode = apparmor.aamode.AA_BARE_FILE_MODE
@@ -3179,26 +3177,6 @@ def parse_profile_data(data, file, do_in
return profile_data
-def parse_audit_allow(matches):
- audit = False
- if matches.group('audit'):
- audit = True
-
- allow = 'allow'
- allow_keyword = False
- if matches.group('allow'):
- allow = matches.group('allow').strip()
- allow_keyword = True
- if allow != 'allow' and allow != 'deny': # should never happen
- raise AppArmorException(_("Invalid allow/deny keyword %s" % allow))
-
- comment = ''
- if matches.group('comment'):
- # include a space so that we don't need to add it everywhere when writing the rule
- comment = ' %s' % matches.group('comment')
-
- return (audit, allow, allow_keyword, comment)
-
# RE_DBUS_ENTRY = re.compile('^dbus\s*()?,\s*$')
# use stuff like '(?P<action>(send|write|w|receive|read|r|rw))'
@@ -3371,29 +3349,10 @@ def var_transform(ref):
def write_list_vars(prof_data, depth):
return write_pair(prof_data, depth, '', 'lvar', '', ' = ', '', var_transform)
-def write_cap_rules(prof_data, depth, allow):
- pre = ' ' * depth
- data = []
- allowstr = set_allow_str(allow)
-
- if prof_data[allow].get('capability', False):
- for cap in sorted(prof_data[allow]['capability'].keys()):
- audit = ''
- if prof_data[allow]['capability'][cap].get('audit', False):
- audit = 'audit '
- if prof_data[allow]['capability'][cap].get('set', False):
- if cap == ALL:
- data.append('%s%s%scapability,' % (pre, audit, allowstr))
- else:
- data.append('%s%s%scapability %s,' % (pre, audit, allowstr, cap))
- data.append('')
-
- return data
-
def write_capabilities(prof_data, depth):
- #data = write_single(prof_data, depth, '', 'set_capability', 'set capability ', ',')
- data = write_cap_rules(prof_data, depth, 'deny')
- data += write_cap_rules(prof_data, depth, 'allow')
+ data = []
+ if prof_data.get('capability', False):
+ data = prof_data['capability'].get_clean(depth)
return data
def write_net_rules(prof_data, depth, allow):
@@ -3823,10 +3782,14 @@ def serialize_profile_from_old_profile(p
depth = len(line) - len(line.lstrip())
data += write_methods[segs](prof_data, int(depth / 2))
segments[segs] = False
+ # delete rules from prof_data to avoid duplication (they are in data now)
if prof_data['allow'].get(segs, False):
prof_data['allow'].pop(segs)
if prof_data['deny'].get(segs, False):
prof_data['deny'].pop(segs)
+ if prof_data.get(segs, False):
+ t = type(prof_data[segs])
+ prof_data[segs] = t()
return data
#data.append('reading prof')
@@ -3887,16 +3850,20 @@ def serialize_profile_from_old_profile(p
if profile:
depth = int(len(line) - len(line.lstrip()) / 2) + 1
- # first write sections that were modified (and remove them from write_prof_data)
+ # first write sections that were modified
#for segs in write_methods.keys():
for segs in default_write_order:
if segments[segs]:
data += write_methods[segs](write_prof_data[name], depth)
segments[segs] = False
+ # delete rules from write_prof_data to avoid duplication (they are in data now)
if write_prof_data[name]['allow'].get(segs, False):
write_prof_data[name]['allow'].pop(segs)
if write_prof_data[name]['deny'].get(segs, False):
write_prof_data[name]['deny'].pop(segs)
+ if write_prof_data[name].get(segs, False):
+ t = type(write_prof_data[name][segs])
+ write_prof_data[name][segs] = t()
# then write everything else
for segs in default_write_order:
@@ -3938,34 +3905,13 @@ def serialize_profile_from_old_profile(p
profile = None
elif RE_PROFILE_CAP.search(line):
- matches = RE_PROFILE_CAP.search(line).groups()
- audit = False
- if matches[0]:
- audit = matches[0]
-
- allow = 'allow'
- if matches[1] and matches[1].strip() == 'deny':
- allow = 'deny'
-
- capability = ALL
- if matches[2]:
- capability = matches[2].strip()
-
- if not write_prof_data[hat][allow]['capability'][capability].get('set', False):
- correct = False
- if not write_prof_data[hat][allow]['capability'][capability].get(audit, False) == audit:
- correct = False
-
- if correct:
+ cap = CapabilityRule.parse(line)
+ if write_prof_data[hat]['capability'].is_covered(cap, True, True):
if not segments['capability'] and True in segments.values():
data += write_prior_segments(write_prof_data[name], segments, line)
segments['capability'] = True
- write_prof_data[hat][allow]['capability'].pop(capability)
+ write_prof_data[hat]['capability'].delete(cap)
data.append(line)
-
- #write_prof_data[hat][allow]['capability'][capability].pop(audit)
-
- #Remove this line
else:
# To-Do
pass
@@ -4341,20 +4287,18 @@ def profile_known_exec(profile, typ, exe
return 0
-def profile_known_capability(profile, capname):
- if profile['deny']['capability'][capname].get('set', False):
- return -1
-
- if profile['allow']['capability'][capname].get('set', False):
- return 1
+def is_known_rule(profile, rule_type, rule_obj):
+ # XXX get rid of get() checks after we have a proper function to initialize a profile
+ if profile.get(rule_type, False):
+ if profile[rule_type].is_covered(rule_obj, False):
+ return True
for incname in profile['include'].keys():
- if include[incname][incname]['deny']['capability'][capname].get('set', False):
- return -1
- if include[incname][incname]['allow']['capability'][capname].get('set', False):
- return 1
+ if include[incname][incname].get(rule_type, False):
+ if include[incname][incname][rule_type].is_covered(rule_obj, False):
+ return True
- return 0
+ return False
def profile_known_network(profile, family, sock_type):
if netrules_access_check(profile['deny']['netdomain'], family, sock_type):
Index: b/utils/apparmor/cleanprofile.py
===================================================================
--- a/utils/apparmor/cleanprofile.py
+++ b/utils/apparmor/cleanprofile.py
@@ -65,8 +65,8 @@ class CleanProf(object):
deleted += apparmor.aa.delete_duplicates(self.other.aa[program][hat], inc)
#Clean the duplicates of caps in other profile
- deleted += delete_cap_duplicates(self.profile.aa[program][hat]['allow']['capability'], self.other.aa[program][hat]['allow']['capability'], self.same_file)
- deleted += delete_cap_duplicates(self.profile.aa[program][hat]['deny']['capability'], self.other.aa[program][hat]['deny']['capability'], self.same_file)
+ if self.same_file:
+ deleted += self.other.aa[program][hat]['capability'].delete_duplicates(self.profile.aa[program][hat]['capability'])
#Clean the duplicates of path in other profile
deleted += delete_path_duplicates(self.profile.aa[program][hat], self.other.aa[program][hat], 'allow', self.same_file)
@@ -108,17 +108,6 @@ def delete_path_duplicates(profile, prof
return len(deleted)
-def delete_cap_duplicates(profilecaps, profilecaps_other, same_profile=True):
- deleted = []
- if profilecaps and profilecaps_other and not same_profile:
- for capname in profilecaps.keys():
- if profilecaps_other[capname].get('set', False):
- deleted.append(capname)
- for capname in deleted:
- profilecaps_other.pop(capname)
-
- return len(deleted)
-
def delete_net_duplicates(netrules, netrules_other, same_profile=True):
deleted = 0
hasher_obj = apparmor.aa.hasher()
More information about the AppArmor
mailing list