aboutsummaryrefslogtreecommitdiff
path: root/lava_scheduler_app/templatetags/utils.py
blob: dd61d33a6e32ee1411013c48044fffb84875b197 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
from collections import OrderedDict

import pytz
import yaml
from dateutil import parser
from django import template
from django.conf import settings
from django.utils.html import escape
from django.utils.safestring import mark_safe

from lava_scheduler_app.models import TestJob
from lava_scheduler_app.utils import load_devicetype_template


# Library instance whose decorators are used below to register this module's
# template filters and assignment tags.
register = template.Library()


@register.filter
def get_priority_select(current):
    """
    Build the HTML radio inputs for choosing a job priority.

    One labelled radio input is produced per entry of
    TestJob.PRIORITY_CHOICES. The entry equal to ``current`` is marked as
    checked; priority 50 is tagged "[default]" unless it is the current one.

    :param current: the priority value to pre-select
    :return: the concatenated markup, marked safe for template output
    """
    pieces = []
    for priority, label in TestJob.PRIORITY_CHOICES:
        checked_attr = " checked" if priority == current else ""
        default_tag = " [default]" if priority == 50 and current != 50 else ""
        pieces.append('<label class="checkbox-inline">')
        pieces.append(
            '<input type="radio" name="priority" style="..." id="%s" value="%d"%s>%s%s</input><br/>' %
            (label.lower(), priority, checked_attr, label, default_tag)
        )
        pieces.append('</label>')
    return mark_safe("".join(pieces))


@register.filter
def get_item(dictionary, key):
    """
    Look up ``key`` in ``dictionary`` from within a template.

    :param dictionary: any object exposing a dict-style ``get`` method
    :param key: the key to retrieve
    :return: the stored value, or None when the key is absent or when
       ``dictionary`` has no callable ``get`` (for example None or the
       empty string passed for a variable the template could not resolve)
    """
    lookup = getattr(dictionary, 'get', None)
    if not callable(lookup):
        # Fail silently instead of raising AttributeError while rendering.
        return None
    return lookup(key)


@register.filter
def get_settings(value):
    """
    Return the attribute named ``value`` from the settings object.

    This is only a filter, so use it for boolean checks. When the settings
    object has no such attribute the result is None.
    """
    if not hasattr(settings, value):
        return None
    return getattr(settings, value)


@register.assignment_tag()
def assign_setting(value):
    """
    Return the value of the setting named ``value``, or None when the
    settings object has no attribute of that name.
    """
    return getattr(settings, value) if hasattr(settings, value) else None


def _get_pipeline_data(pipeline, levels):
    """
    Recursive check on the pipeline description dictionary
    """
    for action in pipeline:
        levels[action['level']] = {
            'name': action['name'],
            'description': action['description'],
            'summary': action['summary'],
            'timeout': action['timeout'],
        }
        if 'url' in action:
            levels[action['level']].update({'url': action['url']})
        if 'pipeline' in action:
            _get_pipeline_data(action['pipeline'], levels)


@register.assignment_tag()
def get_pipeline_sections(pipeline):
    """
    Give a top level view of the pipeline sections.

    :param pipeline: iterable of action description dictionaries
    :return: list with one single-entry dict (section -> level) for each
       action that declares a 'section'
    """
    return [
        {step['section']: step['level']}
        for step in pipeline
        if 'section' in step
    ]


@register.assignment_tag()
def get_pipeline_levels(pipeline):
    """
    Collect every action level of this pipeline, nested ones included.

    :param pipeline: iterable of action description dictionaries
    :return: OrderedDict filled by _get_pipeline_data, keyed by level
    """
    collected = OrderedDict()
    _get_pipeline_data(pipeline, collected)
    return collected


@register.assignment_tag()
def parse_timestamp(time_str):
    """
    Convert the pipeline log timestamp into datetime
    :param time_str: timestamp generated by the log handler
       of the form 2015-10-13T08:21:48.646202
    :return: datetime.datetime object localized to UTC, or None when the
       value cannot be parsed
    """
    try:
        retval = parser.parse(time_str, ignoretz=True)
    except (AttributeError, TypeError, ValueError, OverflowError):
        # ValueError: the string is not a recognisable date.
        # OverflowError: a parsed component is too large.
        # Both are treated like the wrong-type cases so the documented
        # "datetime or None" contract holds for malformed log entries.
        return None
    return pytz.utc.localize(retval)


@register.assignment_tag()
def logging_levels(request):
    """
    Work out which log levels to show from the request's GET parameters.

    'info', 'warning' and 'exception' are shown unless switched off:
    info=off hides 'info', warning=off hides both 'warning' and 'exception'.
    'debug' is only shown when debug=on and is listed last.

    :param request: object whose GET attribute is a dict-like of parameters
    :return: list of level names
    """
    params = request.GET
    shown = []
    if params.get('info') != 'off':
        shown.append('info')
    if params.get('warning') != 'off':
        shown.extend(['warning', 'exception'])
    if params.get('debug') == 'on':
        shown.append('debug')
    return shown


@register.filter()
def dump_exception(entry):
    """
    Serialise the exception of a log entry as YAML.

    :param entry: mapping with an 'exception' key and optionally 'debug'
    :return: YAML dump of a list holding the exception followed by the
       debug value when the entry has one
    """
    payload = [entry['exception']]
    payload += [entry['debug']] if 'debug' in entry else []
    return yaml.dump(payload)


@register.filter()
def deploy_methods(device_type, methods):
    """
    List the methods the device type template declares for one action.

    :param device_type: passed to load_devicetype_template
    :param methods: name of the entry to look up under the template's
       'actions'
    :return: the keys of the 'methods' value when it is a dict, a
       one-element list wrapping it otherwise, or None when the template is
       empty or lacks the requested action
    """
    template_data = load_devicetype_template(device_type)
    if not template_data:
        return None
    if 'actions' not in template_data:
        return None
    if methods not in template_data['actions']:
        return None
    declared = template_data['actions'][methods]['methods']
    return declared.keys() if isinstance(declared, dict) else [declared]


@register.assignment_tag()
def device_type_timeouts(device_type):
    """
    Return the 'timeouts' entry of the device type template.

    :param device_type: passed to load_devicetype_template
    :return: the 'timeouts' value, or None when the template is empty or
       has no such entry
    """
    template_data = load_devicetype_template(device_type)
    if template_data and 'timeouts' in template_data:
        return template_data['timeouts']
    return None


@register.filter()
def result_url(result_dict, job_id):
    """
    Build the /results/<job_id>/<test definition>/<test case> path.

    Two result shapes are handled:

    * a dict with a 'test_definition' key: the test case is the first other
      key found in the dict;
    * an action based result, a dict with exactly one key whose value is an
      OrderedDict: the test definition is 'lava' and the test case is that
      key.

    :param result_dict: the result entry to link to
    :param job_id: value used for the first path component
    :return: the path marked safe, or None for anything that is not one of
       the shapes above
    """
    if not isinstance(result_dict, dict):
        return None
    if 'test_definition' in result_dict:
        testdef = result_dict['test_definition']
        testcase = next(
            (key for key in result_dict if key != 'test_definition'), None)
        # e.g. 8125/singlenode-intermediate/tar-tgz
        # NOTE(review): the components are interpolated without escaping
        # and then marked safe - confirm they can never carry markup.
        return mark_safe('/results/%s/%s/%s' % (
            job_id, testdef, testcase
        ))
    if len(result_dict) == 1:
        # action based result
        # dict views cannot be indexed on Python 3, so take the single
        # item through an iterator instead of values()[0] / keys()[0].
        testcase, details = next(iter(result_dict.items()))
        if isinstance(details, OrderedDict):
            return mark_safe('/results/%s/%s/%s' % (
                job_id, 'lava', testcase
            ))
    return None


@register.assignment_tag()
def result_name(result_dict):
    """
    Build a "<test definition> - <test case> - <result>" label.

    Two result shapes are handled:

    * a dict with a 'test_definition' key: the test case and result are the
      first other key/value pair found in the dict;
    * an action based result, a dict with exactly one key whose value is an
      OrderedDict: the test definition is 'lava', the test case is that key
      and the result is 'pass' when the value contains 'success' or
      'status' (None otherwise).

    :param result_dict: the result entry to describe
    :return: the label marked safe, or None for anything that is not one of
       the shapes above
    """
    if not isinstance(result_dict, dict):
        return None
    if 'test_definition' in result_dict:
        testdef = result_dict['test_definition']
        testcase, testresult = next(
            (
                (key, value) for key, value in result_dict.items()
                if key != 'test_definition'
            ),
            (None, None),
        )
        # NOTE(review): the parts are interpolated without escaping and
        # then marked safe - confirm they can never carry markup.
        return mark_safe('%s - %s - %s' % (
            testdef, testcase, testresult
        ))
    if len(result_dict) == 1:
        # action based result
        # dict views cannot be indexed on Python 3, so take the single
        # item through an iterator instead of values()[0] / keys()[0].
        testcase, details = next(iter(result_dict.items()))
        if isinstance(details, OrderedDict):
            testresult = None
            if 'success' in details:
                testresult = 'pass'
            if 'status' in details:
                testresult = 'pass'  # FIXME
            return mark_safe('%s - %s - %s' % (
                'lava', testcase, testresult
            ))
    return None


@register.filter()
def metadata_key(key, index=0):
    """
    Shorten a dotted metadata key for display.

    :param key: dotted key string
    :param index: number of leading dot-separated components to drop
    :return: the remaining components re-joined with '.', with every
       'definition.' occurrence removed
    """
    components = key.split('.')
    trimmed = '.'.join(components[index:])
    return trimmed.replace('definition.', '')


@register.filter()
def markup_metadata(key, value):
    """
    Turn selected metadata values into HTML links.

    * keys containing 'target.device_type' link to the scheduler device
      type page for ``value``;
    * keys containing 'target.hostname' link to the scheduler device page;
    * keys containing 'definition.repository' link to ``value`` with
      'git:' replaced by 'http:'.

    The value is HTML-escaped before being placed in the markup, because
    the result is marked safe and would otherwise let markup in the value
    break out of the attribute or the link text.

    :param key: metadata key
    :param value: metadata value
    :return: safe HTML for the cases above, otherwise ``value`` unchanged
    """
    if 'target.device_type' in key:
        shown = escape(value)
        return mark_safe("<a href='/scheduler/device_type/%s'>%s</a>" % (shown, shown))
    elif 'target.hostname' in key:
        shown = escape(value)
        return mark_safe("<a href='/scheduler/device/%s'>%s</a>" % (shown, shown))
    elif 'definition.repository' in key:
        repo = value.replace('git:', 'http:')
        return mark_safe("<a href='%s'>%s</a>" % (escape(repo), escape(value)))
    else:
        # Not marked safe: the template's own auto-escaping still applies.
        return value


@register.filter()
def markup_completion(data):
    """
    Pick out the keys of ``data`` that contain the substring 'test'.

    :param data: mapping exposing ``items()``
    :return: list of matching keys, in iteration order
    """
    matching = []
    for name, _unused in data.items():
        if 'test' in name:
            matching.append(name)
    return matching