aboutsummaryrefslogtreecommitdiff
path: root/wlauto/external/daq_server/src/daqpower/server.py
blob: e33a5f9c99487d952a4caef605b89df7d129ae97 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
#    Copyright 2014-2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# pylint: disable=E1101,W0613
from __future__ import division
import os
import sys
import argparse
import shutil
import socket
import time
from datetime import datetime, timedelta

from zope.interface import implements
from twisted.protocols.basic import LineReceiver
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, interfaces
from twisted.internet.error import ConnectionLost, ConnectionDone


if __name__ == "__main__":  # for debugging
    sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from daqpower import log
from daqpower.config import DeviceConfiguration
from daqpower.common import DaqServerRequest, DaqServerResponse, Status
try:
    from daqpower.daq import DaqRunner, list_available_devices, CAN_ENUMERATE_DEVICES
    __import_error = None
except ImportError as e:
    # The DAQ backend could not be imported. This is tolerated at import time
    # because the server may be about to run in debug mode; run_server()
    # re-raises the saved error if it is started without --debug.
    __import_error = e
    DaqRunner = None
    list_available_devices = lambda: ['Dev1']
    # This name must exist on the fallback path as well: the control protocol's
    # list_devices handler reads it, and would otherwise fail with a NameError.
    # The stub above can always report its single fake device.
    CAN_ENUMERATE_DEVICES = True


class ProtocolError(Exception):
    """Raised when an operation is requested out of sequence (e.g. start before configure)."""


class DummyDaqRunner(object):
    """
    Stand-in for the real DAQ runner, used when the server runs in debug mode.

    Rather than sampling hardware, ``start()`` writes a CSV file of random
    power/voltage readings for every configured port.

    """

    # Number of fake sample rows written to each port file.
    num_rows = 200

    def __init__(self, config, output_directory):
        log.info('Creating runner with {} {}'.format(config, output_directory))
        self.output_directory = output_directory
        self.config = config
        self.is_running = False

    @property
    def number_of_ports(self):
        """Number of ports declared by the configuration."""
        return self.config.number_of_ports

    def start(self):
        """Write one CSV of random samples per port, then flag the runner as running."""
        import csv
        import random
        log.info('runner started')
        for port_index in xrange(self.config.number_of_ports):
            # Header row first, followed by num_rows random (power, voltage) pairs.
            samples = [['power', 'voltage']]
            for _ in xrange(self.num_rows):
                samples.append([random.gauss(1.0, 1.0), random.gauss(1.0, 0.1)])
            port_path = self.get_port_file_path(self.config.labels[port_index])
            with open(port_path, 'wb') as wfh:
                csv.writer(wfh).writerows(samples)

        self.is_running = True

    def stop(self):
        """Flag the runner as stopped."""
        self.is_running = False
        log.info('runner stopped')

    def get_port_file_path(self, port_id):
        """Return the CSV path for ``port_id``; raise if it is not a configured label."""
        if port_id not in self.config.labels:
            raise Exception('Invalid port id: {}'.format(port_id))
        return os.path.join(self.output_directory, '{}.csv'.format(port_id))


class DaqServer(object):
    """
    Manages a single DAQ measurement session and the port files it produces.

    A session is created with ``configure()``, driven with ``start()`` and
    ``stop()``, and discarded with ``terminate()``. Each session writes its
    files into a fresh, timestamp-named sub-directory of
    ``base_output_directory``.

    Methods that can complete with a warning return the warning text (and
    ``None`` when there is nothing to report); operations that require a
    configured session raise ``ProtocolError`` when there is none.

    """

    def __init__(self, base_output_directory):
        """
        :param base_output_directory: directory under which per-session output
                                      directories are created; it is created
                                      if it does not already exist.
        """
        # Session state is initialised before touching the file system so that
        # __del__ always finds these attributes, even if directory creation
        # below raises.
        self.runner = None
        self.output_directory = None
        self.labels = None
        self.base_output_directory = os.path.abspath(base_output_directory)
        if os.path.isdir(self.base_output_directory):
            log.info('Using output directory: {}'.format(self.base_output_directory))
        else:
            log.info('Creating new output directory: {}'.format(self.base_output_directory))
            os.makedirs(self.base_output_directory)

    def configure(self, config_string):
        """
        Start a new session from a serialized device configuration.

        If a previous session still exists it is replaced (its runner is
        stopped first if it is running) and a warning message is returned.

        :returns: warning text, or ``None`` if there was no previous session.
        """
        message = None
        if self.runner:
            message = 'Configuring a new session before previous session has been terminated.'
            log.warning(message)
            if self.runner.is_running:
                self.runner.stop()
        config = DeviceConfiguration.deserialize(config_string)
        config.validate()
        self.output_directory = self._create_output_directory()
        self.labels = config.labels
        log.info('Writing port files to {}'.format(self.output_directory))
        self.runner = DaqRunner(config, self.output_directory)
        return message

    def start(self):
        """
        Start the configured runner.

        If the runner is already running it is restarted and a warning is
        returned.

        :returns: warning text, or ``None`` on a clean start.
        :raises ProtocolError: if no session has been configured.
        """
        if not self.runner:
            raise ProtocolError('Start called before a session has been configured.')
        if not self.runner.is_running:
            self.runner.start()
            return None
        message = 'Calling start() before stop() has been called. Data up to this point will be lost.'
        log.warning(message)
        self.runner.stop()
        self.runner.start()
        return message

    def stop(self):
        """
        Stop the configured runner.

        The runner's ``stop()`` is invoked even if it is not flagged as
        running; in that case a warning is returned as well.

        :returns: warning text, or ``None`` on a clean stop.
        :raises ProtocolError: if no session has been configured.
        """
        if not self.runner:
            raise ProtocolError('Stop called before a session has been configured.')
        if self.runner.is_running:
            self.runner.stop()
            return None
        message = 'Attempting to stop() before start() was invoked.'
        log.warning(message)
        self.runner.stop()
        return message

    def list_devices(self):  # pylint: disable=no-self-use
        """Return whatever the module-level ``list_available_devices()`` reports."""
        return list_available_devices()

    def list_ports(self):
        """Return the port labels of the current configuration (``None`` before ``configure()``)."""
        return self.labels

    def list_port_files(self):
        """
        Return the labels of the ports whose output file currently exists on disk.

        :raises ProtocolError: if no session has been configured.
        """
        if not self.runner:
            raise ProtocolError('Attempting to list port files before session has been configured.')
        return [port_id for port_id in self.labels
                if os.path.isfile(self.get_port_file_path(port_id))]

    def get_port_file_path(self, port_id):
        """
        Return the runner's output file path for ``port_id``.

        :raises ProtocolError: if no session has been configured.
        """
        if not self.runner:
            raise ProtocolError('Attepting to get port file path before session has been configured.')
        return self.runner.get_port_file_path(port_id)

    def terminate(self):
        """
        End the current session and delete its output directory.

        :returns: warning text if the runner had to be stopped first or if
                  there was no session to terminate; ``None`` otherwise.
        """
        message = None
        if self.runner:
            if self.runner.is_running:
                message = 'Terminating session before runner has been stopped.'
                log.warning(message)
                self.runner.stop()
            self.runner = None
            if self.output_directory and os.path.isdir(self.output_directory):
                shutil.rmtree(self.output_directory)
            self.output_directory = None
            log.info('Session terminated.')
        else:  # Runner has not been created.
            message = 'Attempting to close session before it has been configured.'
            log.warning(message)
        return message

    def _create_output_directory(self):
        """Create and return a new session directory named after the current timestamp."""
        basename = datetime.now().strftime('%Y-%m-%d_%H%M%S%f')
        dirname = os.path.join(self.base_output_directory, basename)
        os.makedirs(dirname)
        return dirname

    def __del__(self):
        # getattr() rather than plain attribute access: the finalizer also runs
        # for instances whose __init__ never completed.
        runner = getattr(self, 'runner', None)
        if runner:
            runner.stop()

    def __str__(self):
        return '({})'.format(self.base_output_directory)

    __repr__ = __str__


class DaqControlProtocol(LineReceiver):  # pylint: disable=W0223
    """
    Line-based control protocol between a DAQ client and the server.

    Each received line is deserialized into a ``DaqServerRequest`` and
    dispatched on its ``command``. Requests are answered with a serialized
    ``DaqServerResponse`` line whose status is ``Status.OK``,
    ``Status.OKISH`` (completed, but with a warning message) or
    ``Status.ERROR``.

    """

    def __init__(self, daq_server):
        """
        :param daq_server: the ``DaqServer`` that requests are forwarded to.
        """
        self.daq_server = daq_server
        # Assigned by the factory right after construction.
        self.factory = None

    @staticmethod
    def _exception_text(error):
        """Return the most informative message text available on ``error``."""
        # PyDAQmx exceptions use "mess" rather than the standard "message"
        # to pass errors...
        if hasattr(error, 'mess'):
            return error.mess
        # Some exceptions carry an empty (or no) "message"; fall back to
        # str() so the client is not sent a blank error description.
        return getattr(error, 'message', None) or str(error)

    def lineReceived(self, line):
        """Parse one request line and process it; reply with an error if it cannot be parsed."""
        line = line.strip()
        log.info('Received: {}'.format(line))
        try:
            request = DaqServerRequest.deserialize(line)
        except Exception as e:  # pylint: disable=W0703
            self.sendError('Received bad request ({}: {})'.format(e.__class__.__name__,
                                                                  self._exception_text(e)))
        else:
            self.processRequest(request)

    def processRequest(self, request):
        """
        Dispatch ``request`` to the handler for its command.

        Any exception raised by a handler is reported to the client as an
        error response instead of propagating.
        """
        try:
            if request.command == 'configure':
                self.configure(request)
            elif request.command == 'start':
                self.start(request)
            elif request.command == 'stop':
                self.stop(request)
            elif request.command == 'list_devices':
                self.list_devices(request)
            elif request.command == 'list_ports':
                self.list_ports(request)
            elif request.command == 'list_port_files':
                self.list_port_files(request)
            elif request.command == 'pull':
                self.pull_port_data(request)
            elif request.command == 'close':
                self.terminate(request)
            else:
                self.sendError('Received unknown command: {}'.format(request.command))
        except Exception as e:  # pylint: disable=W0703
            self.sendError('{}: {}'.format(e.__class__.__name__, self._exception_text(e)))

    def _send_outcome(self, warning):
        """Reply OK when ``warning`` is empty, otherwise OKISH carrying the warning text."""
        if not warning:
            self.sendResponse(Status.OK)
        else:
            self.sendResponse(Status.OKISH, message=warning)

    def configure(self, request):
        """Handle 'configure': requires a 'config' parameter holding the serialized configuration."""
        if 'config' in request.params:
            self._send_outcome(self.daq_server.configure(request.params['config']))
        else:
            self.sendError('Invalid config; config string not provided.')

    def start(self, request):
        """Handle 'start'."""
        self._send_outcome(self.daq_server.start())

    def stop(self, request):
        """Handle 'stop'."""
        self._send_outcome(self.daq_server.stop())

    def pull_port_data(self, request):
        """
        Handle 'pull': requires a 'port_id' parameter.

        If the port's file exists, a one-off listening port is opened for the
        transfer and its number is returned to the client as 'port_number'.
        """
        if 'port_id' in request.params:
            port_id = request.params['port_id']
            port_file = self.daq_server.get_port_file_path(port_id)
            if os.path.isfile(port_file):
                port = self._initiate_file_transfer(port_file)
                self.sendResponse(Status.OK, data={'port_number': port})
            else:
                self.sendError('File for port {} does not exist.'.format(port_id))
        else:
            self.sendError('Invalid pull request; port id not provided.')

    def list_devices(self, request):
        """Handle 'list_devices'; replies OKISH with a message if enumeration is unsupported."""
        if CAN_ENUMERATE_DEVICES:
            devices = self.daq_server.list_devices()
            self.sendResponse(Status.OK, data={'devices': devices})
        else:
            message = "Server does not support DAQ device enumration"
            self.sendResponse(Status.OKISH, message=message)

    def list_ports(self, request):
        """Handle 'list_ports': reply with the configured port labels."""
        port_labels = self.daq_server.list_ports()
        self.sendResponse(Status.OK, data={'ports': port_labels})

    def list_port_files(self, request):
        """Handle 'list_port_files': reply with the labels of ports that have a file on disk."""
        port_labels = self.daq_server.list_port_files()
        self.sendResponse(Status.OK, data={'ports': port_labels})

    def terminate(self, request):
        """
        Handle 'close': shut down outstanding file transfers and end the session.

        The response is OKISH if any warning was produced along the way.
        """
        status = Status.OK
        message = ''
        if self.factory.transfer_sessions:
            message = 'Terminating with file tranfer sessions in progress. '
            log.warning(message)
            # transferComplete() removes the session from the dict, so iterate
            # over a snapshot of the keys rather than the live dict.
            for session in list(self.factory.transfer_sessions):
                self.factory.transferComplete(session)
        message += self.daq_server.terminate() or ''
        if message:
            status = Status.OKISH
        self.sendResponse(status, message)

    def sendError(self, message):
        """Log ``message`` as an error and send it to the client with ``Status.ERROR``."""
        log.error(message)
        self.sendResponse(Status.ERROR, message)

    def sendResponse(self, status, message=None, data=None):
        """Serialize a ``DaqServerResponse`` and send it as one line."""
        response = DaqServerResponse(status, message=message, data=data)
        self.sendLine(response.serialize())

    def sendLine(self, line):
        """Log and send ``line``, with any embedded CRLF sequences removed first."""
        log.info('Responding: {}'.format(line))
        LineReceiver.sendLine(self, line.replace('\r\n', ''))

    def _initiate_file_transfer(self, filepath):
        """Start listening on a system-assigned port that serves ``filepath``; return the port number."""
        sender_factory = FileSenderFactory(filepath, self.factory)
        # Port 0 is passed so that the listening port number is assigned for us.
        connector = reactor.listenTCP(0, sender_factory)
        self.factory.transferInitiated(sender_factory, connector)
        return connector.getHost().port


class DaqFactory(Factory):
    """
    Factory for ``DaqControlProtocol`` connections.

    Besides building protocols, it keeps track of the file transfer sessions
    opened on behalf of clients (so that stale ones can be timed out) and
    removes old entries from the server's base output directory.

    """

    protocol = DaqControlProtocol
    # Delay, in seconds, before pulse() looks for timed-out transfers.
    check_alive_period = 5 * 60
    # Maximum age, in seconds, of a transfer session before pulse() closes it.
    max_transfer_lifetime = 30 * 60

    def __init__(self, server, cleanup_period=24 * 60 * 60, cleanup_after_days=5):
        """
        :param server: the ``DaqServer`` shared by all connections.
        :param cleanup_period: delay, in seconds, between a connection being
                               made and the cleanup it schedules.
        :param cleanup_after_days: entries older than this many days are
                                   removed by ``perform_cleanup()``.
        """
        self.server = server
        self.cleanup_period = cleanup_period
        self.cleanup_threshold = timedelta(cleanup_after_days)
        # Maps transfer session -> (start time from time.time(), listening port object).
        self.transfer_sessions = {}

    def buildProtocol(self, addr):
        """Build a control protocol and schedule a transfer check and a cleanup for later."""
        proto = DaqControlProtocol(self.server)
        proto.factory = self
        reactor.callLater(self.check_alive_period, self.pulse)
        reactor.callLater(self.cleanup_period, self.perform_cleanup)
        return proto

    def clientConnectionLost(self, connector, reason):
        """Log the lost connection and, unless ``reason`` is a ``ConnectionLost``, close all transfers."""
        # Logged through the same log.info/log.error helpers that the rest of
        # this module uses.
        log.info('client connection lost: {}.'.format(reason))
        if not isinstance(reason, ConnectionLost):
            log.error('ERROR: Client terminated connection mid-transfer.')
            # Snapshot the keys: transferComplete() removes entries from the dict.
            for session in list(self.transfer_sessions):
                self.transferComplete(session)

    def transferInitiated(self, session, connector):
        """Record a new transfer ``session`` together with its start time and listening port."""
        self.transfer_sessions[session] = (time.time(), connector)

    def transferComplete(self, session, reason='OK'):
        """
        Stop listening for ``session`` and forget it.

        ``reason`` is logged as an error unless it is ``'OK'``. Completing a
        session that has already been removed (e.g. it timed out before its
        connection closed) is a no-op.
        """
        if reason != 'OK':
            log.error(reason)
        entry = self.transfer_sessions.pop(session, None)
        if entry is None:
            return
        entry[1].stopListening()

    def pulse(self):
        """Close down any file tranfer sessions that have been open for too long."""
        current_time = time.time()
        # Snapshot the items: transferComplete() removes entries from the dict.
        for session, (start_time, conn) in list(self.transfer_sessions.items()):
            if (current_time - start_time) > self.max_transfer_lifetime:
                message = '{} session on port {} timed out'
                self.transferComplete(session, message.format(session, conn.getHost().port))
        # Keep checking for as long as there is something left to time out.
        if self.transfer_sessions:
            reactor.callLater(self.check_alive_period, self.pulse)

    def perform_cleanup(self):
        """
        Clean up old uncollected data directories to recover disk space.

        Every directory directly under the server's base output directory
        whose ctime is older than the cleanup threshold is removed.
        Non-directory entries are skipped, and a failure on one entry is
        logged without aborting the rest of the sweep.

        """
        log.info('Performing cleanup of the output directory...')
        base_directory = self.server.base_output_directory
        current_time = datetime.now()
        for entry in os.listdir(base_directory):
            entry_path = os.path.join(base_directory, entry)
            if not os.path.isdir(entry_path):
                # shutil.rmtree() only works on directories; plain files are
                # left alone.
                log.debug('Skipping {} (not a directory)'.format(entry))
                continue
            try:
                entry_ctime = datetime.fromtimestamp(os.path.getctime(entry_path))
                existence_time = current_time - entry_ctime
                if existence_time > self.cleanup_threshold:
                    log.debug('Removing {} (existed for {})'.format(entry, existence_time))
                    shutil.rmtree(entry_path)
                else:
                    log.debug('Keeping {} (existed for {})'.format(entry, existence_time))
            except OSError as e:
                log.error('Could not clean up {}: {}'.format(entry_path, e))
        log.info('Cleanup complete.')

    def __str__(self):
        return '<DAQ {}>'.format(self.server)

    __repr__ = __str__


class FileReader(object):
    """
    Producer that streams a file, line by line, to a protocol's transport.

    Declared as implementing ``IPushProducer``. Every line read from the file
    has its trailing newline characters stripped and a CRLF appended before it
    is written out.

    """

    implements(interfaces.IPushProducer)

    def __init__(self, filepath):
        """Open ``filepath`` for reading; the reader starts out paused and without a protocol."""
        self.fh = open(filepath)
        self.proto = None
        self.done = False  # becomes True once stopProducing() has run
        self._paused = True

    def setProtocol(self, proto):
        """Set the protocol whose transport the file contents are written to."""
        self.proto = proto

    def resumeProducing(self):
        """
        Write lines to the transport until paused or until the file is exhausted.

        Raises ``ProtocolError`` if no protocol has been set.
        """
        if not self.proto:
            raise ProtocolError('resumeProducing called with no protocol set.')
        self._paused = False
        try:
            # NOTE(review): presumably the transport calls pauseProducing()
            # from within write() when its buffer fills, which is what ends
            # this loop before end-of-file -- confirm against Twisted's
            # producer contract.
            while not self._paused:
                line = self.fh.next().rstrip('\n') + '\r\n'
                self.proto.transport.write(line)
        except StopIteration:
            # End of file reached: everything has been handed to the transport.
            log.debug('Sent everything.')
            self.stopProducing()

    def pauseProducing(self):
        """Stop the write loop in resumeProducing() after the current line."""
        self._paused = True

    def stopProducing(self):
        """Mark the transfer as done, close the file and drop the connection."""
        self.done = True
        self.fh.close()
        self.proto.transport.unregisterProducer()
        self.proto.transport.loseConnection()


class FileSenderProtocol(Protocol):
    """Protocol that pushes the contents of a ``FileReader`` to whoever connects."""

    def __init__(self, reader):
        self.factory = None  # assigned by the factory after construction
        self.reader = reader

    def connectionMade(self):
        """Register the reader as a streaming producer on the transport and start it."""
        transport, reader = self.transport, self.reader
        transport.registerProducer(reader, True)
        reader.resumeProducing()

    def connectionLost(self, reason=ConnectionDone):
        """Report completion to the factory if the reader is done; otherwise pause and detach it."""
        if not self.reader.done:
            self.reader.pauseProducing()
            self.transport.unregisterProducer()
            return
        self.factory.transferComplete()


class FileSenderFactory(Factory):
    """
    Factory serving a single file over a dedicated listening port.

    ``owner`` is notified through its ``transferComplete()`` once the transfer
    has finished.

    """

    def __init__(self, path, owner):
        self.owner = owner
        self.reader = None  # created when the first protocol is built
        self.path = os.path.abspath(path)

    @property
    def done(self):
        """The reader's ``done`` flag, or ``None`` if no reader exists yet."""
        return self.reader.done if self.reader else None

    def buildProtocol(self, addr):
        """Build a sender protocol bound to the (lazily created) file reader."""
        # A single reader is shared by every protocol this factory builds.
        self.reader = self.reader or FileReader(self.path)
        sender = FileSenderProtocol(self.reader)
        sender.factory = self
        self.reader.setProtocol(sender)
        return sender

    def transferComplete(self):
        """Tell the owner that this transfer session has finished."""
        self.owner.transferComplete(self)

    def __hash__(self):
        return hash(self.path)

    def __str__(self):
        return '<FileSender {}>'.format(self.path)

    __repr__ = __str__


def run_server():
    """
    Parse the command line, then start the DAQ server and run the reactor.

    With ``--debug`` the dummy runner is used in place of the real DAQ
    backend; without it, a failed backend import is re-raised here.

    """
    global DaqRunner  # pylint: disable=W0603

    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--directory', default='.', metavar='DIR',
                        help='Working directory')
    parser.add_argument('-p', '--port', type=int, default=45677, metavar='PORT',
                        help='port the server will listen on.')
    parser.add_argument('-c', '--cleanup-after', type=int, default=5, metavar='DAYS',
                        help="""
                        Sever will perodically clean up data files that are older than the number of
                        days specfied by this parameter.
                        """)
    parser.add_argument('--cleanup-period', type=int, default=1, metavar='DAYS',
                        help='Specifies how ofte the server will attempt to clean up old files.')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Run in debug mode (no DAQ connected).')
    parser.add_argument('--verbose', action='store_true', default=False,
                        help='Produce verobose output.')
    args = parser.parse_args()

    if args.debug:
        DaqRunner = DummyDaqRunner
    elif not DaqRunner:
        # The real backend failed to import and debug mode was not requested.
        raise __import_error  # pylint: disable=raising-bad-type

    log.start_logging('DEBUG' if (args.verbose or args.debug) else 'INFO')

    # The option is given in days; the factory expects seconds.
    seconds_per_day = 24 * 60 * 60
    cleanup_period = args.cleanup_period * seconds_per_day

    daq_server = DaqServer(args.directory)
    daq_factory = DaqFactory(daq_server, cleanup_period, args.cleanup_after)
    reactor.listenTCP(args.port, daq_factory).getHost()

    # The resolved address is only used for the log line below.
    try:
        hostname = socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        hostname = 'localhost'
    log.info('Listening on {}:{}'.format(hostname, args.port))
    reactor.run()


# Start the server when this module is executed directly as a script.
if __name__ == "__main__":
    run_server()