diff options
Diffstat (limited to 'tempest-pull/app')
-rw-r--r-- | tempest-pull/app/neo4j.py | 11 | ||||
-rw-r--r-- | tempest-pull/app/subunitresults.py | 94 |
2 files changed, 51 insertions, 54 deletions
diff --git a/tempest-pull/app/neo4j.py b/tempest-pull/app/neo4j.py index 2415d07..3a03d43 100644 --- a/tempest-pull/app/neo4j.py +++ b/tempest-pull/app/neo4j.py @@ -62,11 +62,14 @@ class Neo4JDatabase(object): print "adding tests" for test in test_set: Test_node = Node("Test", test["status"], \ - name=test["test"], \ + name=test["name"], \ + full_name=test["fullname"], \ status=test["status"], \ - start_time=test["start_time"], \ - stop_time=test["stop_time"], \ - test_class=test["class"]) + time=test["time"], \ + test_class=test["class"], \ + test_category=test["category"]) + if test["status"] == "skip": + Test_node.properties["reason"] = test["reason"] Test_relationship = Relationship(TempestRun_node, \ "HAS_TEST", Test_node, status=test["status"]) graph.create(Test_node, Test_relationship) diff --git a/tempest-pull/app/subunitresults.py b/tempest-pull/app/subunitresults.py index e2caf7e..91b56b8 100644 --- a/tempest-pull/app/subunitresults.py +++ b/tempest-pull/app/subunitresults.py @@ -1,16 +1,22 @@ -import csv import subprocess import string import os import StringIO import re import datetime +import xmltodict +PASS = "pass" +FAIL = "fail" +SKIP = "skip" +NA = "n/a" class SubunitResults(object): def __init__(self, subunit_stream, all_tests_file): self.subunit_stream = subunit_stream self.all_tests_file = all_tests_file + self.data = { PASS:[], SKIP:[], FAIL:[], NA:[] } + self.parse_stream() # clean up a test id def clean_test_id(self, test_id): @@ -18,68 +24,56 @@ class SubunitResults(object): test_id = test_id[:test_id.index('[')] return re.sub('[^-0-9A-Za-z_.]', '-', test_id) - # extract a "class name" from a test id - def get_class_name(self, test_id, class_depth): + # extract a "class major category" from a test id + def get_class_category(self, test_id, class_depth): clean_name = self.clean_test_id(test_id) words = clean_name.split(".") return ".".join(words[1:class_depth]) - # convert subunit2csv data to json - def csv_to_json(self, input_csv): - f = StringIO.StringIO(input_csv) - reader = csv.DictReader(f) - result = [] - for row in reader: - test_id = row["test"] - # clean up the test name (get rid of the [uuid]) part - row["test"] = self.clean_test_id(test_id) - # add an entry for the class - row["class"] = self.get_class_name(test_id, 3) - result.append(row) - return result - - def get_tests_run(self): + def parse_stream(self): p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE) - p2 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE) + p2 = subprocess.Popen(["subunit2junitxml"], stdin=p1.stdout, stdout=subprocess.PIPE) p1.stdout.close() output = p2.communicate()[0] - return self.csv_to_json(output) - def get_failing_tests(self): - p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE) - p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE) - p1.stdout.close() - p2.stdout.close() - output = p3.communicate()[0] - return self.csv_to_json(output) + temp_data = xmltodict.parse(output) + if "testsuite" in temp_data: + if "testcase" in temp_data["testsuite"]: + for testcase in temp_data["testsuite"]["testcase"]: + entry = self.create_entry(testcase) + self.data[entry["status"]].append(entry) - def get_failing_tests_xml(self): - p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE) - p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["subunit2junitxml"], stdin=p2.stdout, stdout=subprocess.PIPE) - p1.stdout.close() - p2.stdout.close() - output = p3.communicate()[0] - return output + def create_entry(self, e): + entry = {} + entry["fullname"] = e["@name"] + entry["name"] = self.clean_test_id(e["@name"]) + entry["class"] = e["@classname"] + entry["category"] = self.get_class_category(entry["class"], 3) + entry["time"] = e["@time"] + entry["status"] = PASS + if "skip" in e: + entry["status"] = SKIP + entry["reason"] = e["skip"] + elif "failure" in e: + entry["status"] = FAIL + entry["reason"] = e["failure"] + return entry + + def get_tests_run(self): + combined = [] + combined.extend(self.data[FAIL]) + combined.extend(self.data[SKIP]) + combined.extend(self.data[PASS]) + return combined + + def get_failing_tests(self): + return self.data[FAIL] def get_passing_tests(self): - p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE) - p2 = subprocess.Popen(["subunit-filter", "--no-skip", "--no-failure", "--success", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE) - p1.stdout.close() - p2.stdout.close() - output = p3.communicate()[0] - return self.csv_to_json(output) + return self.data[PASS] def get_skipped_tests(self): - p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE) - p2 = subprocess.Popen(["subunit-filter", "--no-error", "--no-failure", "--no-success", "--no-xfail", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE) - p1.stdout.close() - p2.stdout.close() - output = p3.communicate()[0] - return self.csv_to_json(output) + return self.data[SKIP] def get_all_tests(self): with open(self.all_tests_file) as f: |