summaryrefslogtreecommitdiff
path: root/tempest-pull/app
diff options
context:
space:
mode:
Diffstat (limited to 'tempest-pull/app')
-rw-r--r--tempest-pull/app/neo4j.py11
-rw-r--r--tempest-pull/app/subunitresults.py94
2 files changed, 51 insertions, 54 deletions
diff --git a/tempest-pull/app/neo4j.py b/tempest-pull/app/neo4j.py
index 2415d07..3a03d43 100644
--- a/tempest-pull/app/neo4j.py
+++ b/tempest-pull/app/neo4j.py
@@ -62,11 +62,14 @@ class Neo4JDatabase(object):
print "adding tests"
for test in test_set:
Test_node = Node("Test", test["status"], \
- name=test["test"], \
+ name=test["name"], \
+ full_name=test["fullname"], \
status=test["status"], \
- start_time=test["start_time"], \
- stop_time=test["stop_time"], \
- test_class=test["class"])
+ time=test["time"], \
+ test_class=test["class"], \
+ test_category=test["category"])
+ if test["status"] == "skip":
+ Test_node.properties["reason"] = test["reason"]
Test_relationship = Relationship(TempestRun_node, \
"HAS_TEST", Test_node, status=test["status"])
graph.create(Test_node, Test_relationship)
diff --git a/tempest-pull/app/subunitresults.py b/tempest-pull/app/subunitresults.py
index e2caf7e..91b56b8 100644
--- a/tempest-pull/app/subunitresults.py
+++ b/tempest-pull/app/subunitresults.py
@@ -1,16 +1,22 @@
-import csv
import subprocess
import string
import os
import StringIO
import re
import datetime
+import xmltodict
+PASS = "pass"
+FAIL = "fail"
+SKIP = "skip"
+NA = "n/a"
class SubunitResults(object):
def __init__(self, subunit_stream, all_tests_file):
self.subunit_stream = subunit_stream
self.all_tests_file = all_tests_file
+ self.data = { PASS:[], SKIP:[], FAIL:[], NA:[] }
+ self.parse_stream()
# clean up a test id
def clean_test_id(self, test_id):
@@ -18,68 +24,56 @@ class SubunitResults(object):
test_id = test_id[:test_id.index('[')]
return re.sub('[^-0-9A-Za-z_.]', '-', test_id)
- # extract a "class name" from a test id
- def get_class_name(self, test_id, class_depth):
+ # extract a "class major category" from a test id
+ def get_class_category(self, test_id, class_depth):
clean_name = self.clean_test_id(test_id)
words = clean_name.split(".")
return ".".join(words[1:class_depth])
- # convert subunit2csv data to json
- def csv_to_json(self, input_csv):
- f = StringIO.StringIO(input_csv)
- reader = csv.DictReader(f)
- result = []
- for row in reader:
- test_id = row["test"]
- # clean up the test name (get rid of the [uuid]) part
- row["test"] = self.clean_test_id(test_id)
- # add an entry for the class
- row["class"] = self.get_class_name(test_id, 3)
- result.append(row)
- return result
-
- def get_tests_run(self):
+ def parse_stream(self):
p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
- p2 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
+ p2 = subprocess.Popen(["subunit2junitxml"], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
output = p2.communicate()[0]
- return self.csv_to_json(output)
- def get_failing_tests(self):
- p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE)
- p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
- p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE)
- p1.stdout.close()
- p2.stdout.close()
- output = p3.communicate()[0]
- return self.csv_to_json(output)
+ temp_data = xmltodict.parse(output)
+ if "testsuite" in temp_data:
+ if "testcase" in temp_data["testsuite"]:
+ for testcase in temp_data["testsuite"]["testcase"]:
+ entry = self.create_entry(testcase)
+ self.data[entry["status"]].append(entry)
- def get_failing_tests_xml(self):
- p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE)
- p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
- p3 = subprocess.Popen(["subunit2junitxml"], stdin=p2.stdout, stdout=subprocess.PIPE)
- p1.stdout.close()
- p2.stdout.close()
- output = p3.communicate()[0]
- return output
+ def create_entry(self, e):
+ entry = {}
+ entry["fullname"] = e["@name"]
+ entry["name"] = self.clean_test_id(e["@name"])
+ entry["class"] = e["@classname"]
+ entry["category"] = self.get_class_category(entry["class"], 3)
+ entry["time"] = e["@time"]
+ entry["status"] = PASS
+ if "skip" in e:
+ entry["status"] = SKIP
+ entry["reason"] = e["skip"]
+ elif "failure" in e:
+ entry["status"] = FAIL
+ entry["reason"] = e["failure"]
+ return entry
+
+ def get_tests_run(self):
+ combined = []
+ combined.extend(self.data[FAIL])
+ combined.extend(self.data[SKIP])
+ combined.extend(self.data[PASS])
+ return combined
+
+ def get_failing_tests(self):
+ return self.data[FAIL]
def get_passing_tests(self):
- p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE)
- p2 = subprocess.Popen(["subunit-filter", "--no-skip", "--no-failure", "--success", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
- p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE)
- p1.stdout.close()
- p2.stdout.close()
- output = p3.communicate()[0]
- return self.csv_to_json(output)
+ return self.data[PASS]
def get_skipped_tests(self):
- p1 = subprocess.Popen(["cat", self.subunit_stream], stdout = subprocess.PIPE)
- p2 = subprocess.Popen(["subunit-filter", "--no-error", "--no-failure", "--no-success", "--no-xfail", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
- p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE)
- p1.stdout.close()
- p2.stdout.close()
- output = p3.communicate()[0]
- return self.csv_to_json(output)
+ return self.data[SKIP]
def get_all_tests(self):
with open(self.all_tests_file) as f: