aboutsummaryrefslogtreecommitdiff
path: root/lava_scheduler_app/tests/test_pipeline.py
diff options
context:
space:
mode:
authorNeil Williams <neil.williams@linaro.org>2017-03-17 13:58:58 +0000
committerNeil Williams <neil.williams@linaro.org>2017-03-17 13:58:58 +0000
commita561326bc23a9540e2fa58dff5a76e3e2d141cd0 (patch)
treecd7816c7f56e1a02270170a19f5738dc5acabbdd /lava_scheduler_app/tests/test_pipeline.py
parentc94238613d334cfd637b6e445d7504a84c8e4694 (diff)
Add a test case for secondary connections
Tidy the mustang-grub-efi template to use line concatentation in YAML Change-Id: Idde26b51e4c0302cde144aa7bd9952c78bb61853
Diffstat (limited to 'lava_scheduler_app/tests/test_pipeline.py')
-rw-r--r--lava_scheduler_app/tests/test_pipeline.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/lava_scheduler_app/tests/test_pipeline.py b/lava_scheduler_app/tests/test_pipeline.py
index 2179ae608..17763247f 100644
--- a/lava_scheduler_app/tests/test_pipeline.py
+++ b/lava_scheduler_app/tests/test_pipeline.py
@@ -843,6 +843,74 @@ class TestYamlMultinode(TestCaseWithFactory):
if role == 'server':
self.assertEqual(job, yaml.load(open(server_check, 'r')))
+ def test_secondary_connection(self):
+ user = self.factory.make_user()
+ device_type = self.factory.make_device_type(name='mustang')
+ device = self.factory.make_device(device_type, 'mustang1')
+ mustang = DeviceDictionary(hostname=device.hostname)
+ mustang.parameters = {
+ 'connection_command': 'telnet serial4 7012',
+ 'extends': 'mustang-grub-efi.jinja2',
+ }
+ mustang.save()
+ submission = yaml.load(open(
+ os.path.join(os.path.dirname(__file__), 'mustang-ssh-multinode.yaml'), 'r'))
+ target_group = 'arbitrary-group-id' # for unit tests only
+ jobs_dict = split_multinode_yaml(submission, target_group)
+ self.assertIsNotNone(jobs_dict)
+ jobs = TestJob.from_yaml_and_user(yaml.dump(submission), user)
+ self.assertIsNotNone(jobs)
+ host_job = None
+ guest_job = None
+ for job in jobs:
+ if job.device_role == 'host':
+ host_job = job
+ if job.device_role == 'guest':
+ guest_job = job
+ self.assertIsNotNone(host_job)
+ self.assertIsNotNone(guest_job)
+ self.assertTrue(guest_job.dynamic_connection)
+ parser = JobParser()
+ job_ctx = {}
+ host_job.actual_device = device
+
+ try:
+ device_config = device.load_device_configuration(job_ctx, system=False) # raw dict
+ except (jinja2.TemplateError, yaml.YAMLError, IOError) as exc:
+ # FIXME: report the exceptions as useful user messages
+ self.fail("[%d] jinja2 error: %s" % (host_job.id, exc))
+ if not device_config or not isinstance(device_config, dict):
+ # it is an error to have a pipeline device without a device dictionary as it will never get any jobs.
+ msg = "Administrative error. Device '%s' has no device dictionary." % device.hostname
+ self.fail('[%d] device-dictionary error: %s' % (host_job.id, msg))
+
+ device_object = PipelineDevice(device_config, device.hostname) # equivalent of the NewDevice in lava-dispatcher, without .yaml file.
+ # FIXME: drop this nasty hack once 'target' is dropped as a parameter
+ if 'target' not in device_object:
+ device_object.target = device.hostname
+ device_object['hostname'] = device.hostname
+ self.assertIsNotNone(device_object)
+ parser_device = device_object
+ try:
+ # pass (unused) output_dir just for validation as there is no zmq socket either.
+ pipeline_job = parser.parse(
+ host_job.definition, parser_device,
+ host_job.id, None, "", output_dir=host_job.output_dir)
+ except (AttributeError, JobError, NotImplementedError, KeyError, TypeError) as exc:
+ self.fail('[%s] parser error: %s' % (host_job.sub_id, exc))
+ pipeline_job._validate(False)
+ self.assertEqual([], pipeline_job.pipeline.errors)
+
+ try:
+ # pass (unused) output_dir just for validation as there is no zmq socket either.
+ pipeline_job = parser.parse(
+ guest_job.definition, parser_device,
+ guest_job.id, None, "", output_dir=guest_job.output_dir)
+ except (AttributeError, JobError, NotImplementedError, KeyError, TypeError) as exc:
+ self.fail('[%s] parser error: %s' % (guest_job.sub_id, exc))
+ pipeline_job._validate(False)
+ self.assertEqual([], pipeline_job.pipeline.errors)
+
def test_multinode_tags(self):
Tag.objects.all().delete()
self.factory.ensure_tag('tap'),