From eabe15750c7f65f89410baac1f50e5b7f66316e5 Mon Sep 17 00:00:00 2001 From: Marc Bonnici Date: Tue, 30 Oct 2018 17:10:37 +0000 Subject: commands/create: Allow for upgrading database schema Provide a method of upgrading existing postgres databases to a new schema version. --- wa/commands/create.py | 284 ++++++++++++++------- .../postgres_schema_update_v1.2.sql | 30 +++ .../postgres_schemas/postgres_schema_v1.2.sql | 30 --- 3 files changed, 217 insertions(+), 127 deletions(-) create mode 100644 wa/commands/postgres_schemas/postgres_schema_update_v1.2.sql delete mode 100644 wa/commands/postgres_schemas/postgres_schema_v1.2.sql diff --git a/wa/commands/create.py b/wa/commands/create.py index 24931cfa..603feeae 100644 --- a/wa/commands/create.py +++ b/wa/commands/create.py @@ -40,10 +40,12 @@ from wa.framework.exception import ConfigError, CommandError from wa.instruments.energy_measurement import EnergyInstrumentBackend from wa.utils.misc import (ensure_directory_exists as _d, capitalize, ensure_file_directory_exists as _f) +from wa.utils.postgres import get_schema from wa.utils.serializer import yaml TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'templates') +POSTGRES_SCHEMA_DIR = os.path.join(os.path.dirname(__file__), 'postgres_schemas') class CreateDatabaseSubcommand(SubCommand): @@ -54,14 +56,21 @@ class CreateDatabaseSubcommand(SubCommand): output processor. """ - schemafilepath = 'postgres_schema.sql' + schemafilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema.sql') + schemaupdatefilepath = os.path.join(POSTGRES_SCHEMA_DIR, 'postgres_schema_update_v{}.{}.sql') def __init__(self, *args, **kwargs): super(CreateDatabaseSubcommand, self).__init__(*args, **kwargs) self.sql_commands = None - self.schemaversion = None self.schema_major = None self.schema_minor = None + self.postgres_host = None + self.postgres_port = None + self.username = None + self.password = None + self.dbname = None + self.config_file = None + self.force = None def initialize(self, context): self.parser.add_argument( @@ -91,25 +100,35 @@ class CreateDatabaseSubcommand(SubCommand): self.parser.add_argument( '-x', '--schema-version', action='store_true', help='Display the current schema version.') + self.parser.add_argument( + '-U', '--upgrade', action='store_true', + help='Upgrade the database to use the latest schema version.') def execute(self, state, args): # pylint: disable=too-many-branches if not psycopg2: raise CommandError( 'The module psycopg2 is required for the wa ' + 'create database command.') - self.get_schema(self.schemafilepath) + + if args.dbname == 'postgres': + raise ValueError('Databasename to create cannot be postgres.') + + self._parse_args(args) + self.schema_major, self.schema_minor, self.sql_commands = _get_schema(self.schemafilepath) # Display the version if needed and exit if args.schema_version: self.logger.info( - 'The current schema version is {}'.format(self.schemaversion)) + 'The current schema version is {}.{}'.format(self.schema_major, + self.schema_minor)) return - if args.dbname == 'postgres': - raise ValueError('Databasename to create cannot be postgres.') + if args.upgrade: + self.update_schema() + return # Open user configuration - with open(args.config_file, 'r') as config_file: + with open(self.config_file, 'r') as config_file: config = yaml.load(config_file) if 'postgres' in config and not args.force_update_config: raise CommandError( @@ -149,39 +168,158 @@ class CreateDatabaseSubcommand(SubCommand): # Attempt to create database try: - self.create_database(args) + self.create_database() except OperationalError as e: for handle in possible_connection_errors: predicate(e, handle) raise e # Update the configuration file - _update_configuration_file(args, config) - - def create_database(self, args): - _validate_version(args) - - _check_database_existence(args) - - _create_database_postgres(args) - - _apply_database_schema(args, self.sql_commands, self.schema_major, self.schema_minor) - - self.logger.debug( - "Successfully created the database {}".format(args.dbname)) - - def get_schema(self, schemafilepath): - postgres_output_processor_dir = os.path.dirname(__file__) - sqlfile = open(os.path.join( - postgres_output_processor_dir, schemafilepath)) - self.sql_commands = sqlfile.read() - sqlfile.close() - # Extract schema version - if self.sql_commands.startswith('--!VERSION'): - splitcommands = self.sql_commands.split('!ENDVERSION!\n') - self.schemaversion = splitcommands[0].strip('--!VERSION!') - (self.schema_major, self.schema_minor) = self.schemaversion.split('.') - self.sql_commands = splitcommands[1] + self._update_configuration_file(config) + + def create_database(self): + self._validate_version() + + self._check_database_existence() + + self._create_database_postgres() + + self._apply_database_schema(self.sql_commands, self.schema_major, self.schema_minor) + + self.logger.info( + "Successfully created the database {}".format(self.dbname)) + + def update_schema(self): + self._validate_version() + schema_major, schema_minor, _ = _get_schema(self.schemafilepath) + meta_oid, current_major, current_minor = self._get_database_schema_version() + + while not (schema_major == current_major and schema_minor == current_minor): + current_minor = self._update_schema_minors(current_major, current_minor, meta_oid) + current_major, current_minor = self._update_schema_major(current_major, current_minor, meta_oid) + msg = "Database schema update of '{}' to v{}.{} complete" + self.logger.info(msg.format(self.dbname, schema_major, schema_minor)) + + def _update_schema_minors(self, major, minor, meta_oid): + # Upgrade all available minor versions + while True: + minor += 1 + schema_update = os.path.join(POSTGRES_SCHEMA_DIR, + self.schemaupdatefilepath.format(major, minor)) + if not os.path.exists(schema_update): + break + + _, _, sql_commands = _get_schema(schema_update) + self._apply_database_schema(sql_commands, major, minor, meta_oid) + msg = "Updated the database schema to v{}.{}" + self.logger.debug(msg.format(major, minor)) + + # Return last existing update file version + return minor - 1 + + def _update_schema_major(self, current_major, current_minor, meta_oid): + current_major += 1 + schema_update = os.path.join(POSTGRES_SCHEMA_DIR, + self.schemaupdatefilepath.format(current_major, 0)) + if not os.path.exists(schema_update): + return (current_major - 1, current_minor) + + # Reset minor to 0 with major version bump + current_minor = 0 + _, _, sql_commands = _get_schema(schema_update) + self._apply_database_schema(sql_commands, current_major, current_minor, meta_oid) + msg = "Updated the database schema to v{}.{}" + self.logger.debug(msg.format(current_major, current_minor)) + return (current_major, current_minor) + + def _validate_version(self): + conn = connect(user=self.username, + password=self.password, host=self.postgres_host, port=self.postgres_port) + if conn.server_version < 90400: + msg = 'Postgres version too low. Please ensure that you are using atleast v9.4' + raise CommandError(msg) + + def _get_database_schema_version(self): + conn = connect(dbname=self.dbname, user=self.username, + password=self.password, host=self.postgres_host, port=self.postgres_port) + cursor = conn.cursor() + cursor.execute('''SELECT + DatabaseMeta.oid, + DatabaseMeta.schema_major, + DatabaseMeta.schema_minor + FROM + DatabaseMeta;''') + return cursor.fetchone() + + def _check_database_existence(self): + try: + connect(dbname=self.dbname, user=self.username, + password=self.password, host=self.postgres_host, port=self.postgres_port) + except OperationalError as e: + # Expect an operational error (database's non-existence) + if not re.compile('FATAL: database ".*" does not exist').match(str(e)): + raise e + else: + if not self.force: + raise CommandError( + "Database {} already exists. ".format(self.dbname) + + "Please specify the -f flag to create it from afresh." + ) + + def _create_database_postgres(self): + conn = connect(dbname='postgres', user=self.username, + password=self.password, host=self.postgres_host, port=self.postgres_port) + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + cursor = conn.cursor() + cursor.execute('DROP DATABASE IF EXISTS ' + self.dbname) + cursor.execute('CREATE DATABASE ' + self.dbname) + conn.commit() + cursor.close() + conn.close() + + def _apply_database_schema(self, sql_commands, schema_major, schema_minor, meta_uuid=None): + conn = connect(dbname=self.dbname, user=self.username, + password=self.password, host=self.postgres_host, port=self.postgres_port) + cursor = conn.cursor() + cursor.execute(sql_commands) + + if not meta_uuid: + extras.register_uuid() + meta_uuid = uuid.uuid4() + cursor.execute("INSERT INTO DatabaseMeta VALUES (%s, %s, %s)", + (meta_uuid, + schema_major, + schema_minor + )) + else: + cursor.execute("UPDATE DatabaseMeta SET schema_major = %s, schema_minor = %s WHERE oid = %s;", + (schema_major, + schema_minor, + meta_uuid + )) + + conn.commit() + cursor.close() + conn.close() + + def _update_configuration_file(self, config): + ''' Update the user configuration file with the newly created database's + configuration. + ''' + config['postgres'] = OrderedDict( + [('host', self.postgres_host), ('port', self.postgres_port), + ('dbname', self.dbname), ('username', self.username), ('password', self.password)]) + with open(self.config_file, 'w+') as config_file: + yaml.dump(config, config_file) + + def _parse_args(self, args): + self.postgres_host = args.postgres_host + self.postgres_port = args.postgres_port + self.username = args.username + self.password = args.password + self.dbname = args.dbname + self.config_file = args.config_file + self.force = args.force class CreateAgendaSubcommand(SubCommand): @@ -431,68 +569,20 @@ def touch(path): pass -def _validate_version(args): - conn = connect(user=args.username, - password=args.password, host=args.postgres_host, port=args.postgres_port) - if conn.server_version < 90400: - msg = 'Postgres version too low. Please ensure that you are using atleast v9.4' - raise CommandError(msg) +def _get_schema(schemafilepath): + sqlfile_path = os.path.join( + POSTGRES_SCHEMA_DIR, schemafilepath) + with open(sqlfile_path, 'r') as sqlfile: + sql_commands = sqlfile.read() -def _check_database_existence(args): - try: - connect(dbname=args.dbname, user=args.username, - password=args.password, host=args.postgres_host, port=args.postgres_port) - except OperationalError as e: - # Expect an operational error (database's non-existence) - if not re.compile('FATAL: database ".*" does not exist').match(str(e)): - raise e - else: - if not args.force: - raise CommandError( - "Database {} already exists. ".format(args.dbname) + - "Please specify the -f flag to create it from afresh." - ) - - -def _create_database_postgres(args): # pylint: disable=no-self-use - conn = connect(dbname='postgres', user=args.username, - password=args.password, host=args.postgres_host, port=args.postgres_port) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - cursor = conn.cursor() - cursor.execute('DROP DATABASE IF EXISTS ' + args.dbname) - cursor.execute('CREATE DATABASE ' + args.dbname) - conn.commit() - cursor.close() - conn.close() - - -def _apply_database_schema(args, sql_commands, schema_major, schema_minor): - conn = connect(dbname=args.dbname, user=args.username, - password=args.password, host=args.postgres_host, port=args.postgres_port) - cursor = conn.cursor() - cursor.execute(sql_commands) - - extras.register_uuid() - cursor.execute("INSERT INTO DatabaseMeta VALUES (%s, %s, %s)", - ( - uuid.uuid4(), - schema_major, - schema_minor - ) - ) - - conn.commit() - cursor.close() - conn.close() - - -def _update_configuration_file(args, config): - ''' Update the user configuration file with the newly created database's - configuration. - ''' - config['postgres'] = OrderedDict( - [('host', args.postgres_host), ('port', args.postgres_port), - ('dbname', args.dbname), ('username', args.username), ('password', args.password)]) - with open(args.config_file, 'w+') as config_file: - yaml.dump(config, config_file) + schema_major = None + schema_minor = None + # Extract schema version if present + if sql_commands.startswith('--!VERSION'): + splitcommands = sql_commands.split('!ENDVERSION!\n') + schema_major, schema_minor = splitcommands[0].strip('--!VERSION!').split('.') + schema_major = int(schema_major) + schema_minor = int(schema_minor) + sql_commands = splitcommands[1] + return schema_major, schema_minor, sql_commands diff --git a/wa/commands/postgres_schemas/postgres_schema_update_v1.2.sql b/wa/commands/postgres_schemas/postgres_schema_update_v1.2.sql new file mode 100644 index 00000000..1c982226 --- /dev/null +++ b/wa/commands/postgres_schemas/postgres_schema_update_v1.2.sql @@ -0,0 +1,30 @@ +ALTER TABLE resourcegetters RENAME TO resource_getters; + +ALTER TABLE classifiers ADD COLUMN job_oid uuid references Jobs(oid); +ALTER TABLE classifiers ADD COLUMN run_oid uuid references Runs(oid); + +ALTER TABLE targets ADD COLUMN page_size_kb int; +ALTER TABLE targets ADD COLUMN screen_resolution int[]; +ALTER TABLE targets ADD COLUMN prop text; +ALTER TABLE targets ADD COLUMN android_id text; +ALTER TABLE targets ADD COLUMN _pod_version int; +ALTER TABLE targets ADD COLUMN _pod_serialization_version int; + +ALTER TABLE jobs RENAME COLUMN retries TO retry; +ALTER TABLE jobs ADD COLUMN _pod_version int; +ALTER TABLE jobs ADD COLUMN _pod_serialization_version int; + +ALTER TABLE runs ADD COLUMN project_stage text; +ALTER TABLE runs ADD COLUMN state jsonb; +ALTER TABLE runs ADD COLUMN duration float; +ALTER TABLE runs ADD COLUMN _pod_version int; +ALTER TABLE runs ADD COLUMN _pod_serialization_version int; + +ALTER TABLE artifacts ADD COLUMN _pod_version int; +ALTER TABLE artifacts ADD COLUMN _pod_serialization_version int; + +ALTER TABLE events ADD COLUMN _pod_version int; +ALTER TABLE events ADD COLUMN _pod_serialization_version int; + +ALTER TABLE metrics ADD COLUMN _pod_version int; +ALTER TABLE metrics ADD COLUMN _pod_serialization_version int; diff --git a/wa/commands/postgres_schemas/postgres_schema_v1.2.sql b/wa/commands/postgres_schemas/postgres_schema_v1.2.sql deleted file mode 100644 index 1c982226..00000000 --- a/wa/commands/postgres_schemas/postgres_schema_v1.2.sql +++ /dev/null @@ -1,30 +0,0 @@ -ALTER TABLE resourcegetters RENAME TO resource_getters; - -ALTER TABLE classifiers ADD COLUMN job_oid uuid references Jobs(oid); -ALTER TABLE classifiers ADD COLUMN run_oid uuid references Runs(oid); - -ALTER TABLE targets ADD COLUMN page_size_kb int; -ALTER TABLE targets ADD COLUMN screen_resolution int[]; -ALTER TABLE targets ADD COLUMN prop text; -ALTER TABLE targets ADD COLUMN android_id text; -ALTER TABLE targets ADD COLUMN _pod_version int; -ALTER TABLE targets ADD COLUMN _pod_serialization_version int; - -ALTER TABLE jobs RENAME COLUMN retries TO retry; -ALTER TABLE jobs ADD COLUMN _pod_version int; -ALTER TABLE jobs ADD COLUMN _pod_serialization_version int; - -ALTER TABLE runs ADD COLUMN project_stage text; -ALTER TABLE runs ADD COLUMN state jsonb; -ALTER TABLE runs ADD COLUMN duration float; -ALTER TABLE runs ADD COLUMN _pod_version int; -ALTER TABLE runs ADD COLUMN _pod_serialization_version int; - -ALTER TABLE artifacts ADD COLUMN _pod_version int; -ALTER TABLE artifacts ADD COLUMN _pod_serialization_version int; - -ALTER TABLE events ADD COLUMN _pod_version int; -ALTER TABLE events ADD COLUMN _pod_serialization_version int; - -ALTER TABLE metrics ADD COLUMN _pod_version int; -ALTER TABLE metrics ADD COLUMN _pod_serialization_version int; -- cgit v1.2.3