[PATCH] Adds support for exporting audit results
by Josh Adams
Users can now specify which directory to write XML/HTML results to. Also
fixes a few errors from the OVAL commit.
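A quick usage sketch of the new options (the directories and benchmark id
here are illustrative; both directories are created if missing):

    secstate audit --xml /tmp/secstate/xml --html /tmp/secstate/html my-benchmark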
---
src/bin/secstate | 8 ++-
src/etc/secstate.conf | 3 +-
src/secstate/main.py | 77 ++++++++++++++++++++++++++++--------
src/secstate/util.py | 104 +++++++++++++++++++++++++++---------------------
4 files changed, 124 insertions(+), 68 deletions(-)
diff --git a/src/bin/secstate b/src/bin/secstate
index 3dce12c..a7bf097 100644
--- a/src/bin/secstate
+++ b/src/bin/secstate
@@ -153,14 +153,16 @@ def audit(arguments):
help="Specifies the interpreter to use when auditing the system")
parser.add_option('-p', '--profile', action='store', type='string', dest='profile',
default=None, help="Specifies the profile to use when auditing the system")
- parser.add_option('-r', '--results', action='store', dest='results', default=False,
- help="Export results to the specified file")
+ parser.add_option('--xml', action='store', type='string', dest='xml',
+                      default=None, help="Specifies the directory to write XML results to (directory will be created if it does not exist)")
+ parser.add_option('--html', action='store', type='string', dest='html',
+                      default=None, help="Specifies the directory to write HTML results to (directory will be created if it does not exist)")
parser.add_option('-v', '--verbose', action='store_true', dest='verbose', default=False,
help="Prints out extra information during the audit process")
parser.add_option('-a', '--all', action='store_true', dest='all', default=False,
help="Audit everything regardless of selection status")
(options, args) = parser.parse_args(arguments)
- if (not (sec_instance.audit(options.interpreter, args, all=options.all, verbose=options.verbose, profile=options.profile, f_results=options.results))):
+ if (not (sec_instance.audit(options.interpreter, args, all=options.all, verbose=options.verbose, profile=options.profile, xml=options.xml, html=options.html))):
return -1
def remediate(arguments):
diff --git a/src/etc/secstate.conf b/src/etc/secstate.conf
index 61f4249..911c04a 100644
--- a/src/etc/secstate.conf
+++ b/src/etc/secstate.conf
@@ -6,7 +6,8 @@ conf_dir=/var/lib/secstate/configs
oval_schema_dir=/usr/share/ovaldi
oval_interpreter=openscap
agressivness=1
-results_stylesheet=/etc/secstate/results_to_html.xsl
+oval_stylesheet=/etc/secstate/results_to_html.xsl
+xccdf_stylesheet=/etc/secstate/results_to_html.xsl
[logging]
debugging=0
diff --git a/src/secstate/main.py b/src/secstate/main.py
index bb6e6d6..edca0f1 100644
--- a/src/secstate/main.py
+++ b/src/secstate/main.py
@@ -173,13 +173,20 @@ class Secstate:
return (None, None)
if store_path:
+ if not os.path.isdir(store_path):
+ try:
+ os.makedirs(store_path)
+                except OSError, e:
+                    self.log.error("Could not create directory: %(dir)s" % {'dir':store_path})
+ return (None, None)
+
shutil.copy(oval_file, store_path)
config = ConfigParser.ConfigParser()
config.optionxform = str
id = os.path.splitext(os.path.basename(oval_file))[0]
config.add_section(id)
config.set(id, 'selected', True)
- config.set(id, 'file', os.path.join(store_path, oval_file))
+ config.set(id, 'file', os.path.join(store_path, os.path.basename(oval_file)))
conf_file = open(os.path.join(self.config.get('secstate', 'conf_dir'), id + ".cfg"), 'w')
config.write(conf_file)
conf_file.close()
@@ -331,6 +338,15 @@ class Secstate:
xccdf = False
oval = False
store_path = None
+ conf_dir = self.config.get('secstate', 'conf_dir')
+
+ if save:
+ if not os.path.isdir(conf_dir):
+ try:
+ os.makedirs(conf_dir)
+                    except OSError, e:
+                        self.log.error("Could not create directory: %(dir)s" % {'dir':conf_dir})
+ return (None, None)
if self.content.has_key(content):
(benchmark, oval) = self.import_content(os.path.join(self.benchmark_dir, content, self.content[content]), save=False)
@@ -358,7 +374,7 @@ class Secstate:
return self.import_benchmark(content, store_path=store_path, oval_path=os.path.dirname(content))
else:
- return self.import_zipped_content(content, file_type, puppet)
+ return self.import_zipped_content(content, file_type, store_path=self.config.get('secstate', 'benchmark_dir'), puppet=puppet)
def export(self, benchmark_id, new_file, original=False):
if not self.content.has_key(benchmark_id):
@@ -389,20 +405,23 @@ class Secstate:
return True
def remove_content(self, benchmark_id):
-
if benchmark_id == 'all':
for key in self.content:
- try:
- shutil.rmtree(os.path.join(self.benchmark_dir, key))
- except IOError,e:
- self.log.error("Error removing content: %(error)s" % {'error':e})
- return False
-
+ self.remove_content(key)
+
elif self.content.has_key(benchmark_id):
+ cfg = ConfigParser.ConfigParser()
+ conf_file = self.content_configs[benchmark_id]
+ fp = open(conf_file)
+ cfg.readfp(fp)
+ fp.close()
try:
- shutil.rmtree(os.path.join(self.benchmark_dir, benchmark_id))
- os.remove(os.path.join(self.config.get('secstate', 'conf_dir'), benchmark_id + ".cfg"))
- except (IOError, OSError), e:
+ if os.path.split(cfg.get(benchmark_id, "file"))[0] != self.config.get('secstate', 'oval_dir'):
+ shutil.rmtree(os.path.split(cfg.get(benchmark_id, "file"))[0])
+ else:
+ os.remove(cfg.get(benchmark_id, "file"))
+ os.remove(conf_file)
+            except (IOError, OSError), e:
self.log.error("Error removing content: %(error)s" % {'error':e})
return False
@@ -491,7 +510,7 @@ class Secstate:
oscap.xccdf_benchmark_free(benchmark)
return True
- def audit(self, interpreter, args, profile=None, verbose=False, all=False, f_results=None):
+ def audit(self, interpreter, args, profile=None, verbose=False, all=False, xml=None, html=None):
"""
        Function: Run an audit on the system against the given definition model
Input: Interpreter to use, args for interpreter, schema to use, specific definition or template
@@ -500,8 +519,13 @@ class Secstate:
"""
def_model = None
benchmark = None
+ res_model = None
+ res_benchmark = None
config = None
+ if args == []:
+ args = self.content.keys()
+
for arg in args:
(benchmark, def_model) = self.import_content(arg)
if (benchmark == None) and (def_model == None):
@@ -531,16 +555,33 @@ class Secstate:
                if profile != None:
if oscap.xccdf_benchmark_get_item(benchmark, profile) == None:
self.log.error("Profile %(prof)s does not exist." % {'prof':profile})
+ oscap.xccdf_benchmark_free(benchmark)
+ oscap.oval_definition_model_free(def_model)
return False
- ret = (evaluate_xccdf(benchmark, def_model, arg, sess, s_profile=profile, verbose=verbose) == 0)
+ (res_benchmark, res_model) = evaluate_xccdf(benchmark, arg, sess, s_profile=profile, verbose=verbose)
elif def_model != None:
- ret = (evaluate_oval(sess, f_results, verbose) == 0)
+ (res_benchmark, res_model) = evaluate_oval(sess, verbose)
- if ret == False:
- self.log.error("Error auditing %(arg)s" % {'arg':arg})
- return ret
+ if (res_benchmark == None) and (res_model == None):
+ self.log.error("Error auditing %(arg)s" % {'arg':arg})
+ oscap.oval_agent_destroy_session(sess)
+ return False
+
+ if xml:
+ export_results(xml, arg, res_benchmark, res_model)
+ if html:
+ xccdf_ss = self.config.get('secstate', 'xccdf_stylesheet')
+ oval_ss = self.config.get('secstate', 'oval_stylesheet')
+ export_results(tempfile.mkdtemp(), arg, res_benchmark, res_model, xccdf_ss, oval_ss, html_dir=html)
+
+ if res_benchmark != None:
+ oscap.xccdf_benchmark_free(res_benchmark)
+
+ oscap.oval_agent_destroy_session(sess)
+
+ return True
def search(self, search_string, verbose=False):
"""
diff --git a/src/secstate/util.py b/src/secstate/util.py
index aaec22a..8e16326 100644
--- a/src/secstate/util.py
+++ b/src/secstate/util.py
@@ -27,6 +27,9 @@ import re
import ConfigParser
import json
import tempfile
+import zipfile
+import libxml2
+import libxslt
import openscap as oscap
@@ -183,7 +186,7 @@ def output_callback(id, result, usr):
print "Rule '%(id)s' result: %(res)s" % {'id':id, 'res':oscap.xccdf_test_result_type_get_text(result)}
return 0
-def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, verbose=False):
+def evaluate_xccdf(benchmark, url_XCCDF, sess, s_profile=None, all=False, verbose=False):
policy = None
policy_model = oscap.xccdf_policy_model_new(benchmark)
@@ -249,13 +252,15 @@ def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, verbos
"Not Applicable:\t%(na)s\n" \
"Error:\t\t%(err)s\n" \
"Informational:\t%(info)s\n" \
- "Unknown:\t%(unknown)s" % res_dict
-
- oscap.oval_agent_destroy_session(sess)
+ "Unknown:\t%(unknown)s\n" % res_dict
+
+ results_benchmark = oscap.xccdf_benchmark_clone(benchmark)
+ oscap.xccdf_benchmark_add_result(results_benchmark, oscap.xccdf_result_clone(ritem))
+ res_model = oscap.oval_agent_get_results_model(sess)
+
oscap.oval_agent_cb_data_free(usr)
- oscap.oval_definition_model_free(def_model)
oscap.xccdf_policy_model_free(policy_model)
- return 0
+ return (results_benchmark, res_model)
def oval_callback(id, result, usr):
if result == oscap.OVAL_RESULT_TRUE:
@@ -276,39 +281,7 @@ def oval_callback(id, result, usr):
return 0
-def output_oval_results(res_model, res_directives, xml, html):
- if xml:
- if not oscap.oval_results_model_export(res_model, res_directives, xml):
- self.log.error("Error exporting results to %(file)s" % {'file':xml})
- return False
-
- if html:
- tmp = tempfile.mktemp()
- if not oscap.oval_results_model_export(res_model, res_directives, tmp):
- self.log.error("Error exporting results to %(file)s" % {'file':html})
- return False
-
- import libxml2
- import libxslt
-
- styledoc = libxml2.parseFile(self.config.get('secstate', 'results_stylesheet'))
- style = libxslt.parseStylesheetDoc(styledoc)
- doc = libxml2.parseFile(tmp)
- result = style.applyStylesheet(doc, None)
- if not style.saveResultToFilename(html, result, 0):
- self.log.error("Error exporting results to %(file)s" % {'file':html})
- style.freeStylesheet()
- doc.freeDoc()
- result.freeDoc()
- return False
-
- style.freeStylesheet()
- doc.freeDoc()
- result.freeDoc()
-
- return True
-
-def evaluate_oval(sess, f_results, verbose=False):
+def evaluate_oval(sess, verbose=True):
usr = {'false':0,
'true':0,
@@ -320,7 +293,8 @@ def evaluate_oval(sess, f_results, verbose=False):
ret = oscap.oval_agent_eval_system_py(sess, oval_callback, usr)
- print "Evaluation Completed"
+ if verbose:
+ print "Evaluation Completed"
if ret == -1:
if oscap.oscap_err():
@@ -328,8 +302,6 @@ def evaluate_oval(sess, f_results, verbose=False):
'desc':oscap.oscap_err_desc()})
        return (None, None)
- res_model = oscap.oval_agent_get_results_model(sess)
-
print "--Results--\n" \
"True:\t\t%(true)s\n" \
"False:\t\t%(false)s\n" \
@@ -338,7 +310,32 @@ def evaluate_oval(sess, f_results, verbose=False):
"Not Evaluated:\t%(neval)s\n" \
"Not Applicable:\t%(na)s\n" % usr
- if f_results:
+ res_model = oscap.oval_agent_get_results_model(sess)
+ return (None, res_model)
+
+def export_results(results_dir, id, benchmark=None, res_model=None, xccdf_ss=None, oval_ss=None, html_dir=None):
+ if not os.path.isdir(results_dir):
+ try:
+ os.makedirs(results_dir)
+        except OSError, e:
+            sys.stderr.write("Could not create results directory: %(dir)s\n" % {'dir':results_dir})
+
+ if html_dir != None:
+ if not os.path.isdir(html_dir):
+ try:
+ os.makedirs(html_dir)
+            except OSError, e:
+                sys.stderr.write("Could not create HTML directory: %(dir)s\n" % {'dir':html_dir})
+
+ if benchmark != None:
+ xccdf_xml = os.path.join(results_dir, id + ".xccdf.xml")
+ oscap.xccdf_benchmark_export(benchmark, xccdf_xml)
+
+ if xccdf_ss != None:
+ result_to_html(xccdf_xml, xccdf_ss, os.path.join(html_dir, id + ".xccdf.html"))
+
+ if res_model != None:
+        oval_xml = os.path.join(results_dir, id + ".oval.xml")
res_direct = oscap.oval_result_directives_new(res_model)
oscap.oval_result_directives_set_reported(res_direct, oscap.OVAL_RESULT_TRUE |
oscap.OVAL_RESULT_FALSE |
@@ -349,10 +346,25 @@ def evaluate_oval(sess, f_results, verbose=False):
oscap.oval_result_directives_set_content(res_direct, oscap.OVAL_RESULT_FALSE, oscap.OVAL_DIRECTIVE_CONTENT_FULL)
oscap.oval_result_directives_set_content(res_direct, oscap.OVAL_RESULT_TRUE, oscap.OVAL_DIRECTIVE_CONTENT_FULL)
- output_oval_results(res_model, res_direct, f_xml, f_html)
- oscap.oval_result_directives_free(res_direct)
+ oscap.oval_results_model_export(res_model, res_direct, oval_xml)
+
+ # Create html results
+ if oval_ss != None:
+ result_to_html(oval_xml, oval_ss, os.path.join(html_dir, id + ".oval.html"))
+
+ return True
- return res_model
+def result_to_html(input, stylesheet, output):
+ styledoc = libxml2.parseFile(stylesheet)
+ style = libxslt.parseStylesheetDoc(styledoc)
+ doc = libxml2.parseFile(input)
+ result = style.applyStylesheet(doc, None)
+ if not style.saveResultToFilename(output, result, 0):
+        sys.stderr.write("Error exporting results to %(file)s\n" % {'file':output})
+
+ style.freeStylesheet()
+ doc.freeDoc()
+ result.freeDoc()
def is_benchmark(benchmark):
tree = xml.dom.minidom.parse(benchmark)
--
1.7.1.1
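For reference, result_to_html above follows the standard libxml2/libxslt
binding pattern, so the same transform can be run standalone. A minimal
sketch, with illustrative stylesheet and file names:

    import libxml2
    import libxslt

    # Parse the XSL stylesheet and the XML results document
    styledoc = libxml2.parseFile("/etc/secstate/results_to_html.xsl")
    style = libxslt.parseStylesheetDoc(styledoc)
    doc = libxml2.parseFile("results.oval.xml")

    # Apply the transform and write out the HTML report
    result = style.applyStylesheet(doc, None)
    style.saveResultToFilename("results.oval.html", result, 0)

    # The bindings wrap C objects, so free them explicitly
    style.freeStylesheet()
    doc.freeDoc()
    result.freeDoc()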
[PATCH] Updated code to improve OVAL support
by Josh Adams
Updates many commands to work better with OVAL files. Import will now
import plain OVAL files to /var/lib/secstate/oval, and OVAL files can be
selected and deselected.
Also fixes a couple of issues with the audit command when no profile is
given.
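A rough sketch of the intended workflow (the select subcommand spelling is
an assumption; the file name is illustrative, and its content id is the
file name minus the .xml extension):

    secstate import my-checks.oval.xml
    secstate select my-checks.oval
    secstate audit my-checks.oval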
---
src/bin/secstate | 6 +-
src/secstate/main.py | 289 +++++++++++++++++++++++++++++---------------------
src/secstate/util.py | 36 +++---
3 files changed, 191 insertions(+), 140 deletions(-)
diff --git a/src/bin/secstate b/src/bin/secstate
index e786fd7..3dce12c 100644
--- a/src/bin/secstate
+++ b/src/bin/secstate
@@ -111,8 +111,8 @@ def import_content(arguments):
(options, args) = parser.parse_args(arguments)
for arg in args:
- (benchmark, def_model) = sec_instance.import_content(arg, sec_instance.config.get('secstate', 'data_dir'), options.cpe, options.puppet)
- if (benchmark == None) or (def_model == None):
+ (benchmark, def_model) = sec_instance.import_content(arg, options.cpe, options.puppet)
+ if (benchmark == None) and (def_model == None):
return -1
oscap.oval_definition_model_free(def_model)
oscap.xccdf_benchmark_free(benchmark)
@@ -152,7 +152,7 @@ def audit(arguments):
default=sec_instance.config.get('secstate', 'oval_interpreter'),
help="Specifies the interpreter to use when auditing the system")
parser.add_option('-p', '--profile', action='store', type='string', dest='profile',
- default="Custom", help="Specifies the profile to use when auditing the system")
+ default=None, help="Specifies the profile to use when auditing the system")
parser.add_option('-r', '--results', action='store', dest='results', default=False,
help="Export results to the specified file")
parser.add_option('-v', '--verbose', action='store_true', dest='verbose', default=False,
diff --git a/src/secstate/main.py b/src/secstate/main.py
index 5642e8d..bb6e6d6 100644
--- a/src/secstate/main.py
+++ b/src/secstate/main.py
@@ -40,6 +40,7 @@ class Secstate:
def __init__(self, conf_file):
self.setConfigFile(conf_file)
self.content = self.get_content_dict()
+ self.content_configs = self.get_content_configs()
self.log = self.getLogger()
self.benchmark_dir = self.config.get('secstate', 'benchmark_dir')
@@ -92,6 +93,15 @@ class Secstate:
fp.close()
return content
+
+ def get_content_configs(self):
+ configs = {}
+ conf_dir = self.config.get('secstate', 'conf_dir')
+ for conf_file in os.listdir(conf_dir):
+ id = os.path.splitext(conf_file)[0]
+ configs[id] = os.path.join(conf_dir, conf_file)
+
+ return configs
def combine_def_models(self, target, source):
"""
@@ -155,24 +165,27 @@ class Secstate:
def_model = oscap.oval_definition_model_import(oval_file)
if def_model == None:
            self.log.error("Error importing OVAL content: %(file)s" % {'file':oval_file})
- return None
+ return (None, None)
if not oscap.oval_definition_model_is_valid(def_model):
self.log.error("Definition model is invalid")
oscap.oval_definition_model_free(def_model)
- return None
+ return (None, None)
if store_path:
shutil.copy(oval_file, store_path)
config = ConfigParser.ConfigParser()
- config.add_section('selected')
- config.set('selected', 'selected', True)
- conf_file = open(self.config.get('secstate', 'conf_dir') + os.path.splitext(os.path.basename(oval_file)), 'w')
+ config.optionxform = str
+ id = os.path.splitext(os.path.basename(oval_file))[0]
+ config.add_section(id)
+ config.set(id, 'selected', True)
+ config.set(id, 'file', os.path.join(store_path, oval_file))
+ conf_file = open(os.path.join(self.config.get('secstate', 'conf_dir'), id + ".cfg"), 'w')
config.write(conf_file)
conf_file.close()
- return None
+ return (None, None)
- return def_model
+ return (None, def_model)
def import_benchmark(self, benchmark_file, oval_path="", store_path=None):
@@ -229,7 +242,7 @@ class Secstate:
shutil.copy(benchmark_file, directory)
config = ConfigParser.ConfigParser()
config.add_section(id)
- config.set(id, 'file', os.path.basename(benchmark_file))
+ config.set(id, 'file', os.path.join(directory, os.path.basename(benchmark_file)))
config.set(id, 'selected', True)
conf_file = open(os.path.join(self.config.get('secstate', 'conf_dir'), id + ".cfg"), 'w')
config.write(conf_file)
@@ -309,7 +322,7 @@ class Secstate:
return (benchmark, def_model)
- def import_content(self, content, store_path=None, cpe=False, puppet=False, changes=True):
+ def import_content(self, content, cpe=False, puppet=False, changes=True, save=True):
"""
Function: Validates XCCDF/OVAL content and optionally saves it to the data store
Input: File containing content
@@ -317,12 +330,13 @@ class Secstate:
"""
xccdf = False
oval = False
+ store_path = None
if self.content.has_key(content):
- (benchmark, oval) = self.import_benchmark(os.path.join(self.benchmark_dir, content, self.content[content]), oval_path=os.path.join(self.benchmark_dir, content))
+ (benchmark, oval) = self.import_content(os.path.join(self.benchmark_dir, content, self.content[content]), save=False)
if changes and (benchmark != None):
#benchmark = apply_changes(benchmark, os.path.join(self.benchmark_dir, content, str(content + ".cfg")))
- benchmark = apply_changes_profile(benchmark, os.path.join(self.config.get('secstate', 'conf_dir'), content + ".cfg"))
+ benchmark = apply_changes_profile(benchmark, self.content_configs[content])
return (benchmark, oval)
@@ -330,17 +344,21 @@ class Secstate:
if file_type[0] == "text/xml":
if is_benchmark(content):
xccdf = True
+ if save:
+ store_path = self.config.get('secstate', 'benchmark_dir')
else:
oval = True
+ if save:
+ store_path = self.config.get('secstate', 'oval_dir')
if oval:
- return self.import_oval(content, self.config.get('secstate', 'oval_dir'))
+ return self.import_oval(content, store_path)
if xccdf:
return self.import_benchmark(content, store_path=store_path, oval_path=os.path.dirname(content))
else:
- return self.import_zipped_content(content, file_type, store_path, puppet)
+ return self.import_zipped_content(content, file_type, puppet)
def export(self, benchmark_id, new_file, original=False):
if not self.content.has_key(benchmark_id):
@@ -403,7 +421,7 @@ class Secstate:
sel_dict = {'selected':selected, 'message':message}
bench_cfg = ConfigParser.ConfigParser()
bench_cfg.optionxform = str
- conf_path = os.path.join(self.config.get('secstate', 'conf_dir'), benchmark_id + ".cfg")
+ conf_path = self.content_configs[benchmark_id]
if os.path.isfile(conf_path):
try:
fp = open(conf_path)
@@ -421,45 +439,51 @@ class Secstate:
return False
(benchmark, oval) = self.import_content(benchmark_id)
- oscap.oval_definition_model_free(oval)
if benchmark == None:
- self.log.error("Error opening benchmark: %(file)s" % {'file':benchmark_id})
- return False
-
- if item_id == benchmark_id:
- bench_cfg.set(benchmark_id, 'selected', selected)
- self.log.debug("Setting %(id)s to %(val)s" % {'id':benchmark_id,
- 'val':selected})
- item = oscap.xccdf_benchmark_to_item(benchmark)
+ if oval == None:
+ self.log.error("Error opening benchmark: %(file)s" % {'file':benchmark_id})
+ return False
+ else:
+ bench_cfg.set(benchmark_id, 'selected', selected)
+ self.log.debug("Set Oval file %(file)s to %(sel)s" % {'file':benchmark_id,
+ 'sel':selected})
else:
- item = oscap.xccdf_benchmark_get_item(benchmark, item_id)
-
- if item == None:
- self.log.error("Benchmark %(bench_id)s does not contain %(item_id)s" % {'bench_id':benchmark_id,
- 'item_id':item_id})
- return False
+ oscap.oval_definition_model_free(oval)
- item_type = oscap.xccdf_item_get_type(item)
- if item_type == oscap.XCCDF_PROFILE:
- bench_cfg.set(benchmark_id, 'profile', item_id)
- self.log.debug("Setting active profile to %(id)s" % {'id':item_id})
- else:
- if bench_cfg.has_option(benchmark_id, 'profile'):
- active_profile = bench_cfg.get(benchmark_id, 'profile')
- if active_profile != "Custom":
- bench_cfg.set(benchmark_id, 'extends', active_profile)
+ if item_id == benchmark_id:
+ bench_cfg.set(benchmark_id, 'selected', selected)
+ self.log.debug("Setting %(id)s to %(val)s" % {'id':benchmark_id,
+ 'val':selected})
+ item = oscap.xccdf_benchmark_to_item(benchmark)
+ else:
+ item = oscap.xccdf_benchmark_get_item(benchmark, item_id)
- if item_type != oscap.XCCDF_BENCHMARK:
- bench_cfg.set('selections', item_id, json.dumps(sel_dict))
- self.log.debug("Setting %(id)s to %(val)s" % {'id':item_id,
- 'val':selected})
+ if item == None:
+ self.log.error("Benchmark %(bench_id)s does not contain %(item_id)s" % {'bench_id':benchmark_id,
+ 'item_id':item_id})
+ return False
- if recurse:
- if (item_type == oscap.XCCDF_GROUP) or (item_type == oscap.XCCDF_BENCHMARK):
- for sub in xccdf_get_items(benchmark, oscap.XCCDF_ITEM, oscap.xccdf_item_get_content(item)):
- bench_cfg.set('selections', oscap.xccdf_item_get_id(sub), json.dumps(sel_dict))
- self.log.debug("Setting %(id)s to %(val)s" % {'id':oscap.xccdf_item_get_id(sub),
- 'val':selected})
+ item_type = oscap.xccdf_item_get_type(item)
+ if item_type == oscap.XCCDF_PROFILE:
+ bench_cfg.set(benchmark_id, 'profile', item_id)
+ self.log.debug("Setting active profile to %(id)s" % {'id':item_id})
+ else:
+ if bench_cfg.has_option(benchmark_id, 'profile'):
+ active_profile = bench_cfg.get(benchmark_id, 'profile')
+ if active_profile != "Custom":
+ bench_cfg.set(benchmark_id, 'extends', active_profile)
+
+ if item_type != oscap.XCCDF_BENCHMARK:
+ bench_cfg.set('selections', item_id, json.dumps(sel_dict))
+ self.log.debug("Setting %(id)s to %(val)s" % {'id':item_id,
+ 'val':selected})
+
+ if recurse:
+ if (item_type == oscap.XCCDF_GROUP) or (item_type == oscap.XCCDF_BENCHMARK):
+ for sub in xccdf_get_items(benchmark, oscap.XCCDF_ITEM, oscap.xccdf_item_get_content(item)):
+ bench_cfg.set('selections', oscap.xccdf_item_get_id(sub), json.dumps(sel_dict))
+ self.log.debug("Setting %(id)s to %(val)s" % {'id':oscap.xccdf_item_get_id(sub),
+ 'val':selected})
fp = open(conf_path, 'w')
bench_cfg.write(fp)
@@ -476,32 +500,47 @@ class Secstate:
"""
def_model = None
benchmark = None
+ config = None
for arg in args:
(benchmark, def_model) = self.import_content(arg)
- if (benchmark == None) or (def_model == None):
+ if (benchmark == None) and (def_model == None):
self.log.error("Error importing benchmark: %(bench)s" % {'bench':arg})
return False
- if interpreter == "openscap":
- sess = oscap.oval_agent_new_session(def_model)
-
+ else:
if self.content.has_key(arg):
- if profile == None:
- config = ConfigParser.ConfigParser()
- fp = open(self.content[arg][1])
- config.readfp(fp)
- profile = config.get(arg, 'profile')
- fp.close()
+ config = ConfigParser.ConfigParser()
+ fp = open(self.content_configs[arg])
+ config.readfp(fp)
+ fp.close()
+ if not all and (not config.getboolean(arg, 'selected')) and (len(args) > 1):
+ print "Skipping %(id)s" % {'id':arg}
+ ret = True
+ continue
+
+ if interpreter == "openscap":
+ sess = oscap.oval_agent_new_session(def_model)
+
+ if benchmark != None:
+ # Set profile to default found in config file
+ if (profile == None) and (config != None):
+ if config.has_option(arg, 'profile'):
+ profile = config.get(arg, 'profile')
+
+                    if profile != None:
+ if oscap.xccdf_benchmark_get_item(benchmark, profile) == None:
+ self.log.error("Profile %(prof)s does not exist." % {'prof':profile})
+ return False
+
+ ret = (evaluate_xccdf(benchmark, def_model, arg, sess, s_profile=profile, verbose=verbose) == 0)
- if oscap.xccdf_benchmark_get_item(benchmark, profile) == None:
- self.log.error("Profile %(prof)s does not exist." % {'prof':profile})
- return False
- ret = (evaluate_xccdf(benchmark, def_model, arg, sess, s_profile=profile, verbose=verbose, all=all) == 0)
+ elif def_model != None:
+ ret = (evaluate_oval(sess, f_results, verbose) == 0)
- if ret == False:
- self.log.error("Error auditing %(arg)s" % {'arg':arg})
- return ret
+ if ret == False:
+ self.log.error("Error auditing %(arg)s" % {'arg':arg})
+ return ret
def search(self, search_string, verbose=False):
"""
@@ -621,63 +660,74 @@ class Secstate:
return True
def sublist(self, benchmark, bench_cfg, def_model, arg, recurse, show_all, tabs=0):
- benchmark_id = oscap.xccdf_benchmark_get_id(benchmark)
tabstr = "\t" * tabs
selected = ""
- is_selected = False
- item = None
- if arg == benchmark_id:
- item = oscap.xccdf_benchmark_to_item(benchmark)
- is_selected = bench_cfg.getboolean(arg, 'selected')
- else:
- item = oscap.xccdf_benchmark_get_item(benchmark, arg)
- if item == None:
- defn = oscap.oval_definition_model_get_definition(def_model, arg)
- if defn == None:
- self.log.error("No item '%(id)s'" % {'id':arg})
- return
- print "%(indent)sDefinition - ID: %(id)s, Title: '%(title)s'" % {'indent':tabstr, 'id':arg,
- 'title':oscap.oval_definition_get_title(defn)}
- is_selected = oscap.xccdf_item_get_selected(item)
-
- selects = {}
- if bench_cfg.has_option(benchmark_id, 'profile'):
- prof = oscap.xccdf_benchmark_get_item(benchmark, bench_cfg.get(benchmark_id, "profile"))
- if prof == None:
- self.log.error("Error loading profile %(prof)s" % {'prof':profile})
- return False
- prof = oscap.xccdf_item_to_profile(prof)
- select_it = oscap.xccdf_profile_get_selects(prof)
- for select in xccdf_select_generator(select_it):
- selects[oscap.xccdf_select_get_item(select)] = oscap.xccdf_select_get_selected(select)
-
- try:
- is_selected = selects[oscap.xccdf_item_get_id(item)]
- except KeyError, e:
- pass
-
- titles = oscap.xccdf_item_get_title(item)
- for title in oscap_text_generator(titles):
- if show_all:
- if is_selected:
- selected = "[X]"
+ if benchmark == None:
+ if self.content.has_key(arg):
+ if bench_cfg.getboolean(arg, 'selected'):
+ if show_all:
+ selected = "[X]"
else:
selected = "[ ]"
-
- if not is_selected:
- if not recurse or (tabs == 0):
- selected = "[ ]"
+ print "%(indent)s%(sel)sOVAL File - ID: %(id)s" % {'indent':tabstr, 'sel':selected, 'id':arg}
+ else:
+ defn = oscap.oval_definition_model_get_definition(def_model, arg)
+ if defn != None:
+ print "%(indent)sDefinition - ID: %(id)s, Title: '%(title)s'" % {'indent':tabstr, 'id':arg,
+ 'title':oscap.oval_definition_get_title(defn)}
+ else:
+ benchmark_id = oscap.xccdf_benchmark_get_id(benchmark)
+ is_selected = False
+ item = None
+ if arg == benchmark_id:
+ item = oscap.xccdf_benchmark_to_item(benchmark)
+ is_selected = bench_cfg.getboolean(arg, 'selected')
+ else:
+ item = oscap.xccdf_benchmark_get_item(benchmark, arg)
+            if item == None:
+                self.sublist(None, bench_cfg, def_model, arg, recurse, show_all, tabs)
+                return
- print "%(indent)s%(sel)s%(type)s - ID: %(id)s, Title: '%(title)s'" % {'indent':tabstr, 'sel':selected,
- 'type':item_get_type_str(item), 'id':arg,
- 'title':oscap.oscap_text_get_text(title)}
- if recurse:
- type = oscap.xccdf_item_get_type(item)
- if (type == oscap.XCCDF_GROUP) or (type == oscap.XCCDF_BENCHMARK):
- content = oscap.xccdf_item_get_content(item)
- for sub in xccdf_item_generator(content):
- self.sublist(benchmark, bench_cfg, def_model, oscap.xccdf_item_get_id(sub), recurse, show_all, tabs+1)
+ is_selected = oscap.xccdf_item_get_selected(item)
+
+ selects = {}
+ if bench_cfg.has_option(benchmark_id, 'profile'):
+ prof = oscap.xccdf_benchmark_get_item(benchmark, bench_cfg.get(benchmark_id, "profile"))
+ if prof == None:
+                self.log.error("Error loading profile %(prof)s" % {'prof':bench_cfg.get(benchmark_id, "profile")})
+ return False
+ prof = oscap.xccdf_item_to_profile(prof)
+
+ select_it = oscap.xccdf_profile_get_selects(prof)
+ for select in xccdf_select_generator(select_it):
+ selects[oscap.xccdf_select_get_item(select)] = oscap.xccdf_select_get_selected(select)
+
+ try:
+ is_selected = selects[oscap.xccdf_item_get_id(item)]
+ except KeyError, e:
+ pass
+
+ titles = oscap.xccdf_item_get_title(item)
+ for title in oscap_text_generator(titles):
+ if show_all:
+ if is_selected:
+ selected = "[X]"
+ else:
+ selected = "[ ]"
+
+ if not is_selected:
+ if not recurse or (tabs == 0):
+ selected = "[ ]"
+
+ print "%(indent)s%(sel)s%(type)s - ID: %(id)s, Title: '%(title)s'" % {'indent':tabstr, 'sel':selected,
+ 'type':item_get_type_str(item), 'id':arg,
+ 'title':oscap.oscap_text_get_text(title)}
+ if recurse:
+ type = oscap.xccdf_item_get_type(item)
+ if (type == oscap.XCCDF_GROUP) or (type == oscap.XCCDF_BENCHMARK):
+ content = oscap.xccdf_item_get_content(item)
+ for sub in xccdf_item_generator(content):
+ self.sublist(benchmark, bench_cfg, def_model, oscap.xccdf_item_get_id(sub), recurse, show_all, tabs+1)
def list_content(self, arg=None, recurse=False, show_all=False):
@@ -685,11 +735,12 @@ class Secstate:
for key in self.content:
(benchmark, def_model) = self.import_content(key)
if benchmark == None:
- self.log.error("Error loading benchmark: %(id)s" % {'id':key})
- return False
+ if def_model == None:
+ self.log.error("Error loading benchmark: %(id)s" % {'id':key})
+ return False
config = ConfigParser.ConfigParser()
- fp = open(os.path.join(self.config.get('secstate', 'conf_dir'), key + ".cfg"))
+ fp = open(self.content_configs[key])
config.readfp(fp)
fp.close()
diff --git a/src/secstate/util.py b/src/secstate/util.py
index d1a16df..aaec22a 100644
--- a/src/secstate/util.py
+++ b/src/secstate/util.py
@@ -26,6 +26,7 @@ import time
import re
import ConfigParser
import json
+import tempfile
import openscap as oscap
@@ -182,7 +183,7 @@ def output_callback(id, result, usr):
print "Rule '%(id)s' result: %(res)s" % {'id':id, 'res':oscap.xccdf_test_result_type_get_text(result)}
return 0
-def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, all=False, verbose=False):
+def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, verbose=False):
policy = None
policy_model = oscap.xccdf_policy_model_new(benchmark)
@@ -214,10 +215,7 @@ def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, all=Fa
oscap.oval_agent_cb_data_set_callback_py(usr, None, None)
oscap.oval_agent_cb_data_set_usr(usr, policy_model)
- if all:
- callback_func = oscap.oval_agent_eval_rule_py
- else:
- callback_func = xccdf_callback
+ callback_func = oscap.oval_agent_eval_rule_py
oscap.xccdf_policy_model_register_output_callback_py(policy_model, output_callback, res_dict)
oscap.xccdf_policy_model_register_callback_py(policy_model, "http://oval.mitre.org/XMLSchema/oval-definitions-5",
@@ -261,15 +259,17 @@ def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, all=Fa
def oval_callback(id, result, usr):
if result == oscap.OVAL_RESULT_TRUE:
- usr['True'] += 1
+ usr['true'] += 1
elif result == oscap.OVAL_RESULT_FALSE:
- usr['False'] += 1
+ usr['false'] += 1
+ elif result == oscap.OVAL_RESULT_ERROR:
+ usr['err'] += 1
elif result == oscap.OVAL_RESULT_UNKNOWN:
- usr['Unknown'] += 1
+ usr['unknown'] += 1
elif result == oscap.OVAL_RESULT_NOT_EVALUATED:
- usr['Not Evaluated'] += 1
+ usr['neval'] += 1
elif result == oscap.OVAL_RESULT_NOT_APPLICABLE:
- usr['Not Applicable'] += 1
+ usr['na'] += 1
if usr['verbose']:
        print "Definition '%(id)s' result: %(res)s" % {'id':id, 'res':oscap.oval_result_get_text(result)}
@@ -308,12 +308,11 @@ def output_oval_results(res_model, res_directives, xml, html):
return True
-def evaluate_oval(def_model, sess, f_xml, f_html, verbose=True):
- res_model = oscap.oval_agent_get_results_model(sess)
+def evaluate_oval(sess, f_results, verbose=False):
usr = {'false':0,
'true':0,
- 'invalid':0,
+ 'err':0,
'unknown':0,
'neval':0,
'na':0,
@@ -327,17 +326,19 @@ def evaluate_oval(def_model, sess, f_xml, f_html, verbose=True):
if oscap.oscap_err():
sys.stderr.write("Error: (%(code)d) %(desc)s" % {'code':oscap.oscap_err_code(),
'desc':oscap.oscap_err_desc()})
- return False
+ return None
+
+ res_model = oscap.oval_agent_get_results_model(sess)
print "--Results--\n" \
"True:\t\t%(true)s\n" \
"False:\t\t%(false)s\n" \
- "Invalid:\t\t%(invalid)s\n" \
+ "Error:\t\t%(err)s\n" \
"Unknown:\t\t%(unknown)s\n" \
"Not Evaluated:\t%(neval)s\n" \
"Not Applicable:\t%(na)s\n" % usr
- if f_xml or f_html:
+ if f_results:
res_direct = oscap.oval_result_directives_new(res_model)
oscap.oval_result_directives_set_reported(res_direct, oscap.OVAL_RESULT_TRUE |
oscap.OVAL_RESULT_FALSE |
@@ -351,8 +352,7 @@ def evaluate_oval(def_model, sess, f_xml, f_html, verbose=True):
output_oval_results(res_model, res_direct, f_xml, f_html)
oscap.oval_result_directives_free(res_direct)
- oscap.oval_agent_destroy_session(sess)
- return ret
+ return res_model
def is_benchmark(benchmark):
tree = xml.dom.minidom.parse(benchmark)
--
1.7.1.1
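For reference, the per-content config that import writes to conf_dir looks
roughly like this (id and path are illustrative, reflecting the
/var/lib/secstate/oval store mentioned above):

    [my-checks.oval]
    selected = True
    file = /var/lib/secstate/oval/my-checks.oval.xml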
[PATCH] Fixes the audit command to work with upstream
by Josh Adams
The Python bindings finally got fixed upstream, so we can audit again.
Also fixes an issue when combining definition models: previously only the
definition itself was cloned, not the tests, states, objects, or variables
it referenced.
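Since combine_def_models now walks definitions, tests, objects, states, and
variables, merging two OVAL models is a single call. A minimal sketch,
assuming secstate is importable as a package and the config lives at
/etc/secstate/secstate.conf (file names are illustrative):

    import openscap as oscap
    from secstate.main import Secstate

    sec = Secstate("/etc/secstate/secstate.conf")
    target = oscap.oval_definition_model_import("base-oval.xml")
    source = oscap.oval_definition_model_import("extra-oval.xml")
    if sec.combine_def_models(target, source):
        # target now holds the source definitions plus every test,
        # object, state, and variable they reference
        oscap.oval_definition_model_export(target, "combined-oval.xml")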
---
src/secstate/main.py | 79 +++++++++++++++++++++++++-------
src/secstate/util.py | 123 ++++++++++++++++++++++---------------------------
2 files changed, 116 insertions(+), 86 deletions(-)
diff --git a/src/secstate/main.py b/src/secstate/main.py
index 3ef48eb..1a5b6f4 100644
--- a/src/secstate/main.py
+++ b/src/secstate/main.py
@@ -94,6 +94,65 @@ class Secstate:
else:
return {}
+ def combine_def_models(self, target, source):
+ """
+ Function: Add all the definitions from the source model to the target model
+        Input: Two oval_definition_models
+ Output: Success or failure of combination
+ """
+ definitions = oscap.oval_definition_model_get_definitions(source)
+ for defn in oval_definition_generator(definitions):
+ if oscap.oval_definition_model_get_definition(target, oscap.oval_definition_get_id(defn)) == None:
+ new_def = oscap.oval_definition_clone(target, defn)
+ if new_def == None:
+ self.log.error("Error adding definition %(name)s to target model" % {'name':oscap.oval_definition_get_id(defn)})
+ return False
+ else:
+                    self.log.debug("Successfully added %(name)s to target model" % {'name':oscap.oval_definition_get_id(defn)})
+
+ tests = oscap.oval_definition_model_get_tests(source)
+ for defn in oval_test_generator(tests):
+ if oscap.oval_definition_model_get_test(target, oscap.oval_test_get_id(defn)) == None:
+ new_def = oscap.oval_test_clone(target, defn)
+ if new_def == None:
+ self.log.error("Error adding test %(name)s to target model" % {'name':oscap.oval_test_get_id(defn)})
+ return False
+ else:
+                    self.log.debug("Successfully added %(name)s to target model" % {'name':oscap.oval_test_get_id(defn)})
+
+ objects = oscap.oval_definition_model_get_objects(source)
+ for defn in oval_object_generator(objects):
+ if oscap.oval_definition_model_get_object(target, oscap.oval_object_get_id(defn)) == None:
+ new_def = oscap.oval_object_clone(target, defn)
+ if new_def == None:
+ self.log.error("Error adding object %(name)s to target model" % {'name':oscap.oval_object_get_id(defn)})
+ return False
+ else:
+                    self.log.debug("Successfully added %(name)s to target model" % {'name':oscap.oval_object_get_id(defn)})
+
+ states = oscap.oval_definition_model_get_states(source)
+ for defn in oval_state_generator(states):
+ if oscap.oval_definition_model_get_state(target, oscap.oval_state_get_id(defn)) == None:
+ new_def = oscap.oval_state_clone(target, defn)
+ if new_def == None:
+ self.log.error("Error adding state %(name)s to target model" % {'name':oscap.oval_state_get_id(defn)})
+ return False
+ else:
+                    self.log.debug("Successfully added %(name)s to target model" % {'name':oscap.oval_state_get_id(defn)})
+
+ variables = oscap.oval_definition_model_get_variables(source)
+ for defn in oval_variable_generator(variables):
+ if oscap.oval_definition_model_get_variable(target, oscap.oval_variable_get_id(defn)) == None:
+ new_def = oscap.oval_variable_clone(target, defn)
+ if new_def == None:
+ self.log.error("Error adding variable %(name)s to target model" % {'name':oscap.oval_variable_get_id(defn)})
+ return False
+ else:
+                    self.log.debug("Successfully added %(name)s to target model" % {'name':oscap.oval_variable_get_id(defn)})
+
+ return True
+
def import_benchmark(self, benchmark_file, oval_path="", store_path=None):
"""
Function: Imports an XCCDF benchmark
@@ -394,23 +453,6 @@ class Secstate:
oscap.xccdf_benchmark_free(benchmark)
return True
- def combine_def_models(self, target, source):
- """
- Function: Add all the definitions from the source model to the target model
- Input: Two oval_definition_model's
- Output: Success or failure of combination
- """
- definitions = oscap.oval_definition_model_get_definitions(source)
- for defn in oval_definition_generator(definitions):
- if oscap.oval_definition_model_get_definition(target, oscap.oval_definition_get_id(defn)) == None:
- new_def = oscap.oval_definition_clone(target, defn)
- if new_def == None:
- self.log.error("Error adding definition %(name)s to target model" % {'name':oscap.oval_definition_get_id(defn)})
- return False
- else:
- self.log.debug("Succesfully added %(name)s to target model" % {'name':oscap.oval_definition_get_id(defn)})
- return True
-
def audit(self, interpreter, args, oval_only, def_models=None, verbose=False, xml=None, html=None):
"""
        Function: Run an audit on the system against the given definition model
@@ -448,9 +490,10 @@ class Secstate:
self.log.error("Error importing benchmark: %(bench)s" % {'bench':arg})
return False
+
if interpreter == "openscap":
sess = oscap.oval_agent_new_session(def_model)
- return evaluate_xccdf(benchmark, def_model, benchmark_path, sess, f_xml=xml, f_html=html)
+ return evaluate_xccdf(benchmark, def_model, arg, sess, f_xml=xml, f_html=html)
def search(self, search_string, verbose=False):
"""
diff --git a/src/secstate/util.py b/src/secstate/util.py
index 3b27a79..c9784ce 100644
--- a/src/secstate/util.py
+++ b/src/secstate/util.py
@@ -60,7 +60,7 @@ xccdf_generator_list_types = ['xccdf_item', 'xccdf_notice', 'xccdf_status', 'xcc
'xccdf_fixtext', 'xccdf_check_content_ref', 'xccdf_check_import',
'xccdf_fix', 'xccdf_check_export', 'xccdf_warning', 'xccdf_instance',
'xccdf_message', 'xccdf_override', 'xccdf_rule_result', 'xccdf_score',
- 'xccdf_target_fact', 'xccdf_plain_text']
+ 'xccdf_target_fact', 'xccdf_plain_text', 'xccdf_model']
for type in xccdf_generator_list_types:
gen_func, list_func = iterator_to_generator_and_list('%s_iterator' % type)
@@ -177,6 +177,10 @@ def xccdf_callback(model, rule_id, id, bindings, usr):
return True
+def callback(id, result, arg):
+ print "Rule '%(id)s' result: %(res)s" % {'id':id, 'res':oscap.xccdf_test_result_type_get_text(result)}
+ return 0
+
def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, f_xml=None, f_html=None):
policy = None
policy_model = oscap.xccdf_policy_model_new(benchmark)
@@ -193,49 +197,53 @@ def evaluate_xccdf(benchmark, def_model, url_XCCDF, sess, s_profile=None, f_xml=
sys.stderr.write("No policy to evaluate.\n")
return -1
- usr = { 'result_id':"secstate_audit-test",
- 'asess':sess }
+ usr = oscap.oval_agent_cb_data_new()
+ oscap.oval_agent_cb_data_set_session(usr, sess)
+ oscap.oval_agent_cb_data_set_callback_py(usr, None, None)
+ oscap.oval_agent_cb_data_set_usr(usr, policy_model)
+
+ oscap.xccdf_policy_model_register_output_callback_py(policy_model, callback, None)
+ oscap.xccdf_policy_model_register_callback_py(policy_model, "http://oval.mitre.org/XMLSchema/oval-definitions-5",
+ oscap.oval_agent_eval_rule_py, usr)
- ritem = oscap.xccdf_result_new()
- oscap.xccdf_result_set_id(ritem, usr['result_id'])
+ ritem = oscap.xccdf_policy_evaluate(policy)
oscap.xccdf_result_set_benchmark_uri(ritem, url_XCCDF)
title = oscap.oscap_text_new()
oscap.oscap_text_set_text(title, "Secstate Audit Result")
oscap.xccdf_result_add_title(ritem, title)
oscap.xccdf_result_set_start_time(ritem, time.time())
- oscap.xccdf_policy_model_add_result(policy_model, ritem)
+ if policy != None:
+ id = oscap.xccdf_profile_get_id(oscap.xccdf_policy_get_profile(policy))
+ if id != None:
+ oscap.xccdf_result_set_profile(ritem, id)
+ oscap.oval_agent_export_sysinfo_to_xccdf_result(sess, ritem)
+
+ models = oscap.xccdf_benchmark_get_models(benchmark)
+ for model in xccdf_model_generator(models):
+ score = oscap.xccdf_policy_get_score(policy, ritem, oscap.xccdf_model_get_system(model))
+ oscap.xccdf_result_add_score(ritem, score)
- oscap.xccdf_policy_model_register_callback_py(policy_model, "http://oval.mitre.org/XMLSchema/oval-definitions-5",
- xccdf_callback, usr)
-
- oscap.xccdf_policy_evaluate(policy)
-
- res_model = oscap.oval_agent_get_results_model(usr['asess'])
- res_system = oscap.oval_result_system_iterator_next(oscap.oval_results_model_get_systems(res_model))
- sys_model = oscap.oval_result_system_get_syschar_model(res_system)
- sysinfo = oscap.oval_syschar_model_get_sysinfo(sys_model)
-
- oscap.xccdf_result_set_test_system(ritem, oscap.oval_sysinfo_get_primary_host_name(sysinfo))
- if (policy != None):
- profile = oscap.xccdf_policy_get_profile(policy)
- if oscap.xccdf_profile_get_id(profile) != None:
- oscap.xccdf_result_set_profile(ritem, oscap.xccdf_profile_get_id(profile))
-
- sysint_it = oscap.oval_sysinfo_get_interfaces(sysinfo)
- for sysint in oval_sysint_generator(sysint_it):
- oscap.xccdf_result_add_target_address(ritem, oscap.oval_sysint_get_ip_address(sysint))
- if oscap.oval_sysint_get_mac_address(sysint) != None:
- fact = oscap.xccdf_target_fact_new()
- oscap.xccdf_target_fact_set_name(fact, "urn:xccdf:fact:ethernet:MAC")
- oscap.xccdf_target_fact_set_string(fact, oscap.oval_sysint_get_mac_address(sysint))
-
oscap.xccdf_result_set_end_time(ritem, time.time())
- if f_xml != None:
- oscap.xccdf_result_export(ritem, f_xml)
+ #if f_xml != None:
+ # oscap.xccdf_result_export(ritem, f_xml)
+
+ oscap.oval_agent_cb_data_free(usr)
+ oscap.xccdf_policy_model_free(policy_model)
+ return 0
+
+def oval_callback(id, result, usr):
+ if result == oscap.OVAL_RESULT_TRUE:
+ usr['True'] += 1
+ elif result == oscap.OVAL_RESULT_FALSE:
+ usr['False'] += 1
+ elif result == oscap.OVAL_RESULT_UNKNOWN:
+ usr['Unknown'] += 1
+ elif result == oscap.OVAL_RESULT_NOT_EVALUATED:
+ usr['Not Evaluated'] += 1
+ elif result == oscap.OVAL_RESULT_NOT_APPLICABLE:
+ usr['Not Applicable'] += 1
- oscap.oval_agent_destroy_session(usr['asess'])
- #oscap.xccdf_policy_model_free(policy_model)
return 0
def output_oval_results(res_model, res_directives, xml, html):
@@ -270,34 +278,16 @@ def output_oval_results(res_model, res_directives, xml, html):
return True
-def oval_callback(id, result, usr):
- if usr['verbose']:
- print "Evaluated definition %(id)s: %(res)s" % {'id':id, 'res':oscap.oval_result_get_text(result)}
+def evaluate_oval(def_model, sess, f_xml, f_html, verbose=True):
+ res_model = oscap.oval_agent_get_results_model(sess)
- if result == oscap.OVAL_RESULT_TRUE:
- usr['true'] += 1
- elif result == oscap.OVAL_RESULT_FALSE:
- usr['false'] += 1
- if result == oscap.OVAL_RESULT_INVALID:
- usr['invalid'] += 1
- if result == oscap.OVAL_RESULT_UNKNOWN:
- usr['unknown'] += 1
- if result == oscap.OVAL_RESULT_NOT_EVALUATED:
- usr['neval'] += 1
- if result == oscap.OVAL_RESULT_NOT_APPLICABLE:
- usr['napp'] += 1
+ usr = {'False':0,
+ 'True':0,
+ 'Invalid':0,
+ 'Unknown':0,
+ 'Not Evaluated':0,
+ 'Not Applicable':0}
- return 0
-
-def evaluate_oval(def_model, sess, f_xml, f_html, verbose=False):
- res_model = oscap.oval_agent_get_results_model(sess)
- usr = {'verbose':verbose,
- 'true':0,
- 'false':0,
- 'invalid':0,
- 'unknown':0,
- 'neval':0,
- 'napp':0}
ret = oscap.oval_agent_eval_system_py(sess, oval_callback, usr)
if verbose:
@@ -309,14 +299,11 @@ def evaluate_oval(def_model, sess, f_xml, f_html, verbose=False):
'desc':oscap.oscap_err_desc()})
return False
- print "Results:"
- print "True:\t%d" % usr['true']
- print "False:\t%d" % usr['false']
- print "Invalid:\t%d" % usr['invalid']
- print "Unknown:\t%d" % usr['unknown']
- print "Not Evaluated:\t%d" % usr['neval']
- print "Not Applicable:\t%d" % usr['napp']
-
+ if verbose:
+ print "Results:"
+ for key,val in usr.items():
+ print "%(key)s:\t\t%(val)d" % {'key':key, 'val':val}
+
if f_xml or f_html:
res_direct = oscap.oval_result_directives_new(res_model)
oscap.oval_result_directives_set_reported(res_direct, oscap.OVAL_RESULT_INVALID |
--
1.7.0.1