From 250bf61c4b5ae27c6375587168eb95661574016b Mon Sep 17 00:00:00 2001
From: Marc Bonnici
Date: Tue, 30 Oct 2018 17:08:14 +0000
Subject: postgres: Update schema to v1.2

Update the postgres database schema:
- Rename the "resourcegetters" table to "resource_getters" for consistency
- Rename the "retries" column to "retry" to better reflect its purpose
- Store additional information including:
    - POD serialization data
    - Missing target information
    - JSON-formatted run state
---
 wa/commands/postgres_schema.sql                    | 170 -----------------
 wa/commands/postgres_schemas/postgres_schema.sql   | 192 +++++++++++++++++++
 .../postgres_schemas/postgres_schema_v1.2.sql      |  30 +++
 wa/output_processors/postgresql.py                 | 205 ++++++++++++++++-----
 4 files changed, 381 insertions(+), 216 deletions(-)
 delete mode 100644 wa/commands/postgres_schema.sql
 create mode 100644 wa/commands/postgres_schemas/postgres_schema.sql
 create mode 100644 wa/commands/postgres_schemas/postgres_schema_v1.2.sql

diff --git a/wa/commands/postgres_schema.sql b/wa/commands/postgres_schema.sql
deleted file mode 100644
index 26bca17a..00000000
--- a/wa/commands/postgres_schema.sql
+++ /dev/null
@@ -1,170 +0,0 @@
---!VERSION!1.1!ENDVERSION!
-CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-CREATE EXTENSION IF NOT EXISTS "lo";
-
--- In future, it may be useful to implement rules on which Parameter oid fields can be none depeendent on the value in the type column;
-
-DROP TABLE IF EXISTS DatabaseMeta;
-DROP TABLE IF EXISTS Parameters;
-DROP TABLE IF EXISTS Classifiers;
-DROP TABLE IF EXISTS LargeObjects;
-DROP TABLE IF EXISTS Artifacts;
-DROP TABLE IF EXISTS Metrics;
-DROP TABLE IF EXISTS Augmentations;
-DROP TABLE IF EXISTS Jobs_Augs;
-DROP TABLE IF EXISTS ResourceGetters;
-DROP TABLE IF EXISTS Events;
-DROP TABLE IF EXISTS Targets;
-DROP TABLE IF EXISTS Jobs;
-DROP TABLE IF EXISTS Runs;
-
-DROP TYPE IF EXISTS status_enum;
-DROP TYPE IF EXISTS param_enum;
-
-CREATE TYPE status_enum AS ENUM ('UNKNOWN(0)','NEW(1)','PENDING(2)','STARTED(3)','CONNECTED(4)', 'INITIALIZED(5)', 'RUNNING(6)', 'OK(7)', 'PARTIAL(8)', 'FAILED(9)', 'ABORTED(10)', 'SKIPPED(11)');
-
-CREATE TYPE param_enum AS ENUM ('workload', 'resource_getter', 'augmentation', 'device', 'runtime', 'boot');
-
--- In future, it might be useful to create an ENUM type for the artifact kind, or simply a generic enum type;
-
-CREATE TABLE DatabaseMeta (
-    oid uuid NOT NULL,
-    schema_major int,
-    schema_minor int,
-    PRIMARY KEY (oid)
-);
-
-CREATE TABLE Runs (
-    oid uuid NOT NULL,
-    event_summary text,
-    basepath text,
-    status status_enum,
-    timestamp timestamp,
-    run_name text,
-    project text,
-    retry_on_status status_enum[],
-    max_retries int,
-    bail_on_init_failure boolean,
-    allow_phone_home boolean,
-    run_uuid uuid,
-    start_time timestamp,
-    end_time timestamp,
-    metadata jsonb,
-    PRIMARY KEY (oid)
-);
-
-CREATE TABLE Jobs (
-    oid uuid NOT NULL,
-    run_oid uuid NOT NULL references Runs(oid),
-    status status_enum,
-    retries int,
-    label text,
-    job_id text,
-    iterations int,
-    workload_name text,
-    metadata jsonb,
-    PRIMARY KEY (oid)
-);
-
-CREATE TABLE Targets (
-    oid uuid NOT NULL,
-    run_oid uuid NOT NULL references Runs(oid),
-    target text,
-    cpus text[],
-    os text,
-    os_version jsonb,
-    hostid int,
-    hostname text,
-    abi text,
-    is_rooted boolean,
-    kernel_version text,
-    kernel_release text,
-    kernel_sha1 text,
-    kernel_config text[],
-    sched_features text[],
-    PRIMARY KEY (oid)
-);
-
-CREATE TABLE Events (
-    oid uuid NOT NULL,
-    run_oid uuid NOT NULL references Runs(oid),
-    job_oid uuid references 
Jobs(oid), - timestamp timestamp, - message text, - PRIMARY KEY (oid) -); - -CREATE TABLE ResourceGetters ( - oid uuid NOT NULL, - run_oid uuid NOT NULL references Runs(oid), - name text, - PRIMARY KEY (oid) -); - -CREATE TABLE Augmentations ( - oid uuid NOT NULL, - run_oid uuid NOT NULL references Runs(oid), - name text, - PRIMARY KEY (oid) -); - -CREATE TABLE Jobs_Augs ( - oid uuid NOT NULL, - job_oid uuid NOT NULL references Jobs(oid), - augmentation_oid uuid NOT NULL references Augmentations(oid), - PRIMARY KEY (oid) -); - -CREATE TABLE Metrics ( - oid uuid NOT NULL, - run_oid uuid NOT NULL references Runs(oid), - job_oid uuid references Jobs(oid), - name text, - value double precision, - units text, - lower_is_better boolean, - PRIMARY KEY (oid) -); - -CREATE TABLE LargeObjects ( - oid uuid NOT NULL, - lo_oid lo NOT NULL, - PRIMARY KEY (oid) -); - --- Trigger that allows you to manage large objects from the LO table directly; -CREATE TRIGGER t_raster BEFORE UPDATE OR DELETE ON LargeObjects - FOR EACH ROW EXECUTE PROCEDURE lo_manage(lo_oid); - -CREATE TABLE Artifacts ( - oid uuid NOT NULL, - run_oid uuid NOT NULL references Runs(oid), - job_oid uuid references Jobs(oid), - name text, - large_object_uuid uuid NOT NULL references LargeObjects(oid), - description text, - kind text, - PRIMARY KEY (oid) -); - -CREATE TABLE Classifiers ( - oid uuid NOT NULL, - artifact_oid uuid references Artifacts(oid), - metric_oid uuid references Metrics(oid), - key text, - value text, - PRIMARY KEY (oid) -); - -CREATE TABLE Parameters ( - oid uuid NOT NULL, - run_oid uuid NOT NULL references Runs(oid), - job_oid uuid references Jobs(oid), - augmentation_oid uuid references Augmentations(oid), - resource_getter_oid uuid references ResourceGetters(oid), - name text, - value text, - value_type text, - type param_enum, - PRIMARY KEY (oid) -); diff --git a/wa/commands/postgres_schemas/postgres_schema.sql b/wa/commands/postgres_schemas/postgres_schema.sql new file mode 100644 index 00000000..7510c778 --- /dev/null +++ b/wa/commands/postgres_schemas/postgres_schema.sql @@ -0,0 +1,192 @@ +--!VERSION!1.2!ENDVERSION! 
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
+CREATE EXTENSION IF NOT EXISTS "lo";
+
+-- In future, it may be useful to implement rules governing which Parameter oid fields may be NULL depending on the value in the type column;
+
+DROP TABLE IF EXISTS DatabaseMeta;
+DROP TABLE IF EXISTS Parameters;
+DROP TABLE IF EXISTS Classifiers;
+DROP TABLE IF EXISTS LargeObjects;
+DROP TABLE IF EXISTS Artifacts;
+DROP TABLE IF EXISTS Metrics;
+DROP TABLE IF EXISTS Augmentations;
+DROP TABLE IF EXISTS Jobs_Augs;
+DROP TABLE IF EXISTS ResourceGetters;
+DROP TABLE IF EXISTS Resource_Getters;
+DROP TABLE IF EXISTS Events;
+DROP TABLE IF EXISTS Targets;
+DROP TABLE IF EXISTS Jobs;
+DROP TABLE IF EXISTS Runs;
+
+DROP TYPE IF EXISTS status_enum;
+DROP TYPE IF EXISTS param_enum;
+
+CREATE TYPE status_enum AS ENUM ('UNKNOWN(0)','NEW(1)','PENDING(2)','STARTED(3)','CONNECTED(4)', 'INITIALIZED(5)', 'RUNNING(6)', 'OK(7)', 'PARTIAL(8)', 'FAILED(9)', 'ABORTED(10)', 'SKIPPED(11)');
+
+CREATE TYPE param_enum AS ENUM ('workload', 'resource_getter', 'augmentation', 'device', 'runtime', 'boot');
+
+-- In future, it might be useful to create an ENUM type for the artifact kind, or simply a generic enum type;
+
+CREATE TABLE DatabaseMeta (
+    oid uuid NOT NULL,
+    schema_major int,
+    schema_minor int,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Runs (
+    oid uuid NOT NULL,
+    event_summary text,
+    basepath text,
+    status status_enum,
+    timestamp timestamp,
+    run_name text,
+    project text,
+    project_stage text,
+    retry_on_status status_enum[],
+    max_retries int,
+    bail_on_init_failure boolean,
+    allow_phone_home boolean,
+    run_uuid uuid,
+    start_time timestamp,
+    end_time timestamp,
+    duration float,
+    metadata jsonb,
+    _pod_version int,
+    _pod_serialization_version int,
+    state jsonb,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Jobs (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    status status_enum,
+    retry int,
+    label text,
+    job_id text,
+    iterations int,
+    workload_name text,
+    metadata jsonb,
+    _pod_version int,
+    _pod_serialization_version int,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Targets (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    target text,
+    cpus text[],
+    os text,
+    os_version jsonb,
+    hostid int,
+    hostname text,
+    abi text,
+    is_rooted boolean,
+    kernel_version text,
+    kernel_release text,
+    kernel_sha1 text,
+    kernel_config text[],
+    sched_features text[],
+    page_size_kb int,
+    screen_resolution int[],
+    prop json,
+    android_id text,
+    _pod_version int,
+    _pod_serialization_version int,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Events (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    job_oid uuid references Jobs(oid),
+    timestamp timestamp,
+    message text,
+    _pod_version int,
+    _pod_serialization_version int,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Resource_Getters (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    name text,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Augmentations (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    name text,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Jobs_Augs (
+    oid uuid NOT NULL,
+    job_oid uuid NOT NULL references Jobs(oid),
+    augmentation_oid uuid NOT NULL references Augmentations(oid),
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Metrics (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    job_oid uuid references Jobs(oid),
+    name text,
+    value double precision,
+    units text,
+    lower_is_better boolean,
+    _pod_version int,
+    _pod_serialization_version int,
+    PRIMARY KEY 
(oid)
+);
+
+CREATE TABLE LargeObjects (
+    oid uuid NOT NULL,
+    lo_oid lo NOT NULL,
+    PRIMARY KEY (oid)
+);
+
+-- Trigger that allows you to manage large objects from the LO table directly;
+CREATE TRIGGER t_raster BEFORE UPDATE OR DELETE ON LargeObjects
+    FOR EACH ROW EXECUTE PROCEDURE lo_manage(lo_oid);
+
+CREATE TABLE Artifacts (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    job_oid uuid references Jobs(oid),
+    name text,
+    large_object_uuid uuid NOT NULL references LargeObjects(oid),
+    description text,
+    kind text,
+    _pod_version int,
+    _pod_serialization_version int,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Classifiers (
+    oid uuid NOT NULL,
+    artifact_oid uuid references Artifacts(oid),
+    metric_oid uuid references Metrics(oid),
+    job_oid uuid references Jobs(oid),
+    run_oid uuid references Runs(oid),
+    key text,
+    value text,
+    PRIMARY KEY (oid)
+);
+
+CREATE TABLE Parameters (
+    oid uuid NOT NULL,
+    run_oid uuid NOT NULL references Runs(oid),
+    job_oid uuid references Jobs(oid),
+    augmentation_oid uuid references Augmentations(oid),
+    resource_getter_oid uuid references Resource_Getters(oid),
+    name text,
+    value text,
+    value_type text,
+    type param_enum,
+    PRIMARY KEY (oid)
+);
diff --git a/wa/commands/postgres_schemas/postgres_schema_v1.2.sql b/wa/commands/postgres_schemas/postgres_schema_v1.2.sql
new file mode 100644
index 00000000..1c982226
--- /dev/null
+++ b/wa/commands/postgres_schemas/postgres_schema_v1.2.sql
@@ -0,0 +1,30 @@
+ALTER TABLE resourcegetters RENAME TO resource_getters;
+
+ALTER TABLE classifiers ADD COLUMN job_oid uuid references Jobs(oid);
+ALTER TABLE classifiers ADD COLUMN run_oid uuid references Runs(oid);
+
+ALTER TABLE targets ADD COLUMN page_size_kb int;
+ALTER TABLE targets ADD COLUMN screen_resolution int[];
+ALTER TABLE targets ADD COLUMN prop json;
+ALTER TABLE targets ADD COLUMN android_id text;
+ALTER TABLE targets ADD COLUMN _pod_version int;
+ALTER TABLE targets ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE jobs RENAME COLUMN retries TO retry;
+ALTER TABLE jobs ADD COLUMN _pod_version int;
+ALTER TABLE jobs ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE runs ADD COLUMN project_stage text;
+ALTER TABLE runs ADD COLUMN state jsonb;
+ALTER TABLE runs ADD COLUMN duration float;
+ALTER TABLE runs ADD COLUMN _pod_version int;
+ALTER TABLE runs ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE artifacts ADD COLUMN _pod_version int;
+ALTER TABLE artifacts ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE events ADD COLUMN _pod_version int;
+ALTER TABLE events ADD COLUMN _pod_serialization_version int;
+
+ALTER TABLE metrics ADD COLUMN _pod_version int;
+ALTER TABLE metrics ADD COLUMN _pod_serialization_version int;
diff --git a/wa/output_processors/postgresql.py b/wa/output_processors/postgresql.py
index cdf360b4..5c7059de 100644
--- a/wa/output_processors/postgresql.py
+++ b/wa/output_processors/postgresql.py
@@ -16,7 +16,6 @@
 import os
 import uuid
 import collections
-import inspect
 
 try:
     import psycopg2
@@ -27,10 +26,12 @@ except ImportError as e:
     import_error_msg = e.args[0] if e.args else str(e)
 
 from devlib.target import KernelVersion, KernelConfig
-import wa
 from wa import OutputProcessor, Parameter, OutputProcessorError
 from wa.framework.target.info import CpuInfo
-from wa.utils import postgres
+from wa.utils.postgres import (POSTGRES_SCHEMA_DIR, cast_level, cast_vanilla,
+                               adapt_vanilla, return_as_is, adapt_level,
+                               ListOfLevel, adapt_ListOfX, create_iterable_adapter,
+                               get_schema, 
get_database_schema_version)
 from wa.utils.serializer import json
 from wa.utils.types import level
@@ -44,10 +45,8 @@ class PostgresqlResultProcessor(OutputProcessor):
     The structure of this database can easily be understood by examining
     the postgres_schema.sql file (the schema used to generate it):
     {}
-    """.format(os.path.join(
-        os.path.dirname(inspect.getfile(wa)),
-        'commands',
-        'postgres_schema.sql'))
+    """.format(os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql'))
+
     parameters = [
         Parameter('username', default='postgres',
                   description="""
@@ -85,19 +84,23 @@ class PostgresqlResultProcessor(OutputProcessor):
 
     # Commands
     sql_command = {
-        "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s WHERE oid=%s;",
-        "create_job": "INSERT INTO Jobs (oid, run_oid, status, retries, label, job_id, iterations, workload_name, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
-        "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
-        "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message) VALUES (%s, %s, %s, %s, %s)",
-        "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind) VALUES (%s, %s, %s, %s, %s, %s, %s)",
-        "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better) VALUES (%s, %s, %s, %s , %s, %s, %s)",
+        "create_run": "INSERT INTO Runs (oid, event_summary, basepath, status, timestamp, run_name, project, project_stage, retry_on_status, max_retries, bail_on_init_failure, allow_phone_home, run_uuid, start_time, metadata, state, _pod_version, _pod_serialization_version) "
+                      "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "update_run": "UPDATE Runs SET event_summary=%s, status=%s, timestamp=%s, end_time=%s, duration=%s, state=%s WHERE oid=%s;",
+        "create_job": "INSERT INTO Jobs (oid, run_oid, status, retry, label, job_id, iterations, workload_name, metadata, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
+        "create_target": "INSERT INTO Targets (oid, run_oid, target, cpus, os, os_version, hostid, hostname, abi, is_rooted, kernel_version, kernel_release, kernel_sha1, kernel_config, sched_features, page_size_kb, screen_resolution, prop, android_id, _pod_version, _pod_serialization_version) "
+                         "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "create_event": "INSERT INTO Events (oid, run_oid, job_oid, timestamp, message, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+        "create_artifact": "INSERT INTO Artifacts (oid, run_oid, job_oid, name, large_object_uuid, description, kind, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        "create_metric": "INSERT INTO Metrics (oid, run_oid, job_oid, name, value, units, lower_is_better, _pod_version, _pod_serialization_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
         "create_augmentation": "INSERT INTO Augmentations 
(oid, run_oid, name) VALUES (%s, %s, %s)", - "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, key, value) VALUES (%s, %s, %s, %s, %s)", - "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", - "create_resource_getter": "INSERT INTO ResourceGetters (oid, run_oid, name) VALUES (%s, %s, %s)", + "create_classifier": "INSERT INTO Classifiers (oid, artifact_oid, metric_oid, job_oid, run_oid, key, value) VALUES (%s, %s, %s, %s, %s, %s, %s)", + "create_parameter": "INSERT INTO Parameters (oid, run_oid, job_oid, augmentation_oid, resource_getter_oid, name, value, value_type, type) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + "create_resource_getter": "INSERT INTO Resource_Getters (oid, run_oid, name) VALUES (%s, %s, %s)", "create_job_aug": "INSERT INTO Jobs_Augs (oid, job_oid, augmentation_oid) VALUES (%s, %s, %s)", - "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)"} + "create_large_object": "INSERT INTO LargeObjects (oid, lo_oid) VALUES (%s, %s)" + } # Lists to track which run-related items have already been added metrics_already_added = [] @@ -124,34 +127,37 @@ class PostgresqlResultProcessor(OutputProcessor): # N.B. Typecasters are for postgres->python and adapters the opposite self.connect_to_database() self.cursor = self.conn.cursor() + self.check_schema_versions() + # Register the adapters and typecasters for enum types self.cursor.execute("SELECT NULL::status_enum") status_oid = self.cursor.description[0][1] self.cursor.execute("SELECT NULL::param_enum") param_oid = self.cursor.description[0][1] LEVEL = psycopg2.extensions.new_type( - (status_oid,), "LEVEL", postgres.cast_level) + (status_oid,), "LEVEL", cast_level) psycopg2.extensions.register_type(LEVEL) PARAM = psycopg2.extensions.new_type( - (param_oid,), "PARAM", postgres.cast_vanilla) + (param_oid,), "PARAM", cast_vanilla) psycopg2.extensions.register_type(PARAM) - psycopg2.extensions.register_adapter(level, postgres.return_as_is(postgres.adapt_level)) + psycopg2.extensions.register_adapter(level, return_as_is(adapt_level)) psycopg2.extensions.register_adapter( - postgres.ListOfLevel, postgres.adapt_ListOfX(postgres.adapt_level)) - psycopg2.extensions.register_adapter(KernelVersion, postgres.adapt_vanilla) + ListOfLevel, adapt_ListOfX(adapt_level)) + psycopg2.extensions.register_adapter(KernelVersion, adapt_vanilla) psycopg2.extensions.register_adapter( - CpuInfo, postgres.adapt_vanilla) + CpuInfo, adapt_vanilla) psycopg2.extensions.register_adapter( collections.OrderedDict, extras.Json) psycopg2.extensions.register_adapter(dict, extras.Json) psycopg2.extensions.register_adapter( - KernelConfig, postgres.create_iterable_adapter(2, explicit_iterate=True)) + KernelConfig, create_iterable_adapter(2, explicit_iterate=True)) # Register ready-made UUID type adapter extras.register_uuid() + # Insert a run_uuid which will be globally accessible during the run self.run_uuid = uuid.UUID(str(uuid.uuid4())) run_output = context.run_output - retry_on_status = postgres.ListOfLevel(run_output.run_config.retry_on_status) + retry_on_status = ListOfLevel(run_output.run_config.retry_on_status) self.cursor.execute( self.sql_command['create_run'], ( @@ -162,13 +168,19 @@ class PostgresqlResultProcessor(OutputProcessor): run_output.state.timestamp, run_output.info.run_name, run_output.info.project, + run_output.info.project_stage, retry_on_status, 
run_output.run_config.max_retries,
                 run_output.run_config.bail_on_init_failure,
                 run_output.run_config.allow_phone_home,
                 run_output.info.uuid,
                 run_output.info.start_time,
-                run_output.metadata))
+                run_output.metadata,
+                json.dumps(run_output.state.to_pod()),
+                run_output.result._pod_version,  # pylint: disable=protected-access
+                run_output.result._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
         self.target_uuid = uuid.uuid4()
         target_info = context.target_info
         target_pod = target_info.to_pod()
@@ -191,7 +203,17 @@
             target_pod['kernel_release'],
             target_info.kernel_version.sha1,
             target_info.kernel_config,
-            target_pod['sched_features']))
+            target_pod['sched_features'],
+            target_pod['page_size_kb'],
+            # Android Specific
+            target_pod.get('screen_resolution'),
+            target_pod.get('prop'),
+            target_pod.get('android_id'),
+            target_pod.get('pod_version'),
+            target_pod.get('pod_serialization_version'),
+            )
+        )
+
         # Commit cursor commands
         self.conn.commit()
 
@@ -212,7 +234,26 @@
                 job_output.id,
                 job_output.iteration,
                 job_output.spec.workload_name,
-                job_output.metadata))
+                job_output.metadata,
+                job_output.spec._pod_version,  # pylint: disable=protected-access
+                job_output.spec._pod_serialization_version,  # pylint: disable=protected-access
+            )
+        )
+
+        for classifier in job_output.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    None,
+                    None,
+                    job_uuid,
+                    None,
+                    classifier,
+                    job_output.classifiers[classifier]
+                )
+            )
         # Update the run table and run-level parameters
         self.cursor.execute(
             self.sql_command['update_run'],
@@ -221,7 +262,24 @@
                 run_output.status,
                 run_output.state.timestamp,
                 run_output.info.end_time,
+                None,
+                json.dumps(run_output.state.to_pod()),
                 self.run_uuid))
+        for classifier in run_output.classifiers:
+            classifier_uuid = uuid.uuid4()
+            self.cursor.execute(
+                self.sql_command['create_classifier'],
+                (
+                    classifier_uuid,
+                    None,
+                    None,
+                    None,
+                    self.run_uuid,
+                    classifier,
+                    run_output.classifiers[classifier]
+                )
+            )
         self.sql_upload_artifacts(run_output, record_in_added=True)
         self.sql_upload_metrics(run_output, record_in_added=True)
         self.sql_upload_augmentations(run_output)
@@ -255,19 +313,27 @@
             (
                 job_status,
                 job_id,
-                self.run_uuid))
+                self.run_uuid
+            )
+        )
         run_uuid = self.run_uuid
         # Update the run entry after jobs have completed
+        run_info_pod = run_output.info.to_pod()
+        run_state_pod = run_output.state.to_pod()
         sql_command_update_run = self.sql_command['update_run']
         self.cursor.execute(
             sql_command_update_run,
             (
                 run_output.event_summary,
                 run_output.status,
-                run_output.state.timestamp,
-                run_output.info.end_time,
-                run_uuid))
+                run_info_pod['start_time'],
+                run_info_pod['end_time'],
+                run_info_pod['duration'],
+                json.dumps(run_state_pod),
+                run_uuid,
+            )
+        )
         self.sql_upload_events(run_output)
         self.sql_upload_artifacts(run_output, check_uniqueness=True)
         self.sql_upload_metrics(run_output, check_uniqueness=True)
@@ -284,11 +350,14 @@
             (
                 resource_getter_uuid,
                 self.run_uuid,
-                resource_getter))
+                resource_getter,
+            )
+        )
         self.sql_upload_parameters(
             'resource_getter',
             output_object.run_config.resource_getters[resource_getter],
-            owner_id=resource_getter_uuid)
+            owner_id=resource_getter_uuid,
+        )
 
     def sql_upload_events(self, output_object, 
job_uuid=None): for event in output_object.events: @@ -300,7 +369,11 @@ class PostgresqlResultProcessor(OutputProcessor): self.run_uuid, job_uuid, event.timestamp, - event.message)) + event.message, + event._pod_version, # pylint: disable=protected-access + event._pod_serialization_version, # pylint: disable=protected-access + ) + ) def sql_upload_job_augmentations(self, output_object, job_uuid=None): ''' This is a table which links the uuids of augmentations to jobs. @@ -320,7 +393,9 @@ class PostgresqlResultProcessor(OutputProcessor): ( job_aug_uuid, job_uuid, - augmentation_uuid)) + augmentation_uuid, + ) + ) def sql_upload_augmentations(self, output_object): for augmentation in output_object.augmentations: @@ -332,11 +407,14 @@ class PostgresqlResultProcessor(OutputProcessor): ( augmentation_uuid, self.run_uuid, - augmentation)) + augmentation, + ) + ) self.sql_upload_parameters( 'augmentation', output_object.run_config.augmentations[augmentation], - owner_id=augmentation_uuid) + owner_id=augmentation_uuid, + ) self.augmentations_already_added[augmentation] = augmentation_uuid def sql_upload_metrics(self, output_object, record_in_added=False, check_uniqueness=False, job_uuid=None): @@ -353,7 +431,11 @@ class PostgresqlResultProcessor(OutputProcessor): metric.name, metric.value, metric.units, - metric.lower_is_better)) + metric.lower_is_better, + metric._pod_version, # pylint: disable=protected-access + metric._pod_serialization_version, # pylint: disable=protected-access + ) + ) for classifier in metric.classifiers: classifier_uuid = uuid.uuid4() self.cursor.execute( @@ -362,8 +444,12 @@ class PostgresqlResultProcessor(OutputProcessor): classifier_uuid, None, metric_uuid, + None, + None, classifier, - metric.classifiers[classifier])) + metric.classifiers[classifier], + ) + ) if record_in_added: self.metrics_already_added.append(metric) @@ -374,7 +460,7 @@ class PostgresqlResultProcessor(OutputProcessor): ''' for artifact in output_object.artifacts: if artifact in self.artifacts_already_added and check_uniqueness: - self.logger.debug('Skipping uploading {} as already added' .format(artifact)) + self.logger.debug('Skipping uploading {} as already added'.format(artifact)) continue if artifact in self.artifacts_already_added: @@ -411,7 +497,9 @@ class PostgresqlResultProcessor(OutputProcessor): parameter, json.dumps(parameter_dict[parameter]), str(type(parameter_dict[parameter])), - parameter_type)) + parameter_type, + ) + ) def connect_to_database(self): dsn = "dbname={} user={} password={} host={} port={}".format( @@ -432,6 +520,21 @@ class PostgresqlResultProcessor(OutputProcessor): self.conn.commit() self.conn.reset() + def check_schema_versions(self): + schemafilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql') + cur_major_version, cur_minor_version, _ = get_schema(schemafilepath) + db_schema_version = get_database_schema_version(self.cursor) + if (cur_major_version, cur_minor_version) != db_schema_version: + self.cursor.close() + self.cursor = None + self.conn.commit() + self.conn.reset() + msg = 'The current database schema is v{} however the local ' \ + 'schema version is v{}. 
Please update your database ' \ + 'with the create command' + raise OutputProcessorError(msg.format(db_schema_version, + (cur_major_version, cur_minor_version))) + def _sql_write_lobject(self, source, lobject): with open(source) as lobj_file: lobj_data = lobj_file.read() @@ -458,7 +561,9 @@ class PostgresqlResultProcessor(OutputProcessor): self.sql_command['create_large_object'], ( large_object_uuid, - loid)) + loid, + ) + ) self.cursor.execute( self.sql_command['create_artifact'], ( @@ -468,7 +573,11 @@ class PostgresqlResultProcessor(OutputProcessor): artifact.name, large_object_uuid, artifact.description, - artifact.kind)) + str(artifact.kind), + artifact._pod_version, # pylint: disable=protected-access + artifact._pod_serialization_version, # pylint: disable=protected-access + ) + ) for classifier in artifact.classifiers: classifier_uuid = uuid.uuid4() self.cursor.execute( @@ -477,7 +586,11 @@ class PostgresqlResultProcessor(OutputProcessor): classifier_uuid, artifact_uuid, None, + None, + None, classifier, - artifact.classifiers[classifier])) + artifact.classifiers[classifier], + ) + ) if record_in_added: self.artifacts_already_added[artifact] = loid -- cgit v1.2.3
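
Note: with this change check_schema_versions() raises an OutputProcessorError
whenever the version recorded in the database does not match the bundled
schema, so existing v1.1 databases must be upgraded before results can be
uploaded again. The supported route is the "create" command named in the
error message; purely for illustration, a manual psql session would look
roughly like the sketch below (the database name "wa" is a placeholder, and
it assumes get_database_schema_version() reads the schema_major/schema_minor
columns of DatabaseMeta):

    -- Apply the incremental v1.2 changes to an existing v1.1 database.
    \c wa
    \i wa/commands/postgres_schemas/postgres_schema_v1.2.sql

    -- postgres_schema_v1.2.sql does not bump the recorded version itself,
    -- so the processor's version check would still fail without this:
    UPDATE DatabaseMeta SET schema_major = 1, schema_minor = 2;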
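
Note: run- and job-level classifiers are now recorded in the Classifiers
table through the new run_oid/job_oid columns, and the serialized run state
is stored as jsonb in Runs.state. A hypothetical query against the v1.2
schema (the 'status' key inside the state document and the project name are
assumptions made for illustration):

    -- List runs with their duration, run state and run-level classifiers.
    SELECT r.run_name,
           r.duration,
           r.state -> 'status' AS run_state_status,
           c.key,
           c.value
    FROM Runs r
    LEFT JOIN Classifiers c ON c.run_oid = r.oid
    WHERE r.project = 'example_project';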
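
Note: the Jobs.retries column is renamed to Jobs.retry because it records the
retry count of an individual job, whereas the run-level retry budget lives in
Runs.max_retries. A minimal illustrative query against the renamed column:

    -- Find jobs that needed at least one retry, newest runs first.
    SELECT j.job_id, j.workload_name, j.retry, j.status
    FROM Jobs j
    JOIN Runs r ON j.run_oid = r.oid
    WHERE j.retry > 0
    ORDER BY r.start_time DESC;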