Diffstat (limited to 'tempest-pull/src')
-rw-r--r--  tempest-pull/src/__init__.py             0
-rw-r--r--  tempest-pull/src/bundle.py             208
-rw-r--r--  tempest-pull/src/lava.py                25
-rw-r--r--  tempest-pull/src/subunitresults.py      88
-rw-r--r--  tempest-pull/src/tempest-lava-pull.py  149
-rw-r--r--  tempest-pull/src/util.py                31
6 files changed, 501 insertions, 0 deletions
diff --git a/tempest-pull/src/__init__.py b/tempest-pull/src/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tempest-pull/src/__init__.py
diff --git a/tempest-pull/src/bundle.py b/tempest-pull/src/bundle.py
new file mode 100644
index 0000000..6a79004
--- /dev/null
+++ b/tempest-pull/src/bundle.py
@@ -0,0 +1,208 @@
+import base64
+import datetime
+import fnmatch
+import json
+import os
+import string
+import subprocess
+import xmlrpclib
+
+from util import *
+from subunitresults import SubunitResults
+
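+# artefact filenames that process_subunit() expects to find among the
+# whitelisted attachments of a test run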
+SUBUNIT_RESULTS_FILE = "results.subunit"
+ALL_TESTS_FILE = "all-tests.txt"
+
+class BundleStore(object):
+    def __init__(self, root_path):
+        self.root_path = root_path
+
+    def bundle_list(self):
+        dirs = []
+        root = os.path.expanduser(self.root_path)
+        for subdir in os.listdir(root):
+            if os.path.isdir(os.path.join(root, subdir)):
+                dirs.append(subdir)
+        return dirs
+
+    def is_bundle_present(self, bundle):
+        if not isinstance(bundle, Bundle):
+            raise Exception("argument is not a Bundle")
+        bundles = self.bundle_list()
+        return bundle.sha1 in bundles
+
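+    # writing a receipt (a directory named after the bundle's sha1, containing
+    # its metadata) marks the bundle as processed, so bundle_list() callers can
+    # skip it on later runs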
+    def write_bundle_receipt(self, bundle, include_data=False):
+        if not isinstance(bundle, Bundle):
+            raise Exception("argument is not a Bundle")
+        root = os.path.expanduser(self.root_path)
+        bundle_root = os.path.join(root, bundle.sha1)
+        create_dir(bundle_root)
+
+        full_meta_path = os.path.join(bundle_root, "metadata")
+        with open(full_meta_path, "w") as f:
+            f.write(json.dumps(bundle.entry_data, default=json_serial))
+        if include_data:
+            full_data_path = os.path.join(bundle_root, "data")
+            with open(full_data_path, "w") as f:
+                f.write(json.dumps(bundle.data, default=json_serial))
+
+class Bundle(object):
+    def __init__(self, sha1, entry_data, data):
+        self.metadata = None  # not populated until expand() completes
+        self.entry_data = entry_data
+        self.data = json.loads(data)
+        self.sha1 = sha1
+        self.upload_date = entry_data["uploaded_on"]
+        self.lava_job_id = entry_data["associated_job"]
+        self.subdir_whitelist = "logs"
+        self.subdir_unimportant = "xtra"
+
+        self.tests_run = None
+        self.all_tests = None
+        self.failing_tests = None
+        self.passing_tests = None
+        self.skipped_tests = None
+
+        # these are files that will be present in attachments for a test run that
+        # are not to be extracted
+        self.skip_files = [
+            "testdef.yaml",
+            "return_code",
+            "run.sh",
+            "lava-results.sh"
+        ]
+
+    #
+    # expand - process the bundle data, extracting and moving files to their
+    # desired locations
+    #
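+    # 'whitelist' maps a test_id to a list of {'src': <fnmatch pattern>,
+    # 'new-name': <optional new filename>} filters; attachments matching a
+    # filter are copied into the "logs" subdirectory, everything else is
+    # written under "xtra"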
+
+    def expand(self, whitelist, output_root):
+        # quit if there is not a valid LAVA job ID for it
+        if self.lava_job_id == "NA":
+            print "Invalid LAVA Job ID - skipping"
+            return
+
+        # attempt to retrieve attributes from the first test in the bundle
+        attributes = self.data["test_runs"][0]["attributes"]
+        if not attributes:
+            print "Invalid bundle data - no test attributes present"
+            return
+
+        # start building up the metadata to write out (and return) about the bundle
+        bundle_metadata = {}
+        bundle_metadata['bundle_sha1'] = self.sha1
+        bundle_metadata['lava_job_id'] = self.lava_job_id
+        bundle_metadata['date_uploaded'] = self.upload_date.strftime('%Y-%m-%dT%H:%M:%S')
+        bundle_metadata['lava_job_attributes'] = attributes
+
+        # create the names for the root directories
+        output_subdir = str(self.lava_job_id)
+        #if "os-distro" in attributes and "os-version" in attributes and "devstack-branch" in attributes:
+        #    output_subdir = "%s,os=%s,osver=%s,branch=%s" % \
+        #        (self.lava_job_id, attributes["os-distro"], attributes["os-version"], attributes["devstack-branch"])
+        full_root_path = os.path.join(os.path.expanduser(output_root), output_subdir)
+
+        whitelist_path = os.path.join(full_root_path, self.subdir_whitelist)
+        unimp_path = os.path.join(full_root_path, self.subdir_unimportant)
+        print "storing output here: { root = '%s', whitelist = '%s', unimportant = '%s' }" % \
+            (full_root_path, whitelist_path, unimp_path)
+
+        # create the root and top-level subdirectories
+        create_dir(full_root_path)
+        create_dir(whitelist_path)
+        create_dir(unimp_path)
+
+        # loop through all of the tests in the bundle
+        for test_run in self.data["test_runs"]:
+            test_id = test_run["test_id"]
+            print "processing test [%s]" % test_id
+
+            # create directories if necessary
+            test_root_path = os.path.join(unimp_path, test_id)
+            create_dir(test_root_path)
+
+            # see if there is a whitelist specified for the test
+            test_whitelist = None
+            if test_id in whitelist:
+                test_whitelist = whitelist[test_id]
+
+            # process attachments if there are any
+            if "attachments" in test_run:
+                for attachment in test_run["attachments"]:
+                    matching_whitelist_filter = None
+
+                    filename = attachment["pathname"]
+                    if filename not in self.skip_files:
+                        if test_whitelist:
+                            # see if the file matches one of the whitelist patterns
+                            for filter in test_whitelist:
+                                pattern = filter["src"]
+                                if fnmatch.fnmatch(filename, pattern):
+                                    matching_whitelist_filter = filter
+                                    break
+
+                        # build the full path to the output file, assuming it
+                        # has not been whitelisted
+                        full_file_path = os.path.join(test_root_path, filename)
+
+                        if matching_whitelist_filter:
+                            filename2 = filename
+                            # the file has been whitelisted -- see if it is supposed
+                            # to be renamed
+                            if 'new-name' in matching_whitelist_filter:
+                                filename2 = matching_whitelist_filter['new-name']
+                            # build the full path for the whitelisted file
+                            full_file_path = os.path.join(whitelist_path, filename2)
+
+                        # create the directory if necessary
+                        dir_name = os.path.dirname(full_file_path)
+                        create_dir(dir_name)
+
+                        # finally - write the attachment
+                        with open(full_file_path, "w") as f:
+                            decoded_data = base64.b64decode(attachment["content"])
+                            f.write(decoded_data)
+
+        # check for results.subunit in whitelisted files and process
+        self.process_subunit(whitelist_path)
+
+        # get final list of whitelisted files and add to the metadata
+        whitelist_file_list = get_recursive_file_list(whitelist_path)
+        bundle_metadata["file_list"] = whitelist_file_list
+
+        # write the metadata
+        with open(os.path.join(full_root_path, "metadata.json"), "w") as f:
+            json.dump(bundle_metadata, f)
+
+        # touch the directory with the original creation date
+        subprocess.check_output(["touch", "--date=%s" % self.upload_date, full_root_path])
+
+        self.metadata = bundle_metadata
+
+        return bundle_metadata
+
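+    #
+    # process_subunit - parse results.subunit (all-tests.txt must also be
+    # present) and write tests-*.json summaries alongside the whitelisted files
+    #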
+    def process_subunit(self, whitelist_path):
+        subunit_stream_path = os.path.join(whitelist_path, SUBUNIT_RESULTS_FILE)
+        all_tests_path = os.path.join(whitelist_path, ALL_TESTS_FILE)
+        if not os.path.exists(subunit_stream_path):
+            return None
+        if not os.path.exists(all_tests_path):
+            return None
+        r = SubunitResults(subunit_stream_path, all_tests_path)
+        self.tests_run = r.get_tests_run()
+        with open(os.path.join(whitelist_path, "tests-run.json"), "w") as f:
+            json.dump(self.tests_run, f)
+        self.all_tests = r.get_all_tests()
+        with open(os.path.join(whitelist_path, "tests-all.json"), "w") as f:
+            json.dump(self.all_tests, f)
+        self.failing_tests = r.get_failing_tests()
+        with open(os.path.join(whitelist_path, "tests-failing.json"), "w") as f:
+            json.dump(self.failing_tests, f)
+        self.passing_tests = r.get_passing_tests()
+        with open(os.path.join(whitelist_path, "tests-passing.json"), "w") as f:
+            json.dump(self.passing_tests, f)
+        self.skipped_tests = r.get_skipped_tests()
+        with open(os.path.join(whitelist_path, "tests-skipped.json"), "w") as f:
+            json.dump(self.skipped_tests, f)
+
diff --git a/tempest-pull/src/lava.py b/tempest-pull/src/lava.py
new file mode 100644
index 0000000..5be75cc
--- /dev/null
+++ b/tempest-pull/src/lava.py
@@ -0,0 +1,25 @@
+import xmlrpclib
+
+from bundle import Bundle
+
+class LAVADashboard(object):
+    def __init__(self, endpoint, bundle_stream_name):
+        self.endpoint = endpoint
+        self.bundle_stream_name = bundle_stream_name
+        self.rpcserver = None
+
+    def connect(self):
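+        # ServerProxy positional arguments are transport, encoding, verbose,
+        # allow_none and use_datetime; passing use_datetime=True makes timestamps
+        # such as the bundle's "uploaded_on" field come back as datetime objects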
+        self.rpcserver = xmlrpclib.ServerProxy(self.endpoint, \
+            None, None, None, None, True)
+
+    def retrieve_bundle(self, bundle_entry):
+        sha1 = bundle_entry["content_sha1"]
+        data = self.rpcserver.dashboard.get(sha1)
+        return Bundle(sha1, bundle_entry, data["content"])
+
+    def server_bundle_list(self):
+        bundles = self.rpcserver.dashboard.bundles(self.bundle_stream_name)
+        bundles_sha1 = [ b["content_sha1"] for b in bundles ]
+        return (bundles_sha1, bundles)
+
+
diff --git a/tempest-pull/src/subunitresults.py b/tempest-pull/src/subunitresults.py
new file mode 100644
index 0000000..e2caf7e
--- /dev/null
+++ b/tempest-pull/src/subunitresults.py
@@ -0,0 +1,88 @@
+import csv
+import subprocess
+import string
+import os
+import StringIO
+import re
+import datetime
+
+
+class SubunitResults(object):
+    def __init__(self, subunit_stream, all_tests_file):
+        self.subunit_stream = subunit_stream
+        self.all_tests_file = all_tests_file
+
+    # clean up a test id
+    def clean_test_id(self, test_id):
+        if '[' in test_id:
+            test_id = test_id[:test_id.index('[')]
+        return re.sub('[^-0-9A-Za-z_.]', '-', test_id)
+
+    # extract a "class name" from a test id
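+    # (e.g. "tempest.api.compute.servers.test_foo.TestBar.test_baz" with
+    #  class_depth=3 yields "api.compute")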
+    def get_class_name(self, test_id, class_depth):
+        clean_name = self.clean_test_id(test_id)
+        words = clean_name.split(".")
+        return ".".join(words[1:class_depth])
+
+    # convert subunit2csv data to json
+    def csv_to_json(self, input_csv):
+        f = StringIO.StringIO(input_csv)
+        reader = csv.DictReader(f)
+        result = []
+        for row in reader:
+            test_id = row["test"]
+            # clean up the test name (get rid of the [uuid]) part
+            row["test"] = self.clean_test_id(test_id)
+            # add an entry for the class
+            row["class"] = self.get_class_name(test_id, 3)
+            result.append(row)
+        return result
+
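+    # each get_* helper below shells out to the subunit tools, building the
+    # equivalent of:  cat <stream> [| subunit-filter ...] | subunit2csv --no-passthrough
+    # (get_failing_tests_xml pipes into subunit2junitxml instead) and converts
+    # the CSV output to a list of dicts via csv_to_json()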
+    def get_tests_run(self):
+        p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
+        p2 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
+        p1.stdout.close()
+        output = p2.communicate()[0]
+        return self.csv_to_json(output)
+
+    def get_failing_tests(self):
+        p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
+        p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
+        p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE)
+        p1.stdout.close()
+        p2.stdout.close()
+        output = p3.communicate()[0]
+        return self.csv_to_json(output)
+
+    def get_failing_tests_xml(self):
+        p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
+        p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
+        p3 = subprocess.Popen(["subunit2junitxml"], stdin=p2.stdout, stdout=subprocess.PIPE)
+        p1.stdout.close()
+        p2.stdout.close()
+        output = p3.communicate()[0]
+        return output
+
+    def get_passing_tests(self):
+        p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
+        p2 = subprocess.Popen(["subunit-filter", "--no-skip", "--no-failure", "--success", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
+        p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE)
+        p1.stdout.close()
+        p2.stdout.close()
+        output = p3.communicate()[0]
+        return self.csv_to_json(output)
+
+    def get_skipped_tests(self):
+        p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
+        p2 = subprocess.Popen(["subunit-filter", "--no-error", "--no-failure", "--no-success", "--no-xfail", "--no-passthrough"], stdin=p1.stdout, stdout=subprocess.PIPE)
+        p3 = subprocess.Popen(["subunit2csv", "--no-passthrough"], stdin=p2.stdout, stdout=subprocess.PIPE)
+        p1.stdout.close()
+        p2.stdout.close()
+        output = p3.communicate()[0]
+        return self.csv_to_json(output)
+
+    def get_all_tests(self):
+        with open(self.all_tests_file) as f:
+            return [self.clean_test_id(line.rstrip('\n')) for line in f]
+
+
diff --git a/tempest-pull/src/tempest-lava-pull.py b/tempest-pull/src/tempest-lava-pull.py
new file mode 100644
index 0000000..aa74c50
--- /dev/null
+++ b/tempest-pull/src/tempest-lava-pull.py
@@ -0,0 +1,149 @@
+import gc
+import os
+import string
+
+from py2neo import Graph, Node, Relationship
+from lava import LAVADashboard
+from bundle import BundleStore
+from bundle import Bundle
+
+# this is the URL of the LAVA instance to retrieve from
+LAVA_XMLRPC_ENDPOINT = "https://openstack.validation.linaro.org/RPC2"
+
+# this is the URL of the Neo4J instance
+NEO4J_ENDPOINT = "http://neo4j:linaro@localhost:7474/db/data/"
+
+# this is the name of the bundle stream to retrieve from
+BUNDLE_STREAM_NAME = "/public/team/openstack/tempest-ci/"
+
+# this is the root location where the raw bundle JSON will be downloaded
+# and stored. Each bundle will be stored as a single .txt file inside
+# a directory named with the bundle UUID. Any bundle that already has a
+# directory present will be skipped on subsequent runs.
+BUNDLES_ROOT = os.environ["LAVA_PULL_BUNDLEROOT"] #"/srv/tempest/bundles/"
+
+# this is the root location where the bundle's extracted log data and
+# other files will be written
+LOGS_ROOT = os.environ["LAVA_PULL_LOGROOT"] #"/srv/tempest/logs/"
+
+# these are the important files that are to be brought to the top-level of
+# the output directory
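+# each key is a LAVA test definition name; 'src' is an fnmatch pattern matched
+# against attachment paths and the optional 'new-name' renames the matched file
+# when Bundle.expand() copies it into the logs directory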
+WHITELIST = {
+    'devstack' : [
+        { 'src' : "stdout.log", 'new-name' : "stdout-devstack.log" }
+    ],
+    'tempest-summary' : [
+        { 'src' : "tempest-summary.txt" }
+    ],
+    'tempest-run' : [
+        { 'src' : "*.log.gz" },
+        { 'src' : "*.txt.gz" },
+        { 'src' : "apache2/*" },
+        { 'src' : "libvirt/*" },
+        { 'src' : "all-tests.txt" },
+        { 'src' : "results.subunit" },
+        { 'src' : "stack.sh.summary.gz" },
+        { 'src' : "stdout.log", 'new-name' : "stdout-tempest.log" },
+        { 'src' : "tempest_conf.txt" }
+    ]
+}
+
+class Neo4JDatabase(object):
+    def __init__(self, endpoint):
+        self.endpoint = endpoint
+
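+    # graph layout created below: (TempestRun)-[:ON]->(OS) and
+    # (TempestRun)-[:USING]->(Devstack), plus one (TempestRun)-[:HAS_TEST]->(Test)
+    # node and relationship per individual test result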
+    def store_bundle(self, bundle):
+        if not isinstance(bundle, Bundle):
+            raise Exception("argument is not a Bundle")
+
+        graph = Graph(self.endpoint)
+
+        print "creating graph for bundle"
+
+        # find (or create) the OS version + distro node
+        os_version = bundle.metadata["lava_job_attributes"]["os-version"]
+        os_distro = bundle.metadata["lava_job_attributes"]["os-distro"]
+        os_name = "%s/%s" % (os_distro, os_version)
+        OS_node = graph.find_one("OS", "name", os_name)
+        if not OS_node:
+            OS_node = Node("OS", name=os_name, distro=os_distro, version=os_version)
+            graph.create(OS_node)
+
+        # find (or create) the devstack branch node
+        devstack_branch = bundle.metadata["lava_job_attributes"]["devstack-branch"]
+        Branch_node = graph.find_one("Devstack", "name", devstack_branch)
+        if not Branch_node:
+            Branch_node = Node("Devstack", name=devstack_branch)
+            graph.create(Branch_node)
+
+        # create the main tempest run node and associate with the OS and Branch
+        TempestRun_node = Node("TempestRun", \
+            date = bundle.metadata["date_uploaded"], \
+            lava_job = bundle.metadata["lava_job_id"], \
+            sha1 = bundle.metadata["bundle_sha1"])
+        if bundle.all_tests:
+            TempestRun_node.properties["all_tests"] = len(bundle.all_tests)
+        if bundle.tests_run:
+            TempestRun_node.properties["tests_run"] = len(bundle.tests_run)
+        if bundle.failing_tests:
+            TempestRun_node.properties["failing_tests"] = len(bundle.failing_tests)
+        if bundle.skipped_tests:
+            TempestRun_node.properties["skipped_tests"] = len(bundle.skipped_tests)
+        if bundle.passing_tests:
+            TempestRun_node.properties["passing_tests"] = len(bundle.passing_tests)
+        OS_relationship = Relationship(TempestRun_node, "ON", OS_node)
+        Branch_relationship = Relationship(TempestRun_node, "USING", Branch_node)
+        graph.create(TempestRun_node, OS_relationship, Branch_relationship)
+
+        # create all of the tests and relate them back to the tempest node
+        for test_set in [bundle.failing_tests, bundle.passing_tests, bundle.skipped_tests]:
+            if test_set:
+                print "adding tests"
+                for test in test_set:
+                    Test_node = Node("Test", test["status"], \
+                        name=test["test"], \
+                        status=test["status"], \
+                        start_time=test["start_time"], \
+                        stop_time=test["stop_time"], \
+                        test_class=test["class"])
+                    Test_relationship = Relationship(TempestRun_node, \
+                        "HAS_TEST", Test_node, status=test["status"])
+                    graph.create(Test_node, Test_relationship)
+
+
+
+
+def main():
+    lava = LAVADashboard(LAVA_XMLRPC_ENDPOINT, BUNDLE_STREAM_NAME)
+    lava.connect()
+
+    store = BundleStore(BUNDLES_ROOT)
+    store_sha1_list = store.bundle_list()
+
+    bundle_sha1_list, bundle_list = lava.server_bundle_list()
+    for entry in bundle_list:
+        sha1 = entry["content_sha1"]
+        if sha1 in store_sha1_list:
+            print "[%s] skipping, already processed" % sha1
+            continue
+
+        print "------------------------------------------------------------------------------------------------------"
+
+        print "downloading new entry:"
+        print entry
+
+        bundle = lava.retrieve_bundle(entry)
+
+        print "[%s]:" % sha1
+
+        metadata = bundle.expand(WHITELIST, LOGS_ROOT)
+
+        database = Neo4JDatabase(NEO4J_ENDPOINT)
+        database.store_bundle(bundle)
+
+        store.write_bundle_receipt(bundle)
+        del bundle
+        gc.collect()
+
+
+main()
diff --git a/tempest-pull/src/util.py b/tempest-pull/src/util.py
new file mode 100644
index 0000000..b9b3582
--- /dev/null
+++ b/tempest-pull/src/util.py
@@ -0,0 +1,31 @@
+import os
+from datetime import datetime
+
+#
+# create_dir - create a directory if it does not already exist
+#
+
+def create_dir(path):
+    if not os.path.exists(path):
+        os.makedirs(path)
+
+#
+# get_recursive_file_list - returns a list containing all files
+# in a directory
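+# (paths are returned relative to 'path', hence the dp[len(path):] slice)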
+#
+
+def get_recursive_file_list(path):
+    return [os.path.join(dp[len(path):], f) \
+        for dp, dn, fn in os.walk(os.path.expanduser(path)) for f in fn]
+
+#
+# json_serial - json serializer for datetime
+#
+
+def json_serial(obj):
+ """JSON serializer for objects not serializable by default json code"""
+ if isinstance(obj, datetime):
+ serial = obj.isoformat()
+ return serial
+ raise TypeError ("Type not serializable")
+