diff options
author | Neil Williams <neil.williams@linaro.org> | 2017-03-17 13:58:58 +0000 |
---|---|---|
committer | Neil Williams <neil.williams@linaro.org> | 2017-03-17 13:58:58 +0000 |
commit | a561326bc23a9540e2fa58dff5a76e3e2d141cd0 (patch) | |
tree | cd7816c7f56e1a02270170a19f5738dc5acabbdd /lava_scheduler_app/tests/test_pipeline.py | |
parent | c94238613d334cfd637b6e445d7504a84c8e4694 (diff) |
Add a test case for secondary connections
Tidy the mustang-grub-efi template to use line concatentation
in YAML
Change-Id: Idde26b51e4c0302cde144aa7bd9952c78bb61853
Diffstat (limited to 'lava_scheduler_app/tests/test_pipeline.py')
-rw-r--r-- | lava_scheduler_app/tests/test_pipeline.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/lava_scheduler_app/tests/test_pipeline.py b/lava_scheduler_app/tests/test_pipeline.py index 2179ae608..17763247f 100644 --- a/lava_scheduler_app/tests/test_pipeline.py +++ b/lava_scheduler_app/tests/test_pipeline.py @@ -843,6 +843,74 @@ class TestYamlMultinode(TestCaseWithFactory): if role == 'server': self.assertEqual(job, yaml.load(open(server_check, 'r'))) + def test_secondary_connection(self): + user = self.factory.make_user() + device_type = self.factory.make_device_type(name='mustang') + device = self.factory.make_device(device_type, 'mustang1') + mustang = DeviceDictionary(hostname=device.hostname) + mustang.parameters = { + 'connection_command': 'telnet serial4 7012', + 'extends': 'mustang-grub-efi.jinja2', + } + mustang.save() + submission = yaml.load(open( + os.path.join(os.path.dirname(__file__), 'mustang-ssh-multinode.yaml'), 'r')) + target_group = 'arbitrary-group-id' # for unit tests only + jobs_dict = split_multinode_yaml(submission, target_group) + self.assertIsNotNone(jobs_dict) + jobs = TestJob.from_yaml_and_user(yaml.dump(submission), user) + self.assertIsNotNone(jobs) + host_job = None + guest_job = None + for job in jobs: + if job.device_role == 'host': + host_job = job + if job.device_role == 'guest': + guest_job = job + self.assertIsNotNone(host_job) + self.assertIsNotNone(guest_job) + self.assertTrue(guest_job.dynamic_connection) + parser = JobParser() + job_ctx = {} + host_job.actual_device = device + + try: + device_config = device.load_device_configuration(job_ctx, system=False) # raw dict + except (jinja2.TemplateError, yaml.YAMLError, IOError) as exc: + # FIXME: report the exceptions as useful user messages + self.fail("[%d] jinja2 error: %s" % (host_job.id, exc)) + if not device_config or not isinstance(device_config, dict): + # it is an error to have a pipeline device without a device dictionary as it will never get any jobs. + msg = "Administrative error. Device '%s' has no device dictionary." % device.hostname + self.fail('[%d] device-dictionary error: %s' % (host_job.id, msg)) + + device_object = PipelineDevice(device_config, device.hostname) # equivalent of the NewDevice in lava-dispatcher, without .yaml file. + # FIXME: drop this nasty hack once 'target' is dropped as a parameter + if 'target' not in device_object: + device_object.target = device.hostname + device_object['hostname'] = device.hostname + self.assertIsNotNone(device_object) + parser_device = device_object + try: + # pass (unused) output_dir just for validation as there is no zmq socket either. + pipeline_job = parser.parse( + host_job.definition, parser_device, + host_job.id, None, "", output_dir=host_job.output_dir) + except (AttributeError, JobError, NotImplementedError, KeyError, TypeError) as exc: + self.fail('[%s] parser error: %s' % (host_job.sub_id, exc)) + pipeline_job._validate(False) + self.assertEqual([], pipeline_job.pipeline.errors) + + try: + # pass (unused) output_dir just for validation as there is no zmq socket either. + pipeline_job = parser.parse( + guest_job.definition, parser_device, + guest_job.id, None, "", output_dir=guest_job.output_dir) + except (AttributeError, JobError, NotImplementedError, KeyError, TypeError) as exc: + self.fail('[%s] parser error: %s' % (guest_job.sub_id, exc)) + pipeline_job._validate(False) + self.assertEqual([], pipeline_job.pipeline.errors) + def test_multinode_tags(self): Tag.objects.all().delete() self.factory.ensure_tag('tap'), |