# Copyright 2014-2018 ARM Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os from copy import copy, deepcopy from collections import OrderedDict, defaultdict from wa.framework.exception import ConfigError, NotFoundError from wa.framework.configuration.tree import SectionNode from wa.utils import log from wa.utils.misc import (get_article, merge_config_values) from wa.utils.types import (identifier, integer, boolean, list_of_strings, list_of, toggle_set, obj_dict, enum) from wa.utils.serializer import is_pod # Mapping for kind conversion; see docs for convert_types below KIND_MAP = { int: integer, bool: boolean, dict: OrderedDict, } Status = enum(['UNKNOWN', 'NEW', 'PENDING', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', 'OK', 'PARTIAL', 'FAILED', 'ABORTED', 'SKIPPED']) ########################## ### CONFIG POINT TYPES ### ########################## class RebootPolicy(object): """ Represents the reboot policy for the execution -- at what points the device should be rebooted. This, in turn, is controlled by the policy value that is passed in on construction and would typically be read from the user's settings. Valid policy values are: :never: The device will never be rebooted. :as_needed: Only reboot the device if it becomes unresponsive, or needs to be flashed, etc. :initial: The device will be rebooted when the execution first starts, just before executing the first workload spec. :each_spec: The device will be rebooted before running a new workload spec. :each_iteration: The device will be rebooted before each new iteration. """ valid_policies = ['never', 'as_needed', 'initial', 'each_spec', 'each_job'] @staticmethod def from_pod(pod): return RebootPolicy(pod) def __init__(self, policy): if isinstance(policy, RebootPolicy): policy = policy.policy policy = policy.strip().lower().replace(' ', '_') if policy not in self.valid_policies: message = 'Invalid reboot policy {}; must be one of {}'.format(policy, ', '.join(self.valid_policies)) raise ConfigError(message) self.policy = policy @property def can_reboot(self): return self.policy != 'never' @property def perform_initial_reboot(self): return self.policy == 'initial' @property def reboot_on_each_job(self): return self.policy == 'each_job' @property def reboot_on_each_spec(self): return self.policy == 'each_spec' def __str__(self): return self.policy __repr__ = __str__ def __eq__(self, other): if isinstance(other, RebootPolicy): return self.policy == other.policy else: return self.policy == other def to_pod(self): return self.policy class status_list(list): def append(self, item): list.append(self, str(item).upper()) class LoggingConfig(dict): defaults = { 'file_format': '%(asctime)s %(levelname)-8s %(name)s: %(message)s', 'verbose_format': '%(asctime)s %(levelname)-8s %(name)s: %(message)s', 'regular_format': '%(levelname)-8s %(message)s', 'color': True, } @staticmethod def from_pod(pod): return LoggingConfig(pod) def __init__(self, config=None): dict.__init__(self) if isinstance(config, dict): config = {identifier(k.lower()): v for k, v in config.items()} self['regular_format'] = config.pop('regular_format', self.defaults['regular_format']) self['verbose_format'] = config.pop('verbose_format', self.defaults['verbose_format']) self['file_format'] = config.pop('file_format', self.defaults['file_format']) self['color'] = config.pop('colour_enabled', self.defaults['color']) # legacy self['color'] = config.pop('color', self.defaults['color']) if config: message = 'Unexpected logging configuration parameters: {}' raise ValueError(message.format(bad_vals=', '.join(list(config.keys())))) elif config is None: for k, v in self.defaults.items(): self[k] = v else: raise ValueError(config) def to_pod(self): return self def expanded_path(path): """ Ensure that the provided path has been expanded if applicable """ return os.path.expanduser(str(path)) def get_type_name(kind): typename = str(kind) if '\'' in typename: typename = typename.split('\'')[1] elif typename.startswith('`) rather than in the config file. ''', ), ConfigurationPoint( 'project', kind=str, description=''' A string naming the project for which data is being collected. This may be useful, e.g. when uploading data to a shared database that is populated from multiple projects. ''', ), ConfigurationPoint( 'project_stage', kind=dict, description=''' A dict or a string that allows adding additional identifier. This is may be useful for long-running projects. ''', ), ] config_points = [ ConfigurationPoint( 'execution_order', kind=str, default='by_iteration', allowed_values=['by_iteration', 'by_section', 'by_workload', 'random'], description=''' Defines the order in which the agenda spec will be executed. At the moment, the following execution orders are supported: ``"by_iteration"`` The first iteration of each workload spec is executed one after the other, so all workloads are executed before proceeding on to the second iteration. E.g. A1 B1 C1 A2 C2 A3. This is the default if no order is explicitly specified. In case of multiple sections, this will spread them out, such that specs from the same section are further part. E.g. given sections X and Y, global specs A and B, and two iterations, this will run :: X.A1, Y.A1, X.B1, Y.B1, X.A2, Y.A2, X.B2, Y.B2 ``"by_section"`` Same as ``"by_iteration"``, however this will group specs from the same section together, so given sections X and Y, global specs A and B, and two iterations, this will run :: X.A1, X.B1, Y.A1, Y.B1, X.A2, X.B2, Y.A2, Y.B2 ``"by_workload"`` All iterations of the first spec are executed before moving on to the next spec. E.g:: X.A1, X.A2, Y.A1, Y.A2, X.B1, X.B2, Y.B1, Y.B2 ``"random"`` Execution order is entirely random. ''', ), ConfigurationPoint( 'reboot_policy', kind=RebootPolicy, default='as_needed', allowed_values=RebootPolicy.valid_policies, description=''' This defines when during execution of a run the Device will be rebooted. The possible values are: ``"as_needed"`` The device will only be rebooted if the need arises (e.g. if it becomes unresponsive. ``"never"`` The device will never be rebooted. ``"initial"`` The device will be rebooted when the execution first starts, just before executing the first workload spec. ``"each_job"`` The device will be rebooted before each new job. ``"each_spec"`` The device will be rebooted before running a new workload spec. .. note:: this acts the same as each_job when execution order is set to by_iteration '''), ConfigurationPoint( 'device', kind=str, default='generic_android', description=''' This setting defines what specific Device subclass will be used to interact the connected device. Obviously, this must match your setup. ''', ), ConfigurationPoint( 'retry_on_status', kind=list_of(Status), default=['FAILED', 'PARTIAL'], allowed_values=Status.levels[Status.RUNNING.value:], description=''' This is list of statuses on which a job will be considered to have failed and will be automatically retried up to ``max_retries`` times. This defaults to ``["FAILED", "PARTIAL"]`` if not set. Possible values are: ``"OK"`` This iteration has completed and no errors have been detected ``"PARTIAL"`` One or more instruments have failed (the iteration may still be running). ``"FAILED"`` The workload itself has failed. ``"ABORTED"`` The user interrupted the workload. ''', ), ConfigurationPoint( 'max_retries', kind=int, default=2, description=''' The maximum number of times failed jobs will be retried before giving up. .. note:: This number does not include the original attempt ''', ), ConfigurationPoint( 'bail_on_init_failure', kind=bool, default=True, description=''' When jobs fail during their main setup and run phases, WA will continue attempting to run the remaining jobs. However, by default, if they fail during their early initialization phase, the entire run will end without continuing to run jobs. Setting this to ``False`` means that WA will instead skip all the jobs from the job spec that failed, but continue attempting to run others. ''' ), ConfigurationPoint( 'allow_phone_home', kind=bool, default=True, description=''' Setting this to ``False`` prevents running any workloads that are marked with 'phones_home', meaning they are at risk of exposing information about the device to the outside world. For example, some benchmark applications upload device data to a database owned by the maintainers. This can be used to minimise the risk of accidentally running such workloads when testing confidential devices. '''), ] configuration = {cp.name: cp for cp in config_points + meta_data} @classmethod def from_pod(cls, pod): meta_pod = {} for cfg_point in cls.meta_data: meta_pod[cfg_point.name] = pod.pop(cfg_point.name, None) device_config = pod.pop('device_config', None) augmentations = pod.pop('augmentations', {}) getters = pod.pop('resource_getters', {}) instance = super(RunConfiguration, cls).from_pod(pod) instance.device_config = device_config instance.augmentations = augmentations instance.resource_getters = getters for cfg_point in cls.meta_data: cfg_point.set_value(instance, meta_pod[cfg_point.name]) return instance def __init__(self): super(RunConfiguration, self).__init__() for confpoint in self.meta_data: confpoint.set_value(self, check_mandatory=False) self.device_config = None self.augmentations = {} self.resource_getters = {} def merge_device_config(self, plugin_cache): """ Merges global device config and validates that it is correct for the selected device. """ # pylint: disable=no-member if self.device is None: msg = 'Attempting to merge device config with unspecified device' raise RuntimeError(msg) self.device_config = plugin_cache.get_plugin_config(self.device, generic_name="device_config") def add_augmentation(self, aug): if aug.name in self.augmentations: raise ValueError('Augmentation "{}" already added.'.format(aug.name)) self.augmentations[aug.name] = aug.get_config() def add_resource_getter(self, getter): if getter.name in self.resource_getters: raise ValueError('Resource getter "{}" already added.'.format(getter.name)) self.resource_getters[getter.name] = getter.get_config() def to_pod(self): pod = super(RunConfiguration, self).to_pod() pod['device_config'] = dict(self.device_config or {}) pod['augmentations'] = self.augmentations pod['resource_getters'] = self.resource_getters return pod class JobSpec(Configuration): # pylint: disable=access-member-before-definition,attribute-defined-outside-init name = "Job Spec" config_points = [ ConfigurationPoint('iterations', kind=int, default=1, description=''' How many times to repeat this workload spec '''), ConfigurationPoint('workload_name', kind=str, mandatory=True, aliases=["name"], description=''' The name of the workload to run. '''), ConfigurationPoint('workload_parameters', kind=obj_dict, aliases=["params", "workload_params", "parameters"], description=''' Parameter to be passed to the workload '''), ConfigurationPoint('runtime_parameters', kind=obj_dict, aliases=["runtime_params"], description=''' Runtime parameters to be set prior to running the workload. '''), ConfigurationPoint('boot_parameters', kind=obj_dict, aliases=["boot_params"], description=''' Parameters to be used when rebooting the target prior to running the workload. '''), ConfigurationPoint('label', kind=str, description=''' Similar to IDs but do not have the uniqueness restriction. If specified, labels will be used by some output processors instead of (or in addition to) the workload name. For example, the csv output processor will put the label in the "workload" column of the CSV file. '''), ConfigurationPoint('augmentations', kind=toggle_set, merge=True, aliases=["instruments", "processors", "instrumentation", "output_processors", "augment", "result_processors"], description=''' The instruments and output processors to enable (or disabled using a ~) during this workload spec. This combines the "instrumentation" and "result_processors" from previous versions of WA (the old entries are now aliases for this). '''), ConfigurationPoint('flash', kind=dict, merge=True, description=''' '''), ConfigurationPoint('classifiers', kind=dict, merge=True, description=''' Classifiers allow you to tag metrics from this workload spec to help in post processing them. Theses are often used to help identify what runtime_parameters were used for results when post processing. '''), ] configuration = {cp.name: cp for cp in config_points} @classmethod def from_pod(cls, pod): job_id = pod.pop('id') instance = super(JobSpec, cls).from_pod(pod) instance.id = job_id return instance @property def section_id(self): if self.id is not None: return self.id.rsplit('-', 1)[0] @property def workload_id(self): if self.id is not None: return self.id.rsplit('-', 1)[-1] def __init__(self): super(JobSpec, self).__init__() if self.classifiers is None: self.classifiers = OrderedDict() self.to_merge = defaultdict(OrderedDict) self._sources = [] self.id = None if self.boot_parameters is None: self.boot_parameters = obj_dict() if self.runtime_parameters is None: self.runtime_parameters = obj_dict() def to_pod(self): pod = super(JobSpec, self).to_pod() pod['id'] = self.id return pod def update_config(self, source, check_mandatory=True): # pylint: disable=arguments-differ self._sources.append(source) values = source.config for k, v in values.items(): if k == "id": continue elif k.endswith('_parameters'): if v: self.to_merge[k][source] = copy(v) else: try: self.set(k, v, check_mandatory=check_mandatory) except ConfigError as e: msg = 'Error in {}:\n\t{}' raise ConfigError(msg.format(source.name, e.message)) def merge_workload_parameters(self, plugin_cache): # merge global generic and specific config workload_params = plugin_cache.get_plugin_config(self.workload_name, generic_name="workload_parameters", is_final=False) cfg_points = plugin_cache.get_plugin_parameters(self.workload_name) for source in self._sources: config = dict(self.to_merge["workload_parameters"].get(source, {})) if not config: continue for name, cfg_point in cfg_points.items(): if name in config: value = config.pop(name) cfg_point.set_value(workload_params, value, check_mandatory=False) if config: msg = 'Unexpected config "{}" for "{}"' raise ConfigError(msg.format(config, self.workload_name)) self.workload_parameters = workload_params def merge_runtime_parameters(self, plugin_cache, target_manager): # Order global runtime parameters runtime_parameters = OrderedDict() try: global_runtime_params = plugin_cache.get_plugin_config("runtime_parameters") except NotFoundError: global_runtime_params = {} for source in plugin_cache.sources: if source in global_runtime_params: runtime_parameters[source] = global_runtime_params[source] # Add runtime parameters from JobSpec for source, values in self.to_merge['runtime_parameters'].items(): runtime_parameters[source] = values # Merge self.runtime_parameters = target_manager.merge_runtime_parameters(runtime_parameters) def finalize(self): self.id = "-".join([str(source.config['id']) for source in self._sources[1:]]) # ignore first id, "global" # ensure *_parameters are always obj_dict's self.boot_parameters = obj_dict(list((self.boot_parameters or {}).items())) self.runtime_parameters = obj_dict(list((self.runtime_parameters or {}).items())) self.workload_parameters = obj_dict(list((self.workload_parameters or {}).items())) if self.label is None: self.label = self.workload_name # This is used to construct the list of Jobs WA will run class JobGenerator(object): name = "Jobs Configuration" @property def enabled_instruments(self): self._read_augmentations = True if self._enabled_instruments is None: self._enabled_instruments = [] for entry in list(self._enabled_augmentations.merge_with(self.disabled_augmentations).values()): entry_cls = self.plugin_cache.get_plugin_class(entry) if entry_cls.kind == 'instrument': self._enabled_instruments.append(entry) return self._enabled_instruments @property def enabled_processors(self): self._read_augmentations = True if self._enabled_processors is None: self._enabled_processors = [] for entry in list(self._enabled_augmentations.merge_with(self.disabled_augmentations).values()): entry_cls = self.plugin_cache.get_plugin_class(entry) if entry_cls.kind == 'output_processor': self._enabled_processors.append(entry) return self._enabled_processors def __init__(self, plugin_cache): self.plugin_cache = plugin_cache self.ids_to_run = [] self.workloads = [] self._enabled_augmentations = toggle_set() self._enabled_instruments = None self._enabled_processors = None self._read_augmentations = False self.disabled_augmentations = set() self.job_spec_template = obj_dict(not_in_dict=['name']) self.job_spec_template.name = "globally specified job spec configuration" self.job_spec_template.id = "global" # Load defaults for cfg_point in JobSpec.configuration.values(): cfg_point.set_value(self.job_spec_template, check_mandatory=False) self.root_node = SectionNode(self.job_spec_template) def set_global_value(self, name, value): JobSpec.configuration[name].set_value(self.job_spec_template, value, check_mandatory=False) if name == "augmentations": self.update_augmentations(value) def add_section(self, section, workloads): new_node = self.root_node.add_section(section) with log.indentcontext(): for workload in workloads: new_node.add_workload(workload) def add_workload(self, workload): self.root_node.add_workload(workload) def disable_augmentations(self, augmentations): for entry in augmentations: if entry == '~~': continue if entry.startswith('~'): entry = entry[1:] try: self.plugin_cache.get_plugin_class(entry) except NotFoundError: raise ConfigError('Error disabling unknown augmentation: "{}"'.format(entry)) self.disabled_augmentations = self.disabled_augmentations.union(augmentations) def update_augmentations(self, value): if self._read_augmentations: msg = 'Cannot update augmentations after they have been accessed' raise RuntimeError(msg) self._enabled_augmentations = self._enabled_augmentations.merge_with(value) def only_run_ids(self, ids): if isinstance(ids, str): ids = [ids] self.ids_to_run = ids def generate_job_specs(self, target_manager): specs = [] for leaf in self.root_node.leaves(): workload_entries = leaf.workload_entries sections = [leaf] for ancestor in leaf.ancestors(): workload_entries = ancestor.workload_entries + workload_entries sections.insert(0, ancestor) for workload_entry in workload_entries: job_spec = create_job_spec(deepcopy(workload_entry), sections, target_manager, self.plugin_cache, self.disabled_augmentations) if self.ids_to_run: for job_id in self.ids_to_run: if job_id in job_spec.id: break else: continue self.update_augmentations(list(job_spec.augmentations.values())) specs.append(job_spec) return specs def create_job_spec(workload_entry, sections, target_manager, plugin_cache, disabled_augmentations): job_spec = JobSpec() # PHASE 2.1: Merge general job spec configuration for section in sections: job_spec.update_config(section, check_mandatory=False) job_spec.update_config(workload_entry, check_mandatory=False) # PHASE 2.2: Merge global, section and workload entry "workload_parameters" job_spec.merge_workload_parameters(plugin_cache) # TODO: PHASE 2.3: Validate device runtime/boot parameters job_spec.merge_runtime_parameters(plugin_cache, target_manager) target_manager.validate_runtime_parameters(job_spec.runtime_parameters) # PHASE 2.4: Disable globally disabled augmentations job_spec.set("augmentations", disabled_augmentations) job_spec.finalize() return job_spec def get_config_point_map(params): pmap = {} for p in params: pmap[p.name] = p for alias in p.aliases: pmap[alias] = p return pmap settings = MetaConfiguration(os.environ)