author     Waleed El-Geresy <waleed.el-geresy@arm.com>  2018-07-20 15:17:03 +0100
committer  Marc Bonnici <marc.bonnici@arm.com>          2018-09-12 10:13:34 +0100
commit     6d654157b29fa0a5f2331483c0ba4ce879e53270 (patch)
tree       f23a4a0ba49cc07d0957703406460a7346ab4936 /wa/output_processors
parent     bb255de9add4c71ca0c3ee753bfb544c99968311 (diff)
Add Postgres Output Processor
The output processor uploads the results found in the wa_output folder to a Postgres database, whose schema is defined by the WA create database command.
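For illustration, a minimal sketch (not part of this patch) of reading uploaded results back with psycopg2 is shown below. The table and column names are taken from the INSERT statements in this patch; the connection details and the 'dhrystone' workload name are placeholders.

    import psycopg2

    # Connection details are placeholders; match them to the 'postgres'
    # output processor configuration used for the run.
    conn = psycopg2.connect(dbname='wa', user='postgres',
                            host='localhost', port='5432')
    with conn:
        with conn.cursor() as cursor:
            # Metrics.job_oid references Jobs.oid, as populated by the
            # output processor added in this patch.
            cursor.execute(
                "SELECT Metrics.name, Metrics.value, Metrics.units "
                "FROM Metrics JOIN Jobs ON Metrics.job_oid = Jobs.oid "
                "WHERE Jobs.workload_name = %s",
                ('dhrystone',))
            for name, value, units in cursor.fetchall():
                print(name, value, units)
    conn.close()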
Diffstat (limited to 'wa/output_processors')
-rw-r--r--    wa/output_processors/postgresql.py    481
1 file changed, 481 insertions, 0 deletions
diff --git a/wa/output_processors/postgresql.py b/wa/output_processors/postgresql.py
new file mode 100644
index 00000000..5218e7b7
--- /dev/null
+++ b/wa/output_processors/postgresql.py
@@ -0,0 +1,481 @@
+# Copyright 2018 ARM Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import uuid
+import collections
+import inspect
+
+try:
+ import psycopg2
+ from psycopg2 import (connect, extras)
+ from psycopg2 import Error as Psycopg2Error
+except ImportError as e:
+ psycopg2 = None
+ import_error_msg = e.args[0] if e.args else str(e)
+from devlib.target import KernelVersion, KernelConfig
+
+import wa
+from wa.utils import postgres_convert
+from wa import OutputProcessor, Parameter, OutputProcessorError
+from wa.utils.types import level
+from wa.framework.target.info import CpuInfo
+
+
+class PostgresqlResultProcessor(OutputProcessor):
+
+ name = 'postgres'
+ description = """
+ Stores results in a Postgresql database.
+
+ The structure of this database can easily be understood by examining
+ the postgres_schema.sql file (the schema used to generate it):
+ {}
+ """.format(os.path.join(
+ os.path.dirname(inspect.getfile(wa)),
+ 'commands',
+ 'postgres_schema.sql'))
+ parameters = [
+ Parameter('username', default='postgres',
+ description="""
+ This is the username that will be used to connect to the
+ Postgresql database. Note that depending on whether the user
+ has privileges to modify the database (normally only possible
+ on localhost), the user may only be able to append entries.
+ """),
+ Parameter('password', default=None,
+ description="""
+ The password to be used to connect to the specified database
+ with the specified username.
+ """),
+ Parameter('dbname', default='wa',
+ description="""
+ Name of the database that will be created or added to. Note,
+ to override this, you can specify a value in your user
+ wa configuration file.
+ """),
+ Parameter('host', kind=str, default='localhost',
+ description="""
+ The host where the Postgresql server is running. The default
+ is localhost (i.e. the machine that wa is running on).
+ This is useful for complex systems where multiple machines
+ may be executing workloads and uploading their results to
+ a remote, centralised database.
+ """),
+ Parameter('port', kind=str, default='5432',
+ description="""
+ The port the Postgresql server is running on, on the host.
+ The default is Postgresql's default, so do not change this
+ unless you have modified the default port for Postgresql.
+ """),
+ ]
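+
+    # Illustrative only: the exact layout of WA configuration files is not
+    # defined in this module, but with a schema created via the
+    # 'wa create database' command, this processor would typically be enabled
+    # and configured from the user config along these lines (parameter names
+    # as declared above):
+    #
+    #   augmentations:
+    #     - postgres
+    #
+    #   postgres:
+    #     username: postgres
+    #     dbname: wa
+    #     host: localhost
+    #     port: '5432'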
+
+ # Commands
+ sql_command = {
+ "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+ "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;",
+ "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
+ "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+ "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)",
+ "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+ "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)",
+ "create_augmentation": "INSERT INTO Augmentations (oid, run_oid, name) VALUES (%s, %s, %s)",
+ "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)",
+ "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+ "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)",
+ "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)",
+ "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"}
+
+ # Lists to track which run-related items have already been added
+ metrics_already_added = []
+ artifacts_already_added = []
+ # Dict needed so that jobs can look up the augmentation_uuid
+ augmentations_already_added = {}
+
+ # Status bits (flags)
+ first_job_run = True
+
+ def __init__(self, *args, **kwargs):
+ super(PostgresqlResultProcessor, self).__init__(*args, **kwargs)
+ self.conn = None
+ self.cursor = None
+ self.current_lobj = None
+ self.current_loid = None
+ self.run_uuid = None
+ self.current_job_uuid = None
+ self.target_uuid = None
+ self.current_artifact_uuid = None
+ self.current_large_object_uuid = None
+ self.current_classifier_uuid = None
+ self.current_metric_uuid = None
+ self.current_augmentation_uuid = None
+ self.current_resource_getter_uuid = None
+ self.current_event_uuid = None
+ self.current_job_aug_uuid = None
+ self.current_parameter_uuid = None
+
+ def initialize(self, context):
+
+ if not psycopg2:
+ raise ImportError(
+ 'The psycopg2 module is required for the ' +
+ 'Postgresql Output Processor: {}'.format(import_error_msg))
+ # N.B. Typecasters are for postgres->python and adapters the opposite
+ self.connect_to_database()
+ self.cursor = self.conn.cursor()
+ # Register the adapters and typecasters for enum types
+ self.cursor.execute("SELECT NULL::status_enum")
+ status_oid = self.cursor.description[0][1]
+ self.cursor.execute("SELECT NULL::param_enum")
+ param_oid = self.cursor.description[0][1]
+ LEVEL = psycopg2.extensions.new_type(
+ (status_oid,), "LEVEL", postgres_convert.cast_level)
+ psycopg2.extensions.register_type(LEVEL)
+ PARAM = psycopg2.extensions.new_type(
+ (param_oid,), "PARAM", postgres_convert.cast_vanilla)
+ psycopg2.extensions.register_type(PARAM)
+ psycopg2.extensions.register_adapter(level, postgres_convert.return_as_is(postgres_convert.adapt_level))
+ psycopg2.extensions.register_adapter(
+ postgres_convert.ListOfLevel, postgres_convert.adapt_ListOfX(postgres_convert.adapt_level))
+ psycopg2.extensions.register_adapter(KernelVersion, postgres_convert.adapt_vanilla)
+ psycopg2.extensions.register_adapter(
+ CpuInfo, postgres_convert.adapt_vanilla)
+ psycopg2.extensions.register_adapter(
+ collections.OrderedDict, extras.Json)
+ psycopg2.extensions.register_adapter(dict, extras.Json)
+ psycopg2.extensions.register_adapter(
+ KernelConfig, postgres_convert.create_iterable_adapter(2, explicit_iterate=True))
+ # Register ready-made UUID type adapter
+ extras.register_uuid()
+ # Insert a run_uuid which will be globally accessible during the run
+ self.run_uuid = uuid.UUID(str(uuid.uuid4()))
+ run_output = context.run_output
+ retry_on_status = postgres_convert.ListOfLevel(run_output.run_config.retry_on_status)
+ self.cursor.execute(
+ self.sql_command['create_run'],
+ (
+ self.run_uuid,
+ run_output.event_summary,
+ run_output.basepath,
+ run_output.status,
+ run_output.state.timestamp,
+ run_output.info.run_name,
+ run_output.info.project,
+ retry_on_status,
+ run_output.run_config.max_retries,
+ run_output.run_config.bail_on_init_failure,
+ run_output.run_config.allow_phone_home,
+ run_output.info.uuid,
+ run_output.info.start_time,
+ run_output.metadata))
+ self.target_uuid = uuid.uuid4()
+ target_info = context.target_info
+ self.cursor.execute(
+ self.sql_command['create_target'],
+ (
+ self.target_uuid,
+ self.run_uuid,
+ target_info.target,
+ target_info.cpus,
+ target_info.os,
+ target_info.os_version,
+ target_info.hostid,
+ target_info.hostname,
+ target_info.abi,
+ target_info.is_rooted,
+ # Important caveat: kernel_version is the name of the column in the Targets table
+ # However, this refers to kernel_version.version, not to kernel_version as a whole
+ target_info.kernel_version.version,
+ target_info.kernel_version.release,
+ target_info.kernel_version.sha1,
+ target_info.kernel_config,
+ target_info.sched_features))
+ # Commit cursor commands
+ self.conn.commit()
+
+ def export_job_output(self, job_output, target_info, run_output): # pylint: disable=too-many-branches, too-many-statements, too-many-locals, unused-argument
+ ''' Run once for each job to upload information that is
+        updated on a job-by-job basis.
+ '''
+ self.current_job_uuid = uuid.uuid4()
+ # Create a new job
+ self.cursor.execute(
+ self.sql_command['create_job'],
+ (
+ self.current_job_uuid,
+ self.run_uuid,
+ job_output.status,
+ job_output.retry,
+ job_output.label,
+ job_output.id,
+ job_output.iteration,
+ job_output.spec.workload_name,
+ job_output.metadata))
+ # Update the run table and run-level parameters
+ self.cursor.execute(
+ self.sql_command['update_run'],
+ (
+ run_output.event_summary,
+ run_output.status,
+ run_output.state.timestamp,
+ run_output.info.end_time,
+ self.run_uuid))
+ self.sql_upload_artifacts(run_output, record_in_added=True)
+ self.sql_upload_metrics(run_output, record_in_added=True)
+ self.sql_upload_augmentations(run_output)
+ self.sql_upload_resource_getters(run_output)
+ self.sql_upload_events(job_output)
+ self.sql_upload_artifacts(job_output)
+ self.sql_upload_metrics(job_output)
+ self.sql_upload_job_augmentations(job_output)
+ self.sql_upload_parameters(
+ "workload",
+ job_output.spec.workload_parameters,
+ job_specific=True)
+ self.sql_upload_parameters(
+ "runtime",
+ job_output.spec.runtime_parameters,
+ job_specific=True)
+ self.conn.commit()
+
+ def export_run_output(self, run_output, target_info): # pylint: disable=unused-argument, too-many-locals
+ ''' A final export of the RunOutput that updates existing parameters
+ and uploads ones which are only generated after jobs have run.
+ '''
+ self.current_job_uuid = None
+ # Update the job statuses following completion of the run
+ for job in run_output.jobs:
+ job_id = job.id
+ job_status = job.status
+ self.cursor.execute(
+ "UPDATE Jobs SET status=%s WHERE job_id=%s and run_oid=%s",
+ (
+ job_status,
+ job_id,
+ self.run_uuid))
+
+ run_uuid = self.run_uuid
+ # Update the run entry after jobs have completed
+ sql_command_update_run = self.sql_command['update_run']
+ self.cursor.execute(
+ sql_command_update_run,
+ (
+ run_output.event_summary,
+ run_output.status,
+ run_output.state.timestamp,
+ run_output.info.end_time,
+ run_uuid))
+ self.sql_upload_events(run_output)
+ self.sql_upload_artifacts(run_output, check_uniqueness=True)
+ self.sql_upload_metrics(run_output, check_uniqueness=True)
+ self.sql_upload_augmentations(run_output)
+ self.conn.commit()
+
+ # Upload functions for use with both jobs and runs
+
+ def sql_upload_resource_getters(self, output_object):
+ for resource_getter in output_object.run_config.resource_getters:
+ self.current_resource_getter_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_resource_getter'],
+ (
+ self.current_resource_getter_uuid,
+ self.run_uuid,
+ resource_getter))
+ self.sql_upload_parameters(
+ 'resource_getter',
+ output_object.run_config.resource_getters[resource_getter],
+ owner_id=self.current_resource_getter_uuid,
+ job_specific=False)
+
+ def sql_upload_events(self, output_object):
+ for event in output_object.events:
+ self.current_event_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_event'],
+ (
+ self.current_event_uuid,
+ self.run_uuid,
+ self.current_job_uuid,
+ event.timestamp,
+ event.message))
+
+ def sql_upload_job_augmentations(self, output_object):
+ ''' This is a table which links the uuids of augmentations to jobs.
+ Note that the augmentations table is prepopulated, leading to the necessity
+ of an augmentaitions_already_added dictionary, which gives us the corresponding
+ uuids.
+ Augmentations which are prefixed by ~ are toggled off and not part of the job,
+ therefore not added.
+ '''
+ for augmentation in output_object.spec.augmentations:
+ if augmentation.startswith('~'):
+ continue
+ self.current_augmentation_uuid = self.augmentations_already_added[augmentation]
+ self.current_job_aug_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_job_aug'],
+ (
+ self.current_job_aug_uuid,
+ self.current_job_uuid,
+ self.current_augmentation_uuid))
+
+ def sql_upload_augmentations(self, output_object):
+ for augmentation in output_object.augmentations:
+ if augmentation.startswith('~') or augmentation in self.augmentations_already_added:
+ continue
+ self.current_augmentation_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_augmentation'],
+ (
+ self.current_augmentation_uuid,
+ self.run_uuid,
+ augmentation))
+ self.sql_upload_parameters(
+ 'augmentation',
+ output_object.run_config.augmentations[augmentation],
+ owner_id=self.current_augmentation_uuid,
+ job_specific=False)
+ self.augmentations_already_added[augmentation] = self.current_augmentation_uuid
+
+ def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False):
+ for metric in output_object.metrics:
+ if metric in self.metrics_already_added and check_uniqueness:
+ continue
+ self.current_metric_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_metric'],
+ (
+ self.current_metric_uuid,
+ self.run_uuid,
+ self.current_job_uuid,
+ metric.name,
+ metric.value,
+ metric.units,
+ metric.lower_is_better))
+ for classifier in metric.classifiers:
+ self.current_classifier_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_classifier'],
+ (
+ self.current_classifier_uuid,
+ None,
+ self.current_metric_uuid,
+ classifier,
+ metric.classifiers[classifier]))
+ if record_in_added:
+ self.metrics_already_added.append(metric)
+
+ def sql_upload_artifacts(self, output_object, record_in_added=False, check_uniqueness=False):
+ ''' Uploads artifacts to the database.
+        record_in_added will record the artifacts added in artifacts_already_added.
+        check_uniqueness will ensure artifacts in artifacts_already_added do not get added again.
+ '''
+ for artifact in output_object.artifacts:
+ if artifact in self.artifacts_already_added and check_uniqueness:
+ continue
+ self.current_artifact_uuid = uuid.uuid4()
+ self.current_lobj = self.conn.lobject()
+ self.current_loid = self.current_lobj.oid
+ self.current_large_object_uuid = uuid.uuid4()
+ with open(os.path.join(output_object.basepath, artifact.path)) as lobj_file:
+ lobj_data = lobj_file.read()
+ lo_len = self.current_lobj.write(lobj_data)
+            if lo_len > 50000000:  # Notify if the LO insert is larger than 50MB
+ self.logger.debug(
+ "Inserting large object of size {}".format(lo_len))
+ self.cursor.execute(
+ self.sql_command['create_large_object'],
+ (
+ self.current_large_object_uuid,
+ self.current_loid))
+ self.cursor.execute(
+ self.sql_command['create_artifact'],
+ (
+ self.current_artifact_uuid,
+ self.run_uuid,
+ self.current_job_uuid,
+ artifact.name,
+ self.current_large_object_uuid,
+ artifact.description,
+ artifact.kind))
+ for classifier in artifact.classifiers:
+ self.current_classifier_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_classifier'],
+ (
+ self.current_classifier_uuid,
+ self.current_artifact_uuid,
+ None,
+ classifier,
+ artifact.classifiers[classifier]))
+ if record_in_added:
+ self.artifacts_already_added.append(artifact)
+
+ def sql_upload_parameters(self, parameter_type, parameter_dict, owner_id=None, job_specific=False):
+ # Note, currently no augmentation parameters are workload specific, but in the future
+ # this may change
+ run_uuid = self.run_uuid
+ job_uuid = None # Default initial value
+ augmentation_id = None
+ resource_getter_id = None
+ if parameter_type == "workload":
+ job_uuid = self.current_job_uuid
+ elif parameter_type == "resource_getter":
+ job_uuid = None
+ resource_getter_id = owner_id
+ elif parameter_type == "augmentation":
+ if job_specific:
+ job_uuid = self.current_job_uuid
+ augmentation_id = owner_id
+ elif parameter_type == "runtime":
+ job_uuid = self.current_job_uuid
+ else:
+ # boot parameters are not yet implemented
+ # device parameters are redundant due to the targets table
+ raise NotImplementedError("{} is not a valid parameter type.".format(parameter_type))
+ for parameter in parameter_dict:
+ self.current_parameter_uuid = uuid.uuid4()
+ self.cursor.execute(
+ self.sql_command['create_parameter'],
+ (
+ self.current_parameter_uuid,
+ run_uuid,
+ job_uuid,
+ augmentation_id,
+ resource_getter_id,
+ parameter,
+ str(parameter_dict[parameter]),
+ str(type(parameter_dict[parameter])),
+ parameter_type))
+
+ def connect_to_database(self):
+ dsn = "dbname={} user={} password={} host={} port={}".format(
+ self.dbname, self.username, self.password, self.host, self.port)
+ try:
+ self.conn = connect(dsn=dsn)
+ except Psycopg2Error as e:
+ raise OutputProcessorError(
+ "Database error, if the database doesn't exist, " +
+ "please use 'wa create database' to create the database: {}".format(e))
+
+    def execute_sql_line_by_line(self, sql):
+        # Strip newlines and split on ';' so that each SQL statement in the
+        # string is executed individually; blank lines and '--' comment lines
+        # are skipped.
+        cursor = self.conn.cursor()
+ for line in sql.replace('\n', "").replace(";", ";\n").split("\n"):
+ if line and not line.startswith('--'):
+ cursor.execute(line)
+ cursor.close()
+ self.conn.commit()
+ self.conn.reset()