diff options
Diffstat (limited to 'tempest-pull/src')
-rw-r--r-- | tempest-pull/src/__init__.py | 0 | ||||
-rw-r--r-- | tempest-pull/src/bundle.py | 208 | ||||
-rw-r--r-- | tempest-pull/src/lava.py | 25 | ||||
-rw-r--r-- | tempest-pull/src/subunitresults.py | 88 | ||||
-rw-r--r-- | tempest-pull/src/tempest-lava-pull.py | 149 | ||||
-rw-r--r-- | tempest-pull/src/util.py | 31 |
6 files changed, 501 insertions, 0 deletions
import base64
import datetime
import fnmatch
import json
import os
import string
import subprocess
import xmlrpclib

from util import create_dir, get_recursive_file_list, json_serial
from subunitresults import SubunitResults

# Well-known filenames produced by a tempest run inside a bundle.
SUBUNIT_RESULTS_FILE = "results.subunit"
ALL_TESTS_FILE = "all-tests.txt"


class BundleStore(object):
    """On-disk receipt store for processed LAVA bundles.

    Each processed bundle gets a directory named after its sha1 under
    root_path; the presence of that directory is what marks a bundle
    as already handled on later runs.
    """

    def __init__(self, root_path):
        self.root_path = root_path

    def bundle_list(self):
        """Return the sha1 names of every bundle already receipted."""
        dirs = []
        root = os.path.expanduser(self.root_path)
        for subdir in os.listdir(root):
            if os.path.isdir(os.path.join(root, subdir)):
                dirs.append(subdir)
        return dirs

    def is_bundle_present(self, bundle):
        """Return True if a receipt directory already exists for bundle."""
        if not isinstance(bundle, Bundle):
            raise Exception("argument is not a Bundle")
        bundles = self.bundle_list()
        return bundle.sha1 in bundles

    def write_bundle_receipt(self, bundle, include_data=False):
        """Write the receipt (entry metadata, optionally the raw data)."""
        if not isinstance(bundle, Bundle):
            raise Exception("argument is not a Bundle")
        root = os.path.expanduser(self.root_path)
        bundle_root = os.path.join(root, bundle.sha1)
        create_dir(bundle_root)

        full_meta_path = os.path.join(bundle_root, "metadata")
        with open(full_meta_path, "w") as f:
            f.write(json.dumps(bundle.entry_data, default=json_serial))
        if include_data:
            full_data_path = os.path.join(bundle_root, "data")
            with open(full_data_path, "w") as f:
                f.write(json.dumps(bundle.data, default=json_serial))


class Bundle(object):
    """A single LAVA result bundle and the logic to unpack it to disk."""

    def __init__(self, sha1, entry_data, data):
        self.metadata = None  # not populated until expand() completes
        self.entry_data = entry_data
        self.data = json.loads(data)
        self.sha1 = sha1
        self.upload_date = entry_data["uploaded_on"]
        self.lava_job_id = entry_data["associated_job"]

        # Names of the two top-level output subdirectories.
        self.subdir_whitelist = "logs"
        self.subdir_unimportant = "xtra"

        # Per-status test lists; filled in by process_subunit().
        self.tests_run = None
        self.all_tests = None
        self.failing_tests = None
        self.passing_tests = None
        self.skipped_tests = None

        # these are files that will be present in attachments for a test run
        # that are not to be extracted
        self.skip_files = [
            "testdef.yaml",
            "return_code",
            "run.sh",
            "lava-results.sh"
        ]

    #
    # expand - process the bundle data, extracting and moving files to their
    # desired locations
    #

    def expand(self, whitelist, output_root):
        """Extract the bundle's attachments under output_root.

        whitelist maps test_id -> list of {'src': pattern[, 'new-name': n]}
        filters; matching attachments are promoted (optionally renamed)
        into the whitelist subdirectory, everything else lands in the
        "unimportant" subdirectory.  Returns the bundle metadata dict,
        or None if the bundle cannot be processed.
        """
        # quit if there is not a valid LAVA job ID for it
        if self.lava_job_id == "NA":
            print("Invalid LAVA Job ID - skipping")
            return

        # attempt to retrieve attributes from the first test in the bundle
        attributes = self.data["test_runs"][0]["attributes"]
        if not attributes:
            print("Invalid bundle data - no test attributes present")
            return

        # start building up the metadata to write out (and return)
        bundle_metadata = {}
        bundle_metadata['bundle_sha1'] = self.sha1
        bundle_metadata['lava_job_id'] = self.lava_job_id
        bundle_metadata['date_uploaded'] = self.upload_date.strftime('%Y-%m-%dT%H:%M:%S')
        bundle_metadata['lava_job_attributes'] = attributes

        # create the names for the root directories
        output_subdir = str(self.lava_job_id)
        full_root_path = os.path.join(os.path.expanduser(output_root), output_subdir)

        whitelist_path = os.path.join(full_root_path, self.subdir_whitelist)
        unimp_path = os.path.join(full_root_path, self.subdir_unimportant)
        print("storing output here: { root = '%s', whitelist = '%s', unimportant = '%s' }" %
              (full_root_path, whitelist_path, unimp_path))

        # create the root and top-level subdirectories
        create_dir(full_root_path)
        create_dir(whitelist_path)
        create_dir(unimp_path)

        # loop through all of the tests in the bundle
        for test_run in self.data["test_runs"]:
            test_id = test_run["test_id"]
            print("processing test [%s]" % test_id)

            # create directories if necessary
            test_root_path = os.path.join(unimp_path, test_id)
            create_dir(test_root_path)

            # see if there is a whitelist specified for the test
            test_whitelist = None
            if test_id in whitelist:
                test_whitelist = whitelist[test_id]

            # process attachments if there are any
            if "attachments" in test_run:
                for attachment in test_run["attachments"]:
                    matching_whitelist_filter = None

                    filename = attachment["pathname"]
                    if filename not in self.skip_files:
                        if test_whitelist:
                            # see if the file matches one of the whitelist
                            # patterns (renamed from "filter": shadowed the
                            # builtin)
                            for wl_filter in test_whitelist:
                                pattern = wl_filter["src"]
                                if fnmatch.fnmatch(filename, pattern):
                                    matching_whitelist_filter = wl_filter
                                    break

                        # build the full path to the output file, assuming it
                        # has not been whitelisted
                        full_file_path = os.path.join(test_root_path, filename)

                        if matching_whitelist_filter:
                            filename2 = filename
                            # the file has been whitelisted -- see if it is
                            # supposed to be renamed
                            if 'new-name' in matching_whitelist_filter:
                                filename2 = matching_whitelist_filter['new-name']
                            # build the full path for the whitelisted file
                            full_file_path = os.path.join(whitelist_path, filename2)

                        # create the directory if necessary
                        dir_name = os.path.dirname(full_file_path)
                        create_dir(dir_name)

                        # finally - write the attachment
                        with open(full_file_path, "w") as f:
                            decoded_data = base64.b64decode(attachment["content"])
                            f.write(decoded_data)

        # check for results.subunit in whitelisted files and process
        self.process_subunit(whitelist_path)

        # get final list of whitelisted files and add to the metadata
        whitelist_file_list = get_recursive_file_list(whitelist_path)
        bundle_metadata["file_list"] = whitelist_file_list

        # write the metadata
        with open(os.path.join(full_root_path, "metadata.json"), "w") as f:
            json.dump(bundle_metadata, f)

        # touch the directory with the original creation date
        subprocess.check_output(["touch", "--date=%s" % self.upload_date, full_root_path])

        self.metadata = bundle_metadata

        return bundle_metadata

    def process_subunit(self, whitelist_path):
        """Derive the per-status test lists from the subunit stream and
        write one JSON file per list next to the stream.

        No-op unless both the subunit stream and the all-tests file are
        present in whitelist_path.
        """
        subunit_stream_path = os.path.join(whitelist_path, SUBUNIT_RESULTS_FILE)
        all_tests_path = os.path.join(whitelist_path, ALL_TESTS_FILE)
        if not os.path.exists(subunit_stream_path):
            return None
        if not os.path.exists(all_tests_path):
            return None
        r = SubunitResults(subunit_stream_path, all_tests_path)
        self.tests_run = r.get_tests_run()
        with open(os.path.join(whitelist_path, "tests-run.json"), "w") as f:
            json.dump(self.tests_run, f)
        self.all_tests = r.get_all_tests()
        with open(os.path.join(whitelist_path, "tests-all.json"), "w") as f:
            json.dump(self.all_tests, f)
        self.failing_tests = r.get_failing_tests()
        with open(os.path.join(whitelist_path, "tests-failing.json"), "w") as f:
            json.dump(self.failing_tests, f)
        self.passing_tests = r.get_passing_tests()
        with open(os.path.join(whitelist_path, "tests-passing.json"), "w") as f:
            json.dump(self.passing_tests, f)
        self.skipped_tests = r.get_skipped_tests()
        with open(os.path.join(whitelist_path, "tests-skipped.json"), "w") as f:
            json.dump(self.skipped_tests, f)
whitelist_file_list + + # write the metadata + with open(os.path.join(full_root_path, "metadata.json"), "w") as f: + json.dump(bundle_metadata, f) + + # touch the directory with the original creation date + subprocess.check_output(["touch", "--date=%s" % self.upload_date, full_root_path]) + + self.metadata = bundle_metadata + + return bundle_metadata + + def process_subunit(self, whitelist_path): + subunit_stream_path = os.path.join(whitelist_path, SUBUNIT_RESULTS_FILE) + all_tests_path = os.path.join(whitelist_path, ALL_TESTS_FILE) + if not os.path.exists(subunit_stream_path): + return None + if not os.path.exists(all_tests_path): + return None + r = SubunitResults(subunit_stream_path, all_tests_path) + self.tests_run = r.get_tests_run() + with open(os.path.join(whitelist_path, "tests-run.json"), "w") as f: + json.dump(self.tests_run, f) + self.all_tests = r.get_all_tests() + with open(os.path.join(whitelist_path, "tests-all.json"), "w") as f: + json.dump(self.all_tests, f) + self.failing_tests = r.get_failing_tests() + with open(os.path.join(whitelist_path, "tests-failing.json"), "w") as f: + json.dump(self.failing_tests, f) + self.passing_tests = r.get_passing_tests() + with open(os.path.join(whitelist_path, "tests-passing.json"), "w") as f: + json.dump(self.passing_tests, f) + self.skipped_tests = r.get_skipped_tests() + with open(os.path.join(whitelist_path, "tests-skipped.json"), "w") as f: + json.dump(self.skipped_tests, f) + diff --git a/tempest-pull/src/lava.py b/tempest-pull/src/lava.py new file mode 100644 index 0000000..5be75cc --- /dev/null +++ b/tempest-pull/src/lava.py @@ -0,0 +1,25 @@ +import xmlrpclib + +from bundle import Bundle + +class LAVADashboard(object): + def __init__(self, endpoint, bundle_stream_name): + self.endpoint = endpoint + self.bundle_stream_name = bundle_stream_name + self.rpcserver = None + + def connect(self): + self.rpcserver = xmlrpclib.ServerProxy(self.endpoint, \ + None, None, None, None, True) + + def retrieve_bundle(self, 
import csv
import subprocess
import string
import os
import re
import datetime

try:
    import StringIO
except ImportError:  # Python 3: io.StringIO is a drop-in for this usage
    import io as StringIO


class SubunitResults(object):
    """Convert a subunit result stream into JSON-friendly test lists by
    piping it through the subunit command line tools (subunit-filter,
    subunit2csv, subunit2junitxml)."""

    def __init__(self, subunit_stream, all_tests_file):
        self.subunit_stream = subunit_stream
        self.all_tests_file = all_tests_file

    # clean up a test id
    def clean_test_id(self, test_id):
        """Strip the trailing "[uuid,...]" tag and replace any character
        outside [-0-9A-Za-z_.] with '-'."""
        if '[' in test_id:
            test_id = test_id[:test_id.index('[')]
        return re.sub('[^-0-9A-Za-z_.]', '-', test_id)

    # extract a "class name" from a test id
    def get_class_name(self, test_id, class_depth):
        """Return dotted components 1..class_depth-1 of the cleaned id."""
        clean_name = self.clean_test_id(test_id)
        words = clean_name.split(".")
        return ".".join(words[1:class_depth])

    # convert subunit2csv data to json
    def csv_to_json(self, input_csv):
        """Parse subunit2csv output into a list of row dicts, cleaning
        each test id and adding a 'class' entry per row."""
        f = StringIO.StringIO(input_csv)
        reader = csv.DictReader(f)
        result = []
        for row in reader:
            test_id = row["test"]
            # clean up the test name (get rid of the [uuid]) part
            row["test"] = self.clean_test_id(test_id)
            # add an entry for the class
            row["class"] = self.get_class_name(test_id, 3)
            result.append(row)
        return result

    def _filter_to_csv(self, filter_args=None):
        """Shared pipeline: stream -> [subunit-filter args] -> subunit2csv.

        Factors out the four copy-pasted Popen chains; returns the parsed
        row dicts.
        """
        procs = []
        p = subprocess.Popen(["cat", self.subunit_stream],
                             stdout=subprocess.PIPE)
        procs.append(p)
        if filter_args:
            p = subprocess.Popen(["subunit-filter"] + filter_args + ["--no-passthrough"],
                                 stdin=p.stdout, stdout=subprocess.PIPE)
            procs.append(p)
        last = subprocess.Popen(["subunit2csv", "--no-passthrough"],
                                stdin=p.stdout, stdout=subprocess.PIPE)
        # close upstream stdouts so earlier stages get SIGPIPE if the
        # consumer exits early
        for proc in procs:
            proc.stdout.close()
        output = last.communicate()[0]
        return self.csv_to_json(output)

    def get_tests_run(self):
        """All tests present in the stream."""
        return self._filter_to_csv()

    def get_failing_tests(self):
        """Only genuine failures."""
        return self._filter_to_csv(["--only-genuine-failures"])

    def get_failing_tests_xml(self):
        """Genuine failures as a junit XML document (string)."""
        p1 = subprocess.Popen(["cat", self.subunit_stream], stdout=subprocess.PIPE)
        p2 = subprocess.Popen(["subunit-filter", "--only-genuine-failures", "--passthrough"],
                              stdin=p1.stdout, stdout=subprocess.PIPE)
        p3 = subprocess.Popen(["subunit2junitxml"], stdin=p2.stdout, stdout=subprocess.PIPE)
        p1.stdout.close()
        p2.stdout.close()
        output = p3.communicate()[0]
        return output

    def get_passing_tests(self):
        """Only successes (no skips, no failures)."""
        return self._filter_to_csv(["--no-skip", "--no-failure", "--success"])

    def get_skipped_tests(self):
        """Only skips (everything else filtered out)."""
        return self._filter_to_csv(["--no-error", "--no-failure", "--no-success", "--no-xfail"])

    def get_all_tests(self):
        """Cleaned ids of every test listed in the all-tests file."""
        with open(self.all_tests_file) as f:
            return [self.clean_test_id(line.rstrip('\n')) for line in f]
import gc
import os
import string

from py2neo import Graph, Node, Relationship
from lava import LAVADashboard
from bundle import BundleStore
from bundle import Bundle

# this is the URL of the LAVA instance to retrieve from
LAVA_XMLRPC_ENDPOINT = "https://openstack.validation.linaro.org/RPC2"

# this is the URL of the Neo4J instance
NEO4J_ENDPOINT = "http://neo4j:linaro@localhost:7474/db/data/"

# this is the name of the bundle stream to retrieve from
BUNDLE_STREAM_NAME = "/public/team/openstack/tempest-ci/"

# this is the root location where the raw bundle JSON will be downloaded
# and stored. Each bundle will be stored as a single .txt file inside
# a directory named with the bundle UUID. Any bundle that already has a
# directory present will be skipped on subsequent runs.
BUNDLES_ROOT = os.environ["LAVA_PULL_BUNDLEROOT"]  # e.g. "/srv/tempest/bundles/"

# this is the root location where the bundle's extracted log data and
# other files will be written
LOGS_ROOT = os.environ["LAVA_PULL_LOGROOT"]  # e.g. "/srv/tempest/logs/"

# these are the important files that are to be brought to the top-level of
# the output directory
WHITELIST = {
    'devstack': [
        {'src': "stdout.log", 'new-name': "stdout-devstack.log"}
    ],
    'tempest-summary': [
        {'src': "tempest-summary.txt"}
    ],
    'tempest-run': [
        {'src': "*.log.gz"},
        {'src': "*.txt.gz"},
        {'src': "apache2/*"},
        {'src': "libvirt/*"},
        {'src': "all-tests.txt"},
        {'src': "results.subunit"},
        {'src': "stack.sh.summary.gz"},
        {'src': "stdout.log", 'new-name': "stdout-tempest.log"},
        {'src': "tempest_conf.txt"}
    ]
}


class Neo4JDatabase(object):
    """Persists processed bundles into a Neo4J graph:
    (TempestRun)-[ON]->(OS), (TempestRun)-[USING]->(Devstack),
    (TempestRun)-[HAS_TEST]->(Test)."""

    def __init__(self, endpoint):
        self.endpoint = endpoint

    def store_bundle(self, bundle):
        """Create the graph nodes/relationships for one expanded bundle.

        Requires bundle.expand() to have populated bundle.metadata.
        """
        if not isinstance(bundle, Bundle):
            raise Exception("argument is not a Bundle")

        graph = Graph(self.endpoint)

        print("creating graph for bundle")

        # find (or create) the OS version + distro node
        os_version = bundle.metadata["lava_job_attributes"]["os-version"]
        os_distro = bundle.metadata["lava_job_attributes"]["os-distro"]
        os_name = "%s/%s" % (os_distro, os_version)
        OS_node = graph.find_one("OS", "name", os_name)
        if not OS_node:
            OS_node = Node("OS", name=os_name, distro=os_distro, version=os_version)
            graph.create(OS_node)

        # find (or create) the devstack branch node
        devstack_branch = bundle.metadata["lava_job_attributes"]["devstack-branch"]
        Branch_node = graph.find_one("Devstack", "name", devstack_branch)
        if not Branch_node:
            Branch_node = Node("Devstack", name=devstack_branch)
            graph.create(Branch_node)

        # create the main tempest run node and associate with the OS and Branch
        TempestRun_node = Node("TempestRun",
                               date=bundle.metadata["date_uploaded"],
                               lava_job=bundle.metadata["lava_job_id"],
                               sha1=bundle.metadata["bundle_sha1"])
        # only record counts for the lists that were actually produced
        if bundle.all_tests:
            TempestRun_node.properties["all_tests"] = len(bundle.all_tests)
        if bundle.tests_run:
            TempestRun_node.properties["tests_run"] = len(bundle.tests_run)
        if bundle.failing_tests:
            TempestRun_node.properties["failing_tests"] = len(bundle.failing_tests)
        if bundle.skipped_tests:
            TempestRun_node.properties["skipped_tests"] = len(bundle.skipped_tests)
        if bundle.passing_tests:
            TempestRun_node.properties["passing_tests"] = len(bundle.passing_tests)
        OS_relationship = Relationship(TempestRun_node, "ON", OS_node)
        Branch_relationship = Relationship(TempestRun_node, "USING", Branch_node)
        graph.create(TempestRun_node, OS_relationship, Branch_relationship)

        # create all of the tests and relate them back to the tempest node
        for test_set in [bundle.failing_tests, bundle.passing_tests, bundle.skipped_tests]:
            if test_set:
                print("adding tests")
                for test in test_set:
                    Test_node = Node("Test", test["status"],
                                     name=test["test"],
                                     status=test["status"],
                                     start_time=test["start_time"],
                                     stop_time=test["stop_time"],
                                     test_class=test["class"])
                    Test_relationship = Relationship(TempestRun_node,
                                                     "HAS_TEST", Test_node,
                                                     status=test["status"])
                    graph.create(Test_node, Test_relationship)


def main():
    """Pull any bundle not yet receipted locally, expand it to disk and
    push it into the graph database."""
    lava = LAVADashboard(LAVA_XMLRPC_ENDPOINT, BUNDLE_STREAM_NAME)
    lava.connect()

    store = BundleStore(BUNDLES_ROOT)
    store_sha1_list = store.bundle_list()

    # one database wrapper for the whole run (was re-created per bundle)
    database = Neo4JDatabase(NEO4J_ENDPOINT)

    bundle_sha1_list, bundle_list = lava.server_bundle_list()
    for entry in bundle_list:
        sha1 = entry["content_sha1"]
        if sha1 in store_sha1_list:
            print("[%s] skipping, already processed" % sha1)
            continue

        print("------------------------------------------------------------------------------------------------------")

        print("downloading new entry:")
        print(entry)

        bundle = lava.retrieve_bundle(entry)

        print("[%s]:" % sha1)

        metadata = bundle.expand(WHITELIST, LOGS_ROOT)

        # expand() returns None for unusable bundles; storing those would
        # crash on bundle.metadata[...] subscripting
        if metadata:
            database.store_bundle(bundle)
        else:
            print("[%s] expand failed - not storing in graph" % sha1)

        store.write_bundle_receipt(bundle)
        del bundle
        gc.collect()


if __name__ == "__main__":
    main()
start_time=test["start_time"], \ + stop_time=test["stop_time"], \ + test_class=test["class"]) + Test_relationship = Relationship(TempestRun_node, \ + "HAS_TEST", Test_node, status=test["status"]) + graph.create(Test_node, Test_relationship) + + + + +def main(): + lava = LAVADashboard(LAVA_XMLRPC_ENDPOINT, BUNDLE_STREAM_NAME) + lava.connect() + + store = BundleStore(BUNDLES_ROOT) + store_sha1_list = store.bundle_list() + + bundle_sha1_list, bundle_list = lava.server_bundle_list() + for entry in bundle_list: + sha1 = entry["content_sha1"] + if sha1 in store_sha1_list: + print "[%s] skipping, already processed" % sha1 + continue + + print "------------------------------------------------------------------------------------------------------" + + print "downloading new entry:" + print entry + + bundle = lava.retrieve_bundle(entry) + + print "[%s]:" % sha1 + + metadata = bundle.expand(WHITELIST, LOGS_ROOT) + + database = Neo4JDatabase(NEO4J_ENDPOINT) + database.store_bundle(bundle) + + store.write_bundle_receipt(bundle) + del bundle + gc.collect() + + +main() diff --git a/tempest-pull/src/util.py b/tempest-pull/src/util.py new file mode 100644 index 0000000..b9b3582 --- /dev/null +++ b/tempest-pull/src/util.py @@ -0,0 +1,31 @@ +import os +from datetime import datetime + +# +# create_dir - create a directory if it does not already exist +# + +def create_dir(path): + if not os.path.exists(path): + os.makedirs(path) + +# +# get_recursive_file_list - returns a list containing all files +# in a directory +# + +def get_recursive_file_list(path): + return [os.path.join(dp[len(path):], f) \ + for dp, dn, fn in os.walk(os.path.expanduser(path)) for f in fn] + +# +# json_serial - json serializer for datetime +# + +def json_serial(obj): + """JSON serializer for objects not serializable by default json code""" + if isinstance(obj, datetime): + serial = obj.isoformat() + return serial + raise TypeError ("Type not serializable") + |