diff options
authorMarc Bonnici <marc.bonnici@arm.com>2018-10-31 14:40:09 +0000
committersetrofim <setrofim@gmail.com>2018-12-07 09:55:17 +0000
commit0fee3debea0adfe03de7cef7b476f16dcb391009 (patch)
parent423882a8e6cde8d071e06e4f548a84b9e4dee67d (diff)
fw/output: Implement the Output API for using a database backend
Allow for the creating of a RunDatabaseOutput to allow for utilizing WA output API from run data stored in a postgres database.
1 files changed, 469 insertions, 4 deletions
diff --git a/wa/framework/output.py b/wa/framework/output.py
index aa2d66c2..a613ef98 100644
--- a/wa/framework/output.py
+++ b/wa/framework/output.py
@@ -13,23 +13,32 @@
# limitations under the License.
+ import psycopg2
+ from psycopg2 import Error as Psycopg2Error
+except ImportError:
+ psycopg2 = None
+ Psycopg2Error = None
import logging
import os
import shutil
-from collections import OrderedDict
+from collections import OrderedDict, defaultdict
from copy import copy, deepcopy
from datetime import datetime
+from io import StringIO
import devlib
from wa.framework.configuration.core import JobSpec, Status
from wa.framework.configuration.execution import CombinedConfig
-from wa.framework.exception import HostError
+from wa.framework.exception import HostError, SerializerSyntaxError, ConfigError
from wa.framework.run import RunState, RunInfo
from wa.framework.target.info import TargetInfo
from wa.framework.version import get_wa_version_with_commit
+from wa.utils.doc import format_simple_table
from wa.utils.misc import touch, ensure_directory_exists, isiterable
-from wa.utils.serializer import write_pod, read_pod, Podable
+from wa.utils.serializer import write_pod, read_pod, Podable, json
from wa.utils.types import enum, numeric
@@ -165,6 +174,7 @@ class Output(object):
def __str__(self):
return os.path.basename(self.basepath)
class RunOutputCommon(object):
''' Split out common functionality to form a second base of
the RunOutput classes
@@ -320,7 +330,6 @@ class RunOutput(Output, RunOutputCommon):
job_output.basepath = failed_path
class JobOutput(Output):
kind = 'job'
@@ -736,3 +745,459 @@ def _save_raw_config(meta_dir, state):
basename = os.path.basename(source)
dest_path = os.path.join(raw_config_dir, 'cfg{}-{}'.format(i, basename))
shutil.copy(source, dest_path)
+class DatabaseOutput(Output):
+ kind = None
+ @property
+ def resultfile(self):
+ if self.conn is None or self.oid is None:
+ return {}
+ pod = self._get_pod_version()
+ pod['metrics'] = self._get_metrics()
+ pod['status'] = self._get_status()
+ pod['classifiers'] = self._get_classifiers(self.oid, 'run')
+ pod['events'] = self._get_events()
+ pod['artifacts'] = self._get_artifacts()
+ return pod
+ @staticmethod
+ def _build_command(columns, tables, conditions=None, joins=None):
+ cmd = '''SELECT\n\t{}\nFROM\n\t{}'''.format(',\n\t'.join(columns), ',\n\t'.join(tables))
+ if joins:
+ for join in joins:
+ cmd += '''\nLEFT JOIN {} ON {}'''.format(join[0], join[1])
+ if conditions:
+ cmd += '''\nWHERE\n\t{}'''.format('\nAND\n\t'.join(conditions))
+ return cmd + ';'
+ def __init__(self, conn, oid=None, reload=True): # pylint: disable=super-init-not-called
+ self.conn = conn
+ self.oid = oid
+ self.result = None
+ if reload:
+ self.reload()
+ def __repr__(self):
+ return '<{} {}>'.format(self.__class__.__name__, self.oid)
+ def __str__(self):
+ return self.oid
+ def reload(self):
+ try:
+ self.result = Result.from_pod(self.resultfile)
+ except Exception as e: # pylint: disable=broad-except
+ self.result = Result()
+ self.result.status = Status.UNKNOWN
+ self.add_event(str(e))
+ def get_artifact_path(self, name):
+ artifact = self.get_artifact(name)
+ artifact = StringIO(self.conn.lobject(int(artifact.path)).read())
+ self.conn.commit()
+ return artifact
+ # pylint: disable=too-many-locals
+ def _read_db(self, columns, tables, conditions=None, join=None, as_dict=True):
+ # Automatically remove table name from column when using column names as keys or
+ # allow for column names to be aliases when retrieving the data,
+ # (db_column_name, alias)
+ db_columns = []
+ aliases_colunms = []
+ for column in columns:
+ if isinstance(column, tuple):
+ db_columns.append(column[0])
+ aliases_colunms.append(column[1])
+ else:
+ db_columns.append(column)
+ aliases_colunms.append(column.rsplit('.', 1)[-1])
+ cmd = self._build_command(db_columns, tables, conditions, join)
+ logger.debug(cmd)
+ with self.conn.cursor() as cursor:
+ cursor.execute(cmd)
+ results = cursor.fetchall()
+ self.conn.commit()
+ if not as_dict:
+ return results
+ # Format the output dict using column names as keys
+ output = []
+ for result in results:
+ entry = {}
+ for k, v in zip(aliases_colunms, result):
+ entry[k] = v
+ output.append(entry)
+ return output
+ def _get_pod_version(self):
+ columns = ['_pod_version', '_pod_serialization_version']
+ tables = ['{}s'.format(self.kind)]
+ conditions = ['{}s.oid = \'{}\''.format(self.kind, self.oid)]
+ results = self._read_db(columns, tables, conditions)
+ if results:
+ return results[0]
+ else:
+ return None
+ def _populate_classifers(self, pod, kind):
+ for entry in pod:
+ oid = entry.pop('oid')
+ entry['classifiers'] = self._get_classifiers(oid, kind)
+ return pod
+ def _get_classifiers(self, oid, kind):
+ columns = ['classifiers.key', 'classifiers.value']
+ tables = ['classifiers']
+ conditions = ['{}_oid = \'{}\''.format(kind, oid)]
+ results = self._read_db(columns, tables, conditions, as_dict=False)
+ classifiers = {}
+ for (k, v) in results:
+ classifiers[k] = v
+ return classifiers
+ def _get_metrics(self):
+ columns = ['metrics.name', 'metrics.value', 'metrics.units',
+ 'metrics.lower_is_better',
+ 'metrics.oid', 'metrics._pod_version',
+ 'metrics._pod_serialization_version']
+ tables = ['metrics']
+ joins = [('classifiers', 'classifiers.metric_oid = metrics.oid')]
+ conditions = ['metrics.{}_oid = \'{}\''.format(self.kind, self.oid)]
+ pod = self._read_db(columns, tables, conditions, joins)
+ return self._populate_classifers(pod, 'metric')
+ def _get_status(self):
+ columns = ['{}s.status'.format(self.kind)]
+ tables = ['{}s'.format(self.kind)]
+ conditions = ['{}s.oid = \'{}\''.format(self.kind, self.oid)]
+ results = self._read_db(columns, tables, conditions, as_dict=False)
+ if results:
+ return results[0][0]
+ else:
+ return None
+ def _get_artifacts(self):
+ columns = ['artifacts.name', 'artifacts.description', 'artifacts.kind',
+ ('largeobjects.lo_oid', 'path'), 'artifacts.oid',
+ 'artifacts._pod_version', 'artifacts._pod_serialization_version']
+ tables = ['largeobjects', 'artifacts']
+ joins = [('classifiers', 'classifiers.artifact_oid = artifacts.oid')]
+ conditions = ['artifacts.{}_oid = \'{}\''.format(self.kind, self.oid),
+ 'artifacts.large_object_uuid = largeobjects.oid',
+ 'artifacts.job_oid IS NULL']
+ pod = self._read_db(columns, tables, conditions, joins)
+ for artifact in pod:
+ artifact['path'] = str(artifact['path'])
+ return self._populate_classifers(pod, 'metric')
+ def _get_events(self):
+ columns = ['events.message', 'events.timestamp']
+ tables = ['events']
+ conditions = ['events.{}_oid = \'{}\''.format(self.kind, self.oid)]
+ return self._read_db(columns, tables, conditions)
+def kernel_config_from_db(raw):
+ kernel_config = {}
+ for k, v in zip(raw[0], raw[1]):
+ kernel_config[k] = v
+ return kernel_config
+class RunDatabaseOutput(DatabaseOutput, RunOutputCommon):
+ kind = 'run'
+ @property
+ def basepath(self):
+ return 'db:({})-{}@{}:{}'.format(self.dbname, self.user,
+ self.host, self.port)
+ @property
+ def augmentations(self):
+ columns = ['augmentations.name']
+ tables = ['augmentations']
+ conditions = ['augmentations.run_oid = \'{}\''.format(self.oid)]
+ results = self._read_db(columns, tables, conditions, as_dict=False)
+ return [a for augs in results for a in augs]
+ @property
+ def _db_infofile(self):
+ columns = ['start_time', 'project', ('run_uuid', 'uuid'), 'end_time',
+ 'run_name', 'duration', '_pod_version', '_pod_serialization_version']
+ tables = ['runs']
+ conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
+ pod = self._read_db(columns, tables, conditions)
+ if not pod:
+ return {}
+ return pod[0]
+ @property
+ def _db_targetfile(self):
+ columns = ['os', 'is_rooted', 'target', 'abi', 'cpus', 'os_version',
+ 'hostid', 'hostname', 'kernel_version', 'kernel_release',
+ 'kernel_sha1', 'kernel_config', 'sched_features',
+ '_pod_version', '_pod_serialization_version']
+ tables = ['targets']
+ conditions = ['targets.run_oid = \'{}\''.format(self.oid)]
+ pod = self._read_db(columns, tables, conditions)
+ if not pod:
+ return {}
+ pod = pod[0]
+ try:
+ pod['cpus'] = [json.loads(cpu) for cpu in pod.pop('cpus')]
+ except SerializerSyntaxError:
+ pod['cpus'] = []
+ logger.debug('Failed to deserialize target cpu information')
+ pod['kernel_config'] = kernel_config_from_db(pod['kernel_config'])
+ return pod
+ @property
+ def _db_statefile(self):
+ # Read overall run information
+ columns = ['runs.state']
+ tables = ['runs']
+ conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
+ pod = self._read_db(columns, tables, conditions)
+ pod = pod[0].get('state')
+ if not pod:
+ return {}
+ # Read job information
+ columns = ['jobs.job_id', 'jobs.oid']
+ tables = ['jobs']
+ conditions = ['jobs.run_oid = \'{}\''.format(self.oid)]
+ job_oids = self._read_db(columns, tables, conditions)
+ # Match job oid with jobs from state file
+ for job in pod.get('jobs', []):
+ for job_oid in job_oids:
+ if job['id'] == job_oid['job_id']:
+ job['oid'] = job_oid['oid']
+ break
+ return pod
+ @property
+ def _db_jobsfile(self):
+ workload_params = self._get_parameters('workload')
+ runtime_params = self._get_parameters('runtime')
+ columns = [('jobs.job_id', 'id'), 'jobs.label', 'jobs.workload_name',
+ 'jobs.oid', 'jobs._pod_version', 'jobs._pod_serialization_version']
+ tables = ['jobs']
+ conditions = ['jobs.run_oid = \'{}\''.format(self.oid)]
+ jobs = self._read_db(columns, tables, conditions)
+ for job in jobs:
+ job['workload_parameters'] = workload_params.pop(job['oid'], {})
+ job['runtime_parameters'] = runtime_params.pop(job['oid'], {})
+ job.pop('oid')
+ return jobs
+ @property
+ def _db_run_config(self):
+ pod = defaultdict(dict)
+ parameter_types = ['augmentation', 'resource_getter']
+ for parameter_type in parameter_types:
+ columns = ['parameters.name', 'parameters.value',
+ 'parameters.value_type',
+ ('{}s.name'.format(parameter_type), '{}'.format(parameter_type))]
+ tables = ['parameters', '{}s'.format(parameter_type)]
+ conditions = ['parameters.run_oid = \'{}\''.format(self.oid),
+ 'parameters.type = \'{}\''.format(parameter_type),
+ 'parameters.{0}_oid = {0}s.oid'.format(parameter_type)]
+ configs = self._read_db(columns, tables, conditions)
+ for config in configs:
+ entry = {config['name']: json.loads(config['value'])}
+ pod['{}s'.format(parameter_type)][config.pop(parameter_type)] = entry
+ # run config
+ columns = ['runs.max_retries', 'runs.allow_phone_home',
+ 'runs.bail_on_init_failure', 'runs.retry_on_status']
+ tables = ['runs']
+ conditions = ['runs.oid = \'{}\''.format(self.oid)]
+ config = self._read_db(columns, tables, conditions)
+ if not config:
+ return {}
+ config = config[0]
+ # Convert back into a string representation of an enum list
+ config['retry_on_status'] = config['retry_on_status'][1:-1].split(',')
+ pod.update(config)
+ return pod
+ def __init__(self,
+ password=None,
+ dbname='wa',
+ host='localhost',
+ port='5432',
+ user='postgres',
+ run_uuid=None,
+ list_runs=False):
+ if psycopg2 is None:
+ msg = 'Please install the psycopg2 in order to connect to postgres databases'
+ raise HostError(msg)
+ self.dbname = dbname
+ self.host = host
+ self.port = port
+ self.user = user
+ self.password = password
+ self.run_uuid = run_uuid
+ self.conn = None
+ self.info = None
+ self.state = None
+ self.result = None
+ self.target_info = None
+ self._combined_config = None
+ self.jobs = []
+ self.job_specs = []
+ self.connect()
+ super(RunDatabaseOutput, self).__init__(conn=self.conn, reload=False)
+ if list_runs:
+ print('Available runs are:')
+ self._list_runs()
+ self.disconnect()
+ return
+ if not self.run_uuid:
+ print('Please specify "Run uuid"')
+ self._list_runs()
+ self.disconnect()
+ return
+ if not self.oid:
+ self.oid = self._get_oid()
+ self.reload()
+ def read_job_specs(self):
+ job_specs = []
+ for job in self._db_jobsfile:
+ job_specs.append(JobSpec.from_pod(job))
+ return job_specs
+ def connect(self):
+ if self.conn and not self.conn.closed:
+ return
+ try:
+ self.conn = psycopg2.connect(dbname=self.dbname,
+ user=self.user,
+ host=self.host,
+ password=self.password,
+ port=self.port)
+ except Psycopg2Error as e:
+ raise HostError('Unable to connect to the Database: "{}'.format(e.args[0]))
+ def disconnect(self):
+ self.conn.commit()
+ self.conn.close()
+ def reload(self):
+ super(RunDatabaseOutput, self).reload()
+ info_pod = self._db_infofile
+ state_pod = self._db_statefile
+ if not info_pod or not state_pod:
+ msg = '"{}" does not appear to be a valid WA Database Output.'
+ raise ValueError(msg.format(self.oid))
+ self.info = RunInfo.from_pod(info_pod)
+ self.state = RunState.from_pod(state_pod)
+ self._combined_config = CombinedConfig.from_pod({'run_config': self._db_run_config})
+ self.target_info = TargetInfo.from_pod(self._db_targetfile)
+ self.job_specs = self.read_job_specs()
+ for job_state in self._db_statefile['jobs']:
+ job = JobDatabaseOutput(self.conn, job_state.get('oid'), job_state['id'],
+ job_state['label'], job_state['iteration'],
+ job_state['retries'])
+ job.status = job_state['status']
+ job.spec = self.get_job_spec(job.id)
+ if job.spec is None:
+ logger.warning('Could not find spec for job {}'.format(job.id))
+ self.jobs.append(job)
+ def _get_oid(self):
+ columns = ['{}s.oid'.format(self.kind)]
+ tables = ['{}s'.format(self.kind)]
+ conditions = ['runs.run_uuid = \'{}\''.format(self.run_uuid)]
+ oid = self._read_db(columns, tables, conditions, as_dict=False)
+ if not oid:
+ raise ConfigError('No matching run entries found for run_uuid {}'.format(self.run_uuid))
+ if len(oid) > 1:
+ raise ConfigError('Multiple entries found for run_uuid: {}'.format(self.run_uuid))
+ return oid[0][0]
+ def _get_parameters(self, param_type):
+ columns = ['parameters.job_oid', 'parameters.name', 'parameters.value']
+ tables = ['parameters']
+ conditions = ['parameters.type = \'{}\''.format(param_type),
+ 'parameters.run_oid = \'{}\''.format(self.oid)]
+ params = self._read_db(columns, tables, conditions, as_dict=False)
+ parm_dict = defaultdict(dict)
+ for (job_oid, k, v) in params:
+ try:
+ parm_dict[job_oid][k] = json.loads(v)
+ except SerializerSyntaxError:
+ logger.debug('Failed to deserialize job_oid:{}-"{}":"{}"'.format(job_oid, k, v))
+ return parm_dict
+ def _list_runs(self):
+ columns = ['runs.run_uuid', 'runs.run_name', 'runs.project',
+ 'runs.project_stage', 'runs.status', 'runs.start_time', 'runs.end_time']
+ tables = ['runs']
+ pod = self._read_db(columns, tables)
+ if pod:
+ headers = ['Run Name', 'Project', 'Project Stage', 'Start Time', 'End Time',
+ 'run_uuid']
+ run_list = []
+ for entry in pod:
+ # Format times to display better
+ start_time = entry['start_time']
+ end_time = entry['end_time']
+ if start_time:
+ start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
+ if end_time:
+ end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
+ run_list.append([
+ entry['run_name'],
+ entry['project'],
+ entry['project_stage'],
+ start_time,
+ end_time,
+ entry['run_uuid']])
+ print(format_simple_table(run_list, headers))
+ else:
+ print('No Runs Found')
+class JobDatabaseOutput(DatabaseOutput):
+ kind = 'job'
+ def __init__(self, conn, oid, job_id, label, iteration, retry):
+ super(JobDatabaseOutput, self).__init__(conn, oid=oid)
+ self.id = job_id
+ self.label = label
+ self.iteration = iteration
+ self.retry = retry
+ self.result = None
+ self.spec = None
+ self.reload()
+ def __repr__(self):
+ return '<{} {}-{}-{}>'.format(self.__class__.__name__,
+ self.id, self.label, self.iteration)
+ def __str__(self):
+ return '{}-{}-{}'.format(self.id, self.label, self.iteration)