aboutsummaryrefslogtreecommitdiff
path: root/ovsdb/ovsdbmonitor/OVEFetch.py
blob: 8bc5757c5930aaba4f45b3eef5da75d837630b3d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# Copyright (c) 2011 Nicira Networks.
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from OVEStandard import *
from OVEConfig import *
from OVELogger import *
import ovs.json

# This sequence installs the qt4reactor before twisted gets a chance to install its reactor
# NOTE: statement order matters here -- the QApplication is created and
# qt4reactor.install() is called before the twisted.internet import below.
import qt4reactor
# Module-level Qt application object, constructed with an empty argument list
globalApp = QtGui.QApplication([])
qt4reactor.install()

try:
    from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
    from twisted.internet import defer, protocol, reactor
    from twisted.application import reactors
except Exception, e:
    # Print a hint about the missing dependency, then re-raise the original error
    print('+++ Python Twisted Conch module is required\n')
    raise

class OVEFetchUserAuth(userauth.SSHUserAuthClient):
    """SSH user-auth client that takes its password from the host config.

    fetch is the owning object; its config() dictionary supplies the
    'password' and 'address' entries read by this class.  Any further
    positional arguments are passed to userauth.SSHUserAuthClient.
    """

    def __init__(self, fetch, *params):
        userauth.SSHUserAuthClient.__init__(self, *params)
        # Number of USERAUTH_FAILURE packets handled so far
        self.authFails = 0
        self.fetch = fetch

    def getPassword(self):
        """Return an already-fired Deferred carrying the configured password."""
        password = self.fetch.config()['password']
        return defer.succeed(password)

    def ssh_USERAUTH_FAILURE(self, packet):
        """Log repeated failures, count this one, then defer to the base class."""
        # We normally get one so ignore.  Real failures send these repeatedly
        isRepeat = self.authFails > 0
        if isRepeat:
            OVELog('Authentication failure for '+self.fetch.config()['address'])
        self.authFails = self.authFails + 1
        userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)

class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
    """SSH connection service that runs each command on its own channel.

    Emits the Qt signal connectionService(QObject) carrying this object when
    the service starts, and carrying None when it stops.
    """

    # Number of superseded channels kept referenced.
    # For 30 second timeouts at 1 second refresh interval and three windows
    # open on a single host, need 90 channels
    MAX_OLD_CHANNELS = 90

    def __init__(self, fetch, *params):
        connection.SSHConnection.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        self._channel = None     # Channel created for the most recent command
        self._oldChannels = []   # Superseded channels, oldest first

    def serviceStarted(self):
        """Announce that this connection is ready to accept commands."""
        self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)

    def serviceStopped(self):
        """Announce that the connection is gone by emitting None."""
        self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)

    def execCommand(self, requester, ref, command, commandType):
        """Open a new OVECommandChannel that runs command.

        requester, ref, command and commandType are handed to the channel
        unchanged.  The previously current channel, if any, is parked in
        _oldChannels rather than dropped.
        """
        if self._channel is not None:
            # Don't delete old channels immediately in case they're e.g. going to time out with a failure
            self._oldChannels.append(self._channel)
            if len(self._oldChannels) > self.MAX_OLD_CHANNELS:
                # Evict the oldest entry (index 0) so the list keeps the
                # newest MAX_OLD_CHANNELS channels.  Index 1 was evicted
                # previously, which kept the very first channel forever.
                del self._oldChannels[0]
        # NOTE(review): 2**16, 2**15 and self are forwarded positionally to
        # channel.SSHChannel.__init__ by OVECommandChannel -- confirm that the
        # third positional slot is the connection and not a window size.
        self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
        self.openChannel(self._channel)

    def connectionLost(self, reason):
        """Forward a lost connection to the current channel only."""
        if self._channel is not None:
            self._channel.connectionLost(reason)

class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
    """SSH client transport that starts password auth for an OVEFetch object.

    Once the transport is secure it creates an OVEFetchConnection, wires the
    connection's connectionService signal to the fetch object and requests
    user authentication.
    """

    def __init__(self, fetch, *params):
        # There is no __init__ method for transport.SSHClientTransport, so
        # only the Qt base class is initialised; params is not used.
        QtCore.QObject.__init__(self)
        self._connection = None
        self.fetch = fetch
        failureSignal = QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)')
        self.connect(self, failureSignal, self.fetch.xon_channelFailure)

    def verifyHostKey(self, hostKey, fingerprint):
        """Accept the host key unconditionally."""
        # NOTE(review): hostKey and fingerprint are never inspected, so every
        # server key is accepted -- confirm this is intended.
        accepted = defer.succeed(1)
        return accepted

    def connectionSecure(self):
        """Create the connection service and request user authentication."""
        conn = OVEFetchConnection(self.fetch)
        self._connection = conn
        serviceSignal = QtCore.SIGNAL('connectionService(QObject)')
        QtCore.QObject.connect(conn, serviceSignal, self.fetch.xon_connectionService)
        # The login name falls back to 'root' when the config has no 'username'
        username = self.fetch.config().get('username', 'root')
        authClient = OVEFetchUserAuth(self.fetch, username, conn)
        self.requestService(authClient)

    def connectionLost(self, reason):
        """Pass a lost connection on to the connection service, if one exists."""
        conn = self._connection
        if conn is None:
            return
        conn.connectionLost(reason)

class OVEFetchWrapper:
    """Plain holder that exposes the wrapped value as the attribute contents."""

    def __init__(self, contents):
        self.contents = contents

class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
    """SSH session channel that runs one remote command and reports the outcome.

    The outcome is delivered to the owning fetch object through the Qt
    signals channelSuccess and channelFailure.  While OVEConfig's logTraffic
    flag is set, received data is also forwarded through channelData and
    channelExtData.

    commandType selects how completion is detected once a zero exit status
    and some stdout data have arrived:
      'JSON'   - stdout must decode with ovs.json.from_string
      'framed' - stdout must contain a line equal to END_MARKER
    """
    name = 'session'
    MSEC_TIMEOUT=10000                 # Interval passed to startTimer, in milliseconds
    STATUS_CONNECTION_LOST = 100001    # Pseudo exit status set by connectionLost
    STATUS_TIMEOUT = 100002            # Pseudo exit status set by timerEvent
    END_MARKER='END-MARKER'
    END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)

    def __init__(self, fetch, requester, ref, command, commandType, *params):
        """Store the request and connect this channel's signals to fetch.

        params is passed unchanged to channel.SSHChannel.__init__.
        """
        channel.SSHChannel.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        self.requester = requester
        self.ref = ref
        self.command = command
        self.commandType= commandType
        self._data = ''           # Accumulated stdout
        self._extData = ''        # Accumulated extended data (stderr)
        self._jsonValues = None   # Decoded JSON, set by testIfDone
        self._timerId = None      # Id of the running timeout timer, if any
        self._status = None       # Exit status, or one of the STATUS_* values
        self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
        self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
        self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
        self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)

    def _stopTimer(self):
        """Kill the timeout timer if it is running and forget its id."""
        if self._timerId is not None:
            self.killTimer(self._timerId)
            # Clear the id so that a later call cannot kill it a second time;
            # sendResult can run more than once (e.g. timeout, then
            # connection lost).
            self._timerId = None

    def openFailed(self, reason):
        """Report that the channel could not be opened."""
        self._stopTimer()
        self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
            'Open failed:'+str(reason), '', '')

    def channelOpen(self, ignoredData):
        """Start the timeout timer and send the 'exec' request for the command."""
        try:
            nsCommand = common.NS(str(self.command))
            self._timerId = self.startTimer(self.MSEC_TIMEOUT)
            self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
        except Exception as e:
            # The request never went out, so the timeout must not fire later
            # and report a second failure for the same command.
            self._stopTimer()
            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
                'Open failed:'+str(e), self._data, self._extData)

    def dataReceived(self, data):
        """Accumulate stdout data and check whether the command is complete."""
        self._data += data
        if OVEConfig.Inst().logTraffic:
            self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
        self.testIfDone()

    def extDataReceived(self, extData):
        """Accumulate extended (stderr) data; this never completes the command."""
        self._extData += extData
        if OVEConfig.Inst().logTraffic:
            self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)

    def request_exit_status(self, data):
        """Record the remote exit status, unpacked as a big-endian unsigned long."""
        # We can get the exit status before the data, so delay calling sendResult until we get both
        self._status = struct.unpack('>L', data)[0]
        self.testIfDone()

    def testIfDone(self):
        """Call sendResult once the exit status and a complete reply are present."""
        if self._status is None:
            return # No exit status yet
        if self._status != 0:
            self.sendResult() # Failed, so send what we have
            return
        if len(self._data) == 0:
            return # Succeeded, but no output has arrived yet

        # Status == success and we have some data
        if self.commandType == 'JSON':
            try:
                # Decode the JSON data, to confirm that we have all of the data
                self._jsonValues = ovs.json.from_string(str(self._data)) # FIXME: Should handle unicode
            except Exception:
                return # Wait for more data
            # Kept outside the try block so that an error while delivering
            # the result is not mistaken for incomplete JSON.
            self.sendResult()
        elif self.commandType == 'framed':
            match = self.END_MARKER_RE.search(self._data)
            if match:
                self._data = self._data[:match.start()] # Remove end marker
                self.sendResult()
        else:
            OVELog('Bad command type')

    def sendResult(self):
        """Emit channelSuccess or channelFailure and close the channel.

        Success requires a zero status, plus decoded JSON values when
        commandType is 'JSON'; anything else is reported as a failure.
        loseConnection is skipped when the status is STATUS_CONNECTION_LOST.
        """
        self._stopTimer()
        if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
        elif self.commandType != 'JSON' and self._status == 0:
            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
        else:
            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
        if self._status != self.STATUS_CONNECTION_LOST:
            try:
                self.loseConnection()
            except Exception as e:
                OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))

    def connectionLost(self, reason):
        """Report the command as failed because the connection went away."""
        self._extData += '+++ Connection lost'
        self._status = self.STATUS_CONNECTION_LOST
        self.sendResult()

    def timerEvent(self, event):
        """Report a timeout when this channel's own timer fires."""
        if event.timerId() == self._timerId:
            self._extData += '+++ Timeout'
            self._status = self.STATUS_TIMEOUT
            self.sendResult()
        else:
            QtCore.QObject.timerEvent(self, event)

class OVEFetchEvent(QtCore.QEvent):
    """Qt event carrying the reference and the data of a completed fetch."""

    # Event type id obtained from QEvent.registerEventType() at class creation
    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, ref, data):
        QtCore.QEvent.__init__(self, self.TYPE)
        self.data = data
        self.ref = ref

class OVEFetchFailEvent(QtCore.QEvent):
    """Qt event carrying the reference and the error text of a failed fetch."""

    # Event type id obtained from QEvent.registerEventType() at class creation
    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, ref, message):
        QtCore.QEvent.__init__(self, self.TYPE)
        # The message is always stored as a plain str
        self.message = str(message)
        self.ref = ref

class OVEFetch(QtCore.QObject):
    """Runs commands on one configured host over SSH and posts the results.

    There is one instance per host uuid, obtained through Inst().  Results
    are delivered to the requester object as OVEFetchEvent (success) or
    OVEFetchFailEvent (failure) via QCoreApplication.postEvent.  The latest
    ref, values and message for each requester are also kept in the refs,
    values and messages dictionaries for the snoop*() workaround accessors.
    """
    instances = {}        # uuid -> OVEFetch, filled by Inst()
    SEC_TIMEOUT = 10.0    # Timeout, in seconds, passed to connectTCP

    def __init__(self, uuid):
        QtCore.QObject.__init__(self)
        self._hostUuid = uuid
        self._config = None        # Cached host config, loaded by config()
        self._transport = None     # Value returned by ClientCreator.connectTCP
        self._connection = None    # Connection object received from xon_connectionService
        self._commandQueue = []    # Commands waiting for a connection
        self._timerRef = 0         # Incremented to invalidate pending connect timeouts
        self.refs = {}
        self.messages = {}
        self.values = {}
        configSignal = QtCore.SIGNAL("configUpdated()")
        self.connect(OVEConfig.Inst(), configSignal, self.xon_configUpdated)

    @classmethod
    def Inst(cls, uuid):
        """Return the OVEFetch for uuid, creating it on first use."""
        if uuid in cls.instances:
            return cls.instances[uuid]
        created = OVEFetch(uuid)
        cls.instances[uuid] = created
        return created

    @classmethod
    def startReactor(cls):
        """Start the reactor through its runReturn() method."""
        reactor.runReturn()

    def _postFailure(self, requester, ref, message):
        """Record ref and message for requester and post an OVEFetchFailEvent."""
        self.refs[requester] = ref # For PySide workaround
        self.messages[requester] = message # For PySide workaround
        event = OVEFetchFailEvent(ref, message)
        QtCore.QCoreApplication.postEvent(requester, event)

    def xon_configUpdated(self):
        """Drop the cached config and the transport after a config change."""
        self._config = None
        self.resetTransport()

    def xon_connectionService(self, connection):
        """Store the new connection (or None) and run any queued commands."""
        self._connection = connection
        if connection is None:
            return
        OVELog('SSH connection to '+self.config()['address'] +' established')
        for queued in self._commandQueue:
            self.execCommand2(*queued)
        self._commandQueue = []

    def xon_channelData(self, requester, ref, data):
        """Log received stdout data when traffic logging is enabled."""
        if not OVEConfig.Inst().logTraffic:
            return
        OVELog('Channel data received: '+str(data))

    def xon_channelExtData(self, requester, ref, data):
        """Log received stderr data when traffic logging is enabled."""
        if not OVEConfig.Inst().logTraffic:
            return
        OVELog('+++ Channel extData (stderr) received: '+str(data))

    def xon_channelFailure(self, requester, ref, message, data, extData):
        """Post a failure event for requester and reset the transport."""
        if OVEConfig.Inst().logTraffic:
            OVELog('+++ Channel failure: '+str(message))
            OVELog("Closing SSH session due to failure")

        # Append whatever output the failed command produced
        errMessage = message
        if len(data) > 0:
            errMessage += '\n+++ Failed command output: '+data
        if len(extData) > 0:
            errMessage += '\n+++ Failed command output (stderr): '+extData

        self._postFailure(requester, ref, errMessage)
        self.resetTransport()

    def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
        """Post a success event carrying decoded JSON or, without it, str(data)."""
        jsonValues = jsonValueVariant.toPyObject()
        if OVEConfig.Inst().logTraffic:
            OVELog('--- Channel success')
        try:
            if jsonValues is None:
                values = str(data)
            else:
                values = jsonValues.contents

            self.refs[requester] = ref # For PySide workaround
            self.values[requester] = values # For PySide workaround
            event = OVEFetchEvent(ref, values)
            QtCore.QCoreApplication.postEvent(requester, event)
        except Exception as e:
            message = ('+++ Failed to decode JSON reply: '+str(e))
            if len(data) > 0:
                message += "\n++++++ Data (stdout): "+str(data)
            if len(extData) > 0:
                message += '\n++++++ Error (stderr): '+str(extData)
            self._postFailure(requester, ref, message)

    # Use for workaround only
    def snoopRef(self, requester):
        """Return the last ref recorded for requester, or None."""
        return self.refs.get(requester)

    # Use for workaround only
    def snoopValues(self, requester):
        """Return the last values recorded for requester, or None."""
        return self.values.get(requester)

    # Use for workaround only
    def snoopMessage(self, requester):
        """Return the last failure message recorded for requester, or None."""
        return self.messages.get(requester)

    def config(self):
        """Return the host config, loading it from OVEConfig when not cached."""
        cached = self._config
        if cached is None:
            cached = OVEConfig.Inst().hostFromUuid(self._hostUuid)
            self._config = cached
        return cached

    def resetTransport(self):
        """Forget the current connection and transport."""
        if OVEConfig.Inst().logTraffic:
            OVELog('Transport reset for '+self.config()['address'])
        # Rebinding to None releases the old objects
        self._connection = None
        self._transport = None

    def transportErrback(self, failure, requester, ref, address):
        """Errback for the connect attempt: reset and post a failure event."""
        self._timerRef += 1 # Prevent timeout handling
        self.resetTransport()
        message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
        self._postFailure(requester, ref, message)

    def transportTimeout(self, timerRef, requester, ref, address):
        """Report a connect timeout unless the attempt was superseded or finished."""
        if self._timerRef != timerRef:
            return # A newer attempt or an errback has taken over
        if self._transport is None or self._connection is not None:
            return # Nothing is pending, or the connection came up
        message = 'Connection attempt to ' +address+' timed out'
        self._postFailure(requester, ref, message)
        self.resetTransport()

    def execCommand(self, requester, ref, command, commandType):
        """Run command on the host, starting a connection attempt if needed.

        While no transport exists the command is queued and a TCP connect
        is started; otherwise the command is handed to execCommand2.
        """
        if OVEConfig.Inst().logTraffic:
            hostName = (self.config() or {}).get('address', '<Address not set>')
            OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)

        if self._transport is not None:
            self.execCommand2(requester, ref, command, commandType)
            return

        self._connection = None
        self._commandQueue.append((requester, ref, command, commandType))
        hostConfig = self.config()
        creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
        self._transport = creator.connectTCP(hostConfig['address'], hostConfig.get('port', 22), timeout = self.SEC_TIMEOUT)
        self._transport.addErrback(self.transportErrback, requester, ref, hostConfig['address'])
        self._timerRef += 1

        # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
        # the timeout and prevent double handling
        # The default argument takes a copy of self._timerRef as it is now
        def onTimeout(timerRef = self._timerRef):
            return self.transportTimeout(timerRef, requester, ref, hostConfig['address'])

        delayMsec = int((1+self.SEC_TIMEOUT) * 1000)
        QtCore.QTimer.singleShot(delayMsec, onTimeout)

    def execCommand2(self, requester, ref, command, commandType):
        """Run command on the live connection, or queue it when there is none."""
        if self._connection is not None:
            self._connection.execCommand(requester, ref, command, commandType)
        else:
            self._commandQueue.append((requester, ref, command, commandType))

    def getTable(self, requester, tableName, ref = QtCore.QObject()):
        """Fetch every row of tableName with an ovsdb-client 'select' transaction.

        The default ref is a single QObject created when the class body is
        executed, so it is shared by all calls that omit ref.
        """
        target = self.config()['connectTarget']
        command = ('/usr/bin/ovsdb-client transact ' + target
                   + ' \'["Open_vSwitch", {"op":"select","table":"' + tableName
                   + '", "where":[]}]\'')

        self.execCommand(requester, ref, command, 'JSON')

    def execCommandFramed(self, requester, ref, command):
        """Run command followed by an echo of the end marker ('framed' type)."""
        framed = command + ' && echo ' + OVECommandChannel.END_MARKER
        self.execCommand(requester, ref, framed, 'framed')