Reworked most of the import code to bring it more in line with the changes
that have happened upstream. It currently only works with a patch that
Maros Barbaros emailed me as a stop-gap while he plans to reimplement the
python bindings.
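
The visible API change: the importers now return a single object (an XCCDF
benchmark or an OVAL definition model) instead of a (benchmark, def_model)
tuple, and a benchmark now carries its OVAL definition models in a dict
keyed by file name. A rough sketch of the new calling convention (names
taken from the diff below; error handling elided):

    # Old style: unpack a tuple, one half of which was usually None.
    (benchmark, def_model) = sec_instance.import_content(arg, options.cpe, options.puppet)

    # New style: a single object, or None on failure.
    content = sec_instance.import_content(arg, options.cpe, options.puppet)
    if content == None:
        return -1
    # XCCDF content now carries its OVAL definition models with it.
    for oval_file, def_model in getattr(content, 'oval', {}).items():
        print "imported OVAL model from %s" % oval_file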
---
src/bin/secstate | 4 +-
src/secstate/main.py | 154 +++++++++++++++++++++++---------------------------
src/secstate/util.py | 66 +++++++++++++---------
3 files changed, 111 insertions(+), 113 deletions(-)
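
The scan path changes to match: evaluate_xccdf no longer receives a single
pre-built agent session; it builds one OVAL agent session per OVAL file the
benchmark references and registers each with the policy model, roughly as
below (assuming the patched python bindings mentioned above):

    sessions = []
    for oval_file, def_model in benchmark.oval.items():
        sess = oscap.oval.agent.new_session(def_model, oval_file)
        policy_model.register_engine_oval(sess)
        sessions.append(sess)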
diff --git a/src/bin/secstate b/src/bin/secstate
index 7e82901..bc9666d 100644
--- a/src/bin/secstate
+++ b/src/bin/secstate
@@ -119,8 +119,8 @@ def import_content(arguments):
(options, args) = parser.parse_args(arguments)
for arg in args:
- (benchmark, def_model) = sec_instance.import_content(arg, options.cpe, options.puppet, save=True, active_profile=options.profile)
- if (benchmark == None) and (def_model == None):
+ content = sec_instance.import_content(arg, options.cpe, options.puppet, save=True, active_profile=options.profile)
+ if content == None:
return -1
def export(arguments):
diff --git a/src/secstate/main.py b/src/secstate/main.py
index 89d7147..108af11 100644
--- a/src/secstate/main.py
+++ b/src/secstate/main.py
@@ -152,15 +152,15 @@ class Secstate:
return True
- def import_oval(self, oval_file, store_path):
+ def import_oval(self, oval_file, store_path=None):
def_model = oscap.oval.definition_model_import(oval_file)
if def_model == None:
self.log.error("Error importing OVAL content: %('file')s" %
{'file':oval_file})
- return (None, None)
+ return None
if not def_model.is_valid():
self.log.error("Definition model is invalid")
- return (None, None)
+ return None
oval_id = os.path.splitext(os.path.basename(oval_file))[0]
@@ -169,9 +169,10 @@ class Secstate:
config.optionxform = str
if config.read(self.content_configs[oval_id]) == []:
self.log.error("Error loading config file: %(file)s" %
{'file':self.content_configs[oval_id]})
- return (None, None)
+ return None
def_model.__dict__['config'] = config
+ def_model.__dict__['id'] = oval_id
if store_path:
if not os.path.isdir(store_path):
@@ -179,7 +180,7 @@ class Secstate:
os.makedirs(store_path)
except IOError, e:
self.log.error("Could not create benchmark directory:
%(dir)s" % {'dir':bench_dir})
- return (None, None)
+ return None
shutil.copy(oval_file, store_path)
config = ConfigParser.ConfigParser()
@@ -191,9 +192,9 @@ class Secstate:
conf_file = open(os.path.join(self.config.get('secstate', 'conf_dir'), id + ".cfg"), 'w')
config.write(conf_file)
conf_file.close()
- return (None, None)
+ return None
- return (None, def_model)
+ return def_model
def import_benchmark(self, benchmark_file, oval_path="", store_path=None, changes=False, active_profile=NONE_PROFILE):
@@ -211,29 +212,19 @@ class Secstate:
os.makedirs(bench_dir)
except IOError, e:
self.log.error("Could not create benchmark directory: %(dir)s"
% {'dir':bench_dir})
- return (None, None)
+ return None
benchmark = oscap.xccdf.benchmark_import(benchmark_file)
if benchmark == None:
self.log.error("Error importing benchmark %(file)s" %
{'file':benchmark_file})
- return (None, None)
+ return None
- def_model = oscap.oval.definition_model_new()
+ benchmark.__dict__['oval'] = {}
oval_files = xccdf_get_refs(benchmark)
for oval in list(set(oval_files)):
oval_file = os.path.join(oval_path, oval)
- tmp = oscap.oval.definition_model_import(oval_file)
- if tmp == None:
- self.log.error('Error importing oval definition: %(path)s' % {'path':oval_file})
- return (None, None)
-
- if not self.combine_def_models(def_model, tmp):
- self.log.error('Error combining definition models')
- return (None, None)
-
- if not def_model.is_valid():
- self.log.error("Definition model is invalid")
- return (None, None)
+ def_model = self.import_oval(oval_file)
+ benchmark.__dict__['oval'][oval] = def_model
profile = oscap.xccdf.profile_new()
profile.id = NONE_PROFILE
@@ -244,7 +235,7 @@ class Secstate:
if self.content_configs.has_key(benchmark.id):
if config.read(self.content_configs[benchmark.id]) == []:
self.log.error("Error opening config file: %(file)s" %
{'file':self.content_config[benchmark.id]})
- return (None, None)
+ return None
else:
config.add_section(benchmark.id)
config.set(benchmark.id, 'profile', active_profile)
@@ -269,7 +260,7 @@ class Secstate:
directory = os.path.join(bench_dir, id)
if os.path.isdir(directory):
self.log.error("A benchmark named %(id)s already exists:
%(dir)s" % {'id':id, 'dir':directory})
- return (None, None)
+ return None
try:
os.mkdir(directory)
@@ -285,9 +276,9 @@ class Secstate:
except (IOError, OSError), e:
self.log.error("Error importing content: %(error)s" %
{'error':e})
shutil.rmtree(directory)
- return (None, None)
+ return None
- return (benchmark, def_model)
+ return benchmark
def import_zipped_content(self, zip, type, store_path, puppet, changes=False, active_profile=NONE_PROFILE):
"""
@@ -309,10 +300,10 @@ class Secstate:
else:
self.log.error("Unsupported encoding: %(content)s -
%(type)s" % {'content':zip,
'type':filetype[1]})
- return (None, None)
+ return None
except IOError,e:
self.log.error("Error opening tarfile: %(file)s" %
{'file':zip})
- return (None, None)
+ return None
for member in tar_file.getmembers():
tar_file.extract(member, extract_path)
@@ -322,7 +313,7 @@ class Secstate:
elif type[0] == "application/zip":
if sys.version_info < (2, 6):
self.log.error("Zip file support requires Python >= 2.6")
- return (None, None)
+ return None
zip_files = zipfile.ZipFile(zip, 'r')
zip_files.extractall(extract_path)
@@ -331,7 +322,7 @@ class Secstate:
else:
self.log.error("Unsupported file type: %(content)s" %
{'content':zip})
- return (None, None)
+ return None
# Now that the files have been extracted, find the benchmark and store the content if necessary
xccdf = None
@@ -343,15 +334,15 @@ class Secstate:
if xccdf == None:
self.log.error("Could not find XCCDF benchmark in archive
%(file)s", {'file':zip})
- return (None, None)
+ return None
- (benchmark, def_model) = self.import_benchmark(os.path.join(extract_path, xccdf), extract_path, store_path, changes, active_profile=active_profile)
+ benchmark = self.import_benchmark(os.path.join(extract_path, xccdf), extract_path, store_path, changes, active_profile=active_profile)
if benchmark == None:
- return (None, None)
+ return None
# Delete temporary directory now that we are done with it
shutil.rmtree(extract_path)
- return (benchmark, def_model)
+ return benchmark
def import_content(self, content, cpe=False, puppet=False, changes=True, save=False, active_profile=NONE_PROFILE):
@@ -371,7 +362,7 @@ class Secstate:
os.makedirs(conf_dir)
except IOError, e:
self.log.error("Could not create directory: %(dir)s" %
{'dir':conf_dir})
- return (None, None)
+ return None
if self.content.has_key(content):
return self.import_content(self.content[content], cpe, puppet, changes, active_profile=active_profile)
@@ -412,7 +403,7 @@ class Secstate:
benchmark_file = self.content[benchmark_id]
else:
benchmark_file = tempfile.mktemp()
- (benchmark, oval) = self.import_content(benchmark_id)
+ benchmark = self.import_content(benchmark_id)
if benchmark.export(benchmark_file) == None:
self.log.error("Error exporting benchmark to %(file)s" %
{'file':new_file})
return False
@@ -462,14 +453,14 @@ class Secstate:
self.log.error("No benchmark %(id)s in datastore" %
{'id':benchmark_id})
return False
- (benchmark, oval) = self.import_content(benchmark_id)
+ benchmark = self.import_content(benchmark_id)
if benchmark == None:
- if oval == None:
- self.log.error("Error opening benchmark: %(file)s" %
{'file':benchmark_id})
- return False
- else:
- oval.config.set(benchmark_id, 'selected', selected)
- self.log.debug("Set Oval file %(file)s to %(sel)s" %
{'file':benchmark_id,
+ self.log.error("Error opening benchmark: %(file)s" %
{'file':benchmark_id})
+ return False
+
+ if not benchmark.__dict__.has_key('oval'):
+ benchmark.config.set(benchmark_id, 'selected', selected)
+ self.log.debug("Set Oval file %(file)s to %(sel)s" % {'file':benchmark_id,
'sel':selected})
else:
@@ -570,53 +561,48 @@ class Secstate:
args = self.content.keys()
for arg in args:
- (benchmark, def_model) = self.import_content(arg)
- if (benchmark == None) and (def_model == None):
+ scanned_content = self.import_content(arg)
+ if scanned_content == None:
self.log.error("Error importing content: %(bench)s" %
{'bench':arg})
return False
else:
if self.content.has_key(arg):
- if benchmark == None:
- scanned_content = def_model
- else:
- scanned_content = benchmark
if not all and (not scanned_content.config.getboolean(arg, 'selected')) and (len(args) > 1):
print "Skipping %(id)s" % {'id':arg}
ret = True
continue
if interpreter == "openscap":
- sess = oscap.oval.agent_new_session(def_model)
-
- if benchmark != None:
- # Set profile to default found in benchmark.config.file
- if (profile == None) and (benchmark.__dict__.has_key('config')):
- if benchmark.config.has_option(arg, 'profile'):
- profile = benchmark.config.get(arg, 'profile')
+ if scanned_content.__dict__.has_key('oval'):
+ # Set profile to default found in scanned_content.config.file
+ if (profile == None) and (scanned_content.__dict__.has_key('config')):
+ if scanned_content.config.has_option(arg, 'profile'):
+ profile = scanned_content.config.get(arg, 'profile')
else:
profile = NONE_PROFILE
if profile != None:
- if benchmark.get_item(profile) == None:
+ if scanned_content.get_item(profile) == None:
self.log.error("Profile '%(prof)s' does not
exist." % {'prof':profile})
return False
- (res_benchmark, res_model) = evaluate_xccdf(benchmark, benchmark.id, sess, s_profile=profile, verbose=verbose)
+ (res_benchmark, res_models) = evaluate_xccdf(scanned_content, scanned_content.id, s_profile=profile, verbose=verbose)
- elif def_model != None:
- (res_benchmark, res_model) = evaluate_oval(sess, verbose)
+ else:
+ sess = oscap.oval.agent_new_session(scanned_content, scanned_content.id)
+ (res_benchmark, res_models) = evaluate_oval(sess, verbose)
- if (res_benchmark == None) and (res_model == None):
+ if (res_benchmark == None) and (res_models == None):
self.log.error("Error auditing %(arg)s" %
{'arg':arg})
return False
if xml:
- export_results(xml, res_benchmark.id, res_benchmark, res_model)
+ export_results(xml, res_benchmark.id, res_benchmark, res_models)
if html:
xccdf_ss = self.config.get('secstate', 'xccdf_stylesheet')
oval_ss = self.config.get('secstate', 'oval_stylesheet')
- export_results(tempfile.mkdtemp(), res_benchmark.id, res_benchmark, res_model, xccdf_ss, oval_ss, html_dir=html)
+ export_results(tempfile.mkdtemp(), res_benchmark.id, res_benchmark, res_models, xccdf_ss, oval_ss, html_dir=html)
return True
@@ -628,7 +614,7 @@ class Secstate:
Side Effects: Prints out the results of the search
"""
for key in self.content:
- (benchmark, def_model) = self.import_content(key)
- if benchmark == None:
- if def_model == None:
- self.log.error("Error importing content: %(key)s" % {'key':key})
+ content = self.import_content(key)
+ if content == None:
+ self.log.error("Error importing content: %(key)s" % {'key':key})
@@ -669,7 +655,7 @@ class Secstate:
def show(self, item_id=None, verbose=False):
for key in self.content:
- (benchmark, def_model) = self.import_content(key)
- if benchmark == None:
- if def_model == None:
- self.log.error("Error importing content: %(file)s" % {'file':key})
+ content = self.import_content(key)
+ if content == None:
+ self.log.error("Error importing content: %(file)s" % {'file':key})
@@ -744,41 +730,42 @@ class Secstate:
return True
- def sublist(self, benchmark, def_model, arg, recurse, show_all, tabs=0):
+ def sublist(self, content, arg, recurse, show_all, tabs=0):
tabstr = "\t" * tabs
selected = ""
profile = ""
- if benchmark == None:
+ if not content.__dict__.has_key('oval'):
if self.content.has_key(arg):
- if def_model.config.getboolean(arg, 'selected'):
+ if content.config.getboolean(arg, 'selected'):
if show_all:
selected = "[X]"
else:
selected = "[ ]"
print "%(indent)s%(sel)sOVAL File - ID: %(id)s" %
{'indent':tabstr, 'sel':selected, 'id':arg}
else:
- defn = def_model.get_definition(arg)
+ defn = content.get_definition(arg)
if defn != None:
print "%(indent)sDefinition - ID: %(id)s, Title:
'%(title)s'" % {'indent':tabstr, 'id':arg,
'title':defn.title}
else:
is_selected = False
item = None
- if arg == benchmark.id:
- item = benchmark.to_item()
- is_selected = benchmark.config.getboolean(arg, 'selected')
- profile_name = benchmark.config.get(arg, 'profile')
+ if arg == content.id:
+ item = content.to_item()
+ is_selected = content.config.getboolean(arg, 'selected')
+ profile_name = content.config.get(arg, 'profile')
if profile_name != NONE_PROFILE:
profile_name = "'%s'" % profile_name
profile = ", Profile: %s" % profile_name
else:
- item = benchmark.get_item(arg)
+ item = content.get_item(arg)
if item == None:
- return self.sublist(None, def_model, arg, recurse, show_all, tabs)
+ for oval_file,def_model in content.oval.items():
+ return self.sublist(def_model, arg, recurse, show_all, tabs)
else:
- is_selected = benchmark.selections[item.id]
+ is_selected = content.selections[item.id]
for title in item.title:
if show_all:
@@ -800,24 +787,23 @@ class Secstate:
type = item.type
if (type == oscap.xccdf.XCCDF_GROUP) or (type == oscap.xccdf.XCCDF_BENCHMARK):
for sub in item.content:
- self.sublist(benchmark, def_model, sub.id, recurse, show_all, tabs+1)
+ self.sublist(content, sub.id, recurse, show_all, tabs+1)
def list_content(self, arg=None, recurse=False, show_all=False):
ret = False
for key in self.content:
- (benchmark, def_model) = self.import_content(key)
- if benchmark == None:
- if def_model == None:
- self.log.error("Error loading benchmark: %(id)s" %
{'id':key})
- return False
+ content = self.import_content(key)
+ if content == None:
+ self.log.error("Error loading benchmark: %(id)s" %
{'id':key})
+ return False
if (arg == None) or (arg == key):
- ret = self.sublist(benchmark, def_model, key, recurse, show_all)
+ ret = self.sublist(content, key, recurse, show_all)
else:
if not self.content.has_key(arg):
- ret = self.sublist(benchmark, def_model, arg, recurse, show_all)
+ ret = self.sublist(content, arg, recurse, show_all)
return ret
@@ -848,7 +834,7 @@ class Secstate:
passing_ids = self.get_passed_result_ids(xccdf_results)
template = '%s\n'
if self.content.has_key(bench_id):
- (benchmark, tmp_model) = self.import_content(bench_id)
+ benchmark = self.import_content(bench_id)
if not benchmark:
self.log.error("Benchmark was None")
return False
diff --git a/src/secstate/util.py b/src/secstate/util.py
index eb3ca7d..4e6476b 100644
--- a/src/secstate/util.py
+++ b/src/secstate/util.py
@@ -53,7 +53,7 @@ def load_config(conf_file):
return config
def xccdf_reporter(msg, usr):
- result = oscap.common.reporter_message_get_user2num(msg)
+ result = msg.user2num
if result == oscap.xccdf.XCCDF_RESULT_PASS:
usr['pass'] += 1
elif result == oscap.xccdf.XCCDF_RESULT_FAIL:
@@ -72,11 +72,11 @@ def xccdf_reporter(msg, usr):
usr['fixed'] += 1
if usr['verbose']:
- print "Rule '%(id)s' result: %(res)s" %
{'id':oscap.common.reporter_message_get_user1str(msg),
+ print "Rule '%(id)s' result: %(res)s" %
{'id':msg.user1str,
'res':oscap.xccdf.test_result_type_get_text(result)}
return 0
-def evaluate_xccdf(benchmark, url_XCCDF, sess, s_profile=None, all=False, verbose=False):
+def evaluate_xccdf(benchmark, url_XCCDF, s_profile=None, all=False, verbose=False):
policy = None
policy_model = oscap.xccdf.policy_model_new(benchmark)
@@ -102,8 +102,13 @@ def evaluate_xccdf(benchmark, url_XCCDF, sess, s_profile=None, all=False, verbos
'fixed':0,
'verbose':verbose}
- policy_model.register_output_callback_py(xccdf_reporter, res_dict)
- policy_model.register_engine_oval(sess)
+ policy_model.register_output_callback(xccdf_reporter, res_dict)
+
+ sessions = []
+ for oval_file,def_model in benchmark.oval.items():
+ tmp_sess = oscap.oval.agent.new_session(def_model, oval_file)
+ policy_model.register_engine_oval(tmp_sess)
+ sessions.append(tmp_sess)
ritem = policy.evaluate()
ritem.benchmark_uri = url_XCCDF
@@ -114,7 +119,9 @@ def evaluate_xccdf(benchmark, url_XCCDF, sess, s_profile=None, all=False, verbos
if policy != None:
if policy.profile != None:
ritem.set_profile(policy.profile.id)
- oscap.oval.agent_export_sysinfo_to_xccdf_result(sess, ritem)
+
+ for sess in sessions:
+ oscap.oval.agent_export_sysinfo_to_xccdf_result(sess, ritem)
for model in benchmark.models:
score = policy.get_score(ritem, model.system)
@@ -135,13 +142,13 @@ def evaluate_xccdf(benchmark, url_XCCDF, sess, s_profile=None, all=False, verbos
results_benchmark = benchmark.clone()
results_benchmark.add_result(oscap.xccdf.result_clone(ritem))
- res_model = oscap.oval.agent_get_results_model(sess)
+ results_benchmark.__dict__['oval'] = benchmark.oval
+ res_models = dict(zip(benchmark.oval.keys(), sessions))
- policy_model.free()
- return (results_benchmark, res_model)
+ return (results_benchmark, res_models)
def oval_reporter(msg, usr):
- result = oscap.common.reporter_message_get_user2num(msg)
+ result = msg.user2num
if result == oscap.oval.OVAL_RESULT_TRUE:
usr['true'] += 1
elif result == oscap.oval.OVAL_RESULT_FALSE:
@@ -156,7 +163,7 @@ def oval_reporter(msg, usr):
usr['na'] += 1
if usr['verbose']:
- print "Definintion '%(id)s' result: %(res)s" %
{'id':oscap.common.reporter_message_get_user1str(msg),
+ print "Definintion '%(id)s' result: %(res)s" %
{'id':msg.user1str,
'res':oscap.oval.result_get_text(result)}
return 0
@@ -171,7 +178,7 @@ def evaluate_oval(sess, verbose=True):
'na':0,
'verbose':verbose}
- ret = oscap.oval.agent_eval_system_py(sess, oval_reporter, usr)
+ ret = oscap.oval.agent_eval_system(sess, oval_reporter, usr)
if verbose:
print "Evaluation Completed"
@@ -191,9 +198,9 @@ def evaluate_oval(sess, verbose=True):
"Not Applicable:\t%(na)s\n" % usr
res_model = oscap.oval.agent_get_results_model(sess)
- return (None, res_model)
+ return (None, {None:res_model})
-def export_results(results_dir, id, benchmark=None, res_model=None, xccdf_ss=None, oval_ss=None, html_dir=None):
+def export_results(results_dir, id, benchmark=None, res_models=None, xccdf_ss=None, oval_ss=None, html_dir=None):
unique = "audit-%(hostname)s-%(date)s" %
{'hostname':os.uname()[1],
'date':time.strftime("%a-%B-%d-%H:%M:%S-%Y")}
if not os.path.isdir(os.path.join(results_dir, unique)):
@@ -216,19 +223,24 @@ def export_results(results_dir, id, benchmark=None, res_model=None, xccdf_ss=Non
if xccdf_ss != None:
result_to_html(xccdf_xml, xccdf_ss, os.path.join(html_dir, unique, id + ".xccdf.html"))
- if res_model != None:
- oval_xml= os.path.join(results_dir, unique, id + ".oval.xml")
- res_direct = oscap.oval.result_directives_new(res_model)
- res_direct.set_reported(oscap.oval.OVAL_RESULT_TRUE |
- oscap.oval.OVAL_RESULT_FALSE |
- oscap.oval.OVAL_RESULT_UNKNOWN |
- oscap.oval.OVAL_RESULT_ERROR |
- oscap.oval.OVAL_RESULT_NOT_EVALUATED |
- oscap.oval.OVAL_RESULT_NOT_APPLICABLE, True)
- res_direct.set_content(oscap.oval.OVAL_RESULT_FALSE, oscap.oval.OVAL_DIRECTIVE_CONTENT_FULL)
- res_direct.set_content(oscap.oval.OVAL_RESULT_TRUE, oscap.oval.OVAL_DIRECTIVE_CONTENT_FULL)
-
- res_model.export(res_direct, oval_xml)
+ if res_models != None:
+ for oval_file,session in res_models.items():
+ res_model = oscap.oval.agent.results_model(session)
+ if benchmark != None:
+ id = os.path.splitext(oval_file)[0]
+
+ oval_xml = os.path.join(results_dir, unique, id + ".oval.xml")
+ res_direct = oscap.oval.result_directives(res_model)
+ res_direct.set_reported(oscap.oval.OVAL_RESULT_TRUE |
+ oscap.oval.OVAL_RESULT_FALSE |
+ oscap.oval.OVAL_RESULT_UNKNOWN |
+ oscap.oval.OVAL_RESULT_ERROR |
+ oscap.oval.OVAL_RESULT_NOT_EVALUATED |
+ oscap.oval.OVAL_RESULT_NOT_APPLICABLE, True)
+ res_direct.set_content(oscap.oval.OVAL_RESULT_FALSE, oscap.oval.OVAL_DIRECTIVE_CONTENT_FULL)
+ res_direct.set_content(oscap.oval.OVAL_RESULT_TRUE, oscap.oval.OVAL_DIRECTIVE_CONTENT_FULL)
+
+ res_model.export(res_direct, oval_xml)
# Create html results
if oval_ss != None:
--
1.7.2.1