#!/usr/bin/env python import datetime import os import re import sys import shlex import shutil import subprocess import xml.etree.ElementTree as ET import argparse import logging import time sys.path.insert(0, '../../lib/') import py_test_lib # nopep8 OUTPUT = '%s/output' % os.getcwd() RESULT_FILE = '%s/result.txt' % OUTPUT TRADEFED_STDOUT = '%s/tradefed-stdout.txt' % OUTPUT TRADEFED_LOGCAT = '%s/tradefed-logcat.txt' % OUTPUT TEST_PARAMS = '' AGGREGATED = 'aggregated' ATOMIC = 'atomic' def result_parser(xml_file, result_format): etree_file = open(xml_file, 'rb') etree_content = etree_file.read() rx = re.compile("&#([0-9]+);|&#x([0-9a-fA-F]+);") endpos = len(etree_content) pos = 0 while pos < endpos: # remove characters that don't conform to XML spec m = rx.search(etree_content, pos) if not m: break mstart, mend = m.span() target = m.group(1) if target: num = int(target) else: num = int(m.group(2), 16) # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF] if not(num in (0x9, 0xA, 0xD) or 0x20 <= num <= 0xD7FF or 0xE000 <= num <= 0xFFFD or 0x10000 <= num <= 0x10FFFF): etree_content = etree_content[:mstart] + etree_content[mend:] endpos = len(etree_content) pos = mend try: root = ET.fromstring(etree_content) except ET.ParseError as e: logger.error('xml.etree.ElementTree.ParseError: %s' % e) logger.info('Please Check %s manually' % xml_file) sys.exit(1) logger.info('Test modules in %s: %s' % (xml_file, str(len(root.findall('Module'))))) failures_count = 0 for elem in root.findall('Module'): # Naming: Module Name + Test Case Name + Test Name if 'abi' in elem.attrib.keys(): module_name = '.'.join([elem.attrib['abi'], elem.attrib['name']]) else: module_name = elem.attrib['name'] if result_format == AGGREGATED: tests_executed = len(elem.findall('.//Test')) tests_passed = len(elem.findall('.//Test[@result="pass"]')) tests_failed = len(elem.findall('.//Test[@result="fail"]')) result = '%s_executed pass %s' % (module_name, str(tests_executed)) py_test_lib.add_result(RESULT_FILE, result) result = '%s_passed pass %s' % (module_name, str(tests_passed)) py_test_lib.add_result(RESULT_FILE, result) failed_result = 'pass' if tests_failed > 0: failed_result = 'fail' result = '%s_failed %s %s' % (module_name, failed_result, str(tests_failed)) py_test_lib.add_result(RESULT_FILE, result) # output result to show if the module is done or not tests_done = elem.get('done', 'false') if tests_done == 'false': result = '%s_done fail' % module_name else: result = '%s_done pass' % module_name py_test_lib.add_result(RESULT_FILE, result) if args.FAILURES_PRINTED > 0 and failures_count < args.FAILURES_PRINTED: # print failed test cases for debug test_cases = elem.findall('.//TestCase') for test_case in test_cases: failed_tests = test_case.findall('.//Test[@result="fail"]') for failed_test in failed_tests: test_name = '%s/%s.%s' % (module_name, test_case.get("name"), failed_test.get("name")) failures = failed_test.findall('.//Failure') failure_msg = '' for failure in failures: failure_msg = '%s \n %s' % (failure_msg, failure.get('message')) logger.info('%s %s' % (test_name, failure_msg.strip())) failures_count = failures_count + 1 if failures_count > args.FAILURES_PRINTED: logger.info('There are more than %d test cases ' 'failed, the output for the rest ' 'failed test cases will be ' 'skipped.' % (args.FAILURES_PRINTED)) #break the for loop of failed_tests break if failures_count > args.FAILURES_PRINTED: #break the for loop of test_cases break if result_format == ATOMIC: test_cases = elem.findall('.//TestCase') for test_case in test_cases: tests = test_case.findall('.//Test') for atomic_test in tests: atomic_test_result = atomic_test.get("result") atomic_test_name = "%s/%s.%s" % (module_name, test_case.get("name"), atomic_test.get("name")) py_test_lib.add_result( RESULT_FILE, "%s %s" % (atomic_test_name, atomic_test_result)) parser = argparse.ArgumentParser() parser.add_argument('-t', dest='TEST_PARAMS', required=True, help="tradefed shell test parameters") parser.add_argument('-p', dest='TEST_PATH', required=True, help="path to tradefed package top directory") parser.add_argument('-r', dest='RESULTS_FORMAT', required=False, default=AGGREGATED, choices=[AGGREGATED, ATOMIC], help="The format of the saved results. 'aggregated' means number of \ passed and failed tests are recorded for each module. 'atomic' means \ each test result is recorded separately") ## The total number of failed test cases to be printed for this job ## Print too much failures would cause the lava job timed out ## Default to not print any failures parser.add_argument('-f', dest='FAILURES_PRINTED', type=int, required=False, default=0, help="Speciy the number of failed test cases to be\ printed, 0 means not print any failures.") args = parser.parse_args() # TEST_PARAMS = args.TEST_PARAMS if os.path.exists(OUTPUT): suffix = datetime.datetime.now().strftime('%Y%m%d%H%M%S') shutil.move(OUTPUT, '%s_%s' % (OUTPUT, suffix)) os.makedirs(OUTPUT) # Setup logger. # There might be an issue in lava/local dispatcher, most likely problem of # pexpect. It prints the messages from print() last, not by sequence. # Use logging and subprocess.call() to work around this. logger = logging.getLogger('Tradefed') logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s: %(levelname)s: %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) tradefed_stdout = open(TRADEFED_STDOUT, 'w') tradefed_logcat_out = open(TRADEFED_LOGCAT, 'w') tradefed_logcat = subprocess.Popen(['adb', 'logcat'], stdout=tradefed_logcat_out) logger.info('Test params: %s' % args.TEST_PARAMS) logger.info('Starting tradefed shell test...') command = None prompt = None if args.TEST_PATH == "android-cts": command = "android-cts/tools/cts-tradefed run commandAndExit " + args.TEST_PARAMS if args.TEST_PATH == "android-vts": os.environ["VTS_ROOT"] = os.getcwd() command = "android-vts/tools/vts-tradefed run commandAndExit " + args.TEST_PARAMS if command is None: logger.error("Not supported path: %s" % args.TEST_PATH) sys.exit(1) child = subprocess.Popen(shlex.split(command), stderr=subprocess.STDOUT, stdout=tradefed_stdout) fail_to_complete = child.wait() if fail_to_complete: py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run fail') else: py_test_lib.add_result(RESULT_FILE, 'tradefed-test-run pass') logger.info('Tradefed test finished') tradefed_stdout.close() tradefed_logcat.kill() tradefed_logcat_out.close() # Locate and parse test result. result_dir = '%s/results' % args.TEST_PATH test_result = 'test_result.xml' if os.path.exists(result_dir) and os.path.isdir(result_dir): for root, dirs, files in os.walk(result_dir): for name in files: if name == test_result: result_parser(os.path.join(root, name), args.RESULTS_FORMAT)