diff options
author | Ansis Atteka <aatteka@nicira.com> | 2011-10-31 14:56:08 -0700 |
---|---|---|
committer | Ansis Atteka <aatteka@nicira.com> | 2011-11-18 10:39:20 -0800 |
commit | 0be6140a9a7de46f07e09d3ba200bd7f0cf73838 (patch) | |
tree | 74e7d7739f3fd6421912cce12be347068949f863 /python | |
parent | 5e9ceccdb69b7e8f519ebeb9d2825b2686810610 (diff) |
ovs-test: A new tool that allows to diagnose connectivity and performance issues
This tool will be a replacement for the current ovs-vlan-test
utility. Besides from connectivity issues it will also be able
to detect performance related issues in Open vSwitch setups.
Currently it uses UDP and TCP protocols for stressing.
Issue #6976
Diffstat (limited to 'python')
-rw-r--r-- | python/automake.mk (renamed from python/ovs/automake.mk) | 12 | ||||
-rw-r--r-- | python/ovstest/__init__.py | 1 | ||||
-rw-r--r-- | python/ovstest/args.py | 115 | ||||
-rw-r--r-- | python/ovstest/rpcserver.py | 203 | ||||
-rw-r--r-- | python/ovstest/tcp.py | 139 | ||||
-rw-r--r-- | python/ovstest/udp.py | 90 | ||||
-rw-r--r-- | python/ovstest/util.py | 74 |
7 files changed, 632 insertions, 2 deletions
diff --git a/python/ovs/automake.mk b/python/automake.mk index 22473280..089ef365 100644 --- a/python/ovs/automake.mk +++ b/python/automake.mk @@ -1,5 +1,13 @@ run_python = PYTHONPATH=$(top_srcdir)/python:$$PYTHON_PATH $(PYTHON) +ovstest_pyfiles = \ + python/ovstest/__init__.py \ + python/ovstest/args.py \ + python/ovstest/rpcserver.py \ + python/ovstest/tcp.py \ + python/ovstest/udp.py \ + python/ovstest/util.py + ovs_pyfiles = \ python/ovs/__init__.py \ python/ovs/daemon.py \ @@ -22,10 +30,10 @@ ovs_pyfiles = \ python/ovs/timeval.py \ python/ovs/vlog.py \ python/ovs/util.py -EXTRA_DIST += $(ovs_pyfiles) python/ovs/dirs.py +EXTRA_DIST += $(ovs_pyfiles) python/ovs/dirs.py $(ovstest_pyfiles) if HAVE_PYTHON -nobase_pkgdata_DATA = $(ovs_pyfiles) +nobase_pkgdata_DATA = $(ovs_pyfiles) $(ovstest_pyfiles) ovs-install-data-local: $(MKDIR_P) python/ovs (echo "import os" && \ diff --git a/python/ovstest/__init__.py b/python/ovstest/__init__.py new file mode 100644 index 00000000..218d8921 --- /dev/null +++ b/python/ovstest/__init__.py @@ -0,0 +1 @@ +# This file intentionally left blank. diff --git a/python/ovstest/args.py b/python/ovstest/args.py new file mode 100644 index 00000000..d6b47568 --- /dev/null +++ b/python/ovstest/args.py @@ -0,0 +1,115 @@ +# Copyright (c) 2011 Nicira Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ovsargs provide argument parsing for ovs-test utility +""" + +import argparse +import socket +import re + + +def ip(string): + """Verifies if string is a valid IP address""" + try: + socket.inet_aton(string) + except socket.error: + raise argparse.ArgumentTypeError("Not a valid IPv4 address") + return string + + +def port(string): + """Convert a string into a Port (integer)""" + try: + port_number = int(string) + if port_number < 1 or port_number > 65535: + raise argparse.ArgumentTypeError("Port is out of range") + except ValueError: + raise argparse.ArgumentTypeError("Port is not an integer") + return port_number + + +def ip_optional_port(string, default_port): + """Convert a string into IP and Port pair. If port was absent then use + default_port as the port""" + value = string.split(':') + if len(value) == 1: + return (ip(value[0]), default_port) + elif len(value) == 2: + return (ip(value[0]), port(value[1])) + else: + raise argparse.ArgumentTypeError("IP address from the optional Port " + "must be colon-separated") + + + +def server_endpoint(string): + """Converts a string in ControlIP[:ControlPort][,TestIP[:TestPort]] format + into a 4-tuple, where: + 1. First element is ControlIP + 2. Second element is ControlPort (if omitted will use default value 15531) + 3 Third element is TestIP (if omitted will be the same as ControlIP) + 4. Fourth element is TestPort (if omitted will use default value 15532)""" + value = string.split(',') + if len(value) == 1: # TestIP and TestPort are not present + ret = ip_optional_port(value[0], 15531) + return (ret[0], ret[1], ret[0], 15532) + elif len(value) == 2: + ret1 = ip_optional_port(value[0], 15531) + ret2 = ip_optional_port(value[1], 15532) + return (ret1[0], ret1[1], ret2[0], ret2[1]) + else: + raise argparse.ArgumentTypeError("ControlIP:ControlPort and TestIP:" + "TestPort must be comma " + "separated") + + +def bandwidth(string): + """Convert a string (given in bits/second with optional magnitude for + units) into a long (bytes/second)""" + if re.match("^[1-9][0-9]*[MK]?$", string) == None: + raise argparse.ArgumentTypeError("Not a valid target bandwidth") + bwidth = string.replace("M", "000000") + bwidth = bwidth.replace("K", "000") + return long(bwidth) / 8 # Convert from bits to bytes + + +def ovs_initialize_args(): + """Initialize args for ovstest utility""" + parser = argparse.ArgumentParser(description = 'Test ovs connectivity') + parser.add_argument('-v', '--version', action = 'version', + version = 'ovs-test (Open vSwitch) @VERSION@') + parser.add_argument("-b", "--bandwidth", action = 'store', + dest = "targetBandwidth", default = "1M", type = bandwidth, + help = 'target bandwidth for UDP tests in bits/second. Use ' + 'postfix M or K to alter unit magnitude.') + group = parser.add_mutually_exclusive_group(required = True) + group.add_argument("-s", "--server", action = "store", dest = "port", + type = port, + help = 'run in server mode and wait client to connect to this ' + 'port') + group.add_argument('-c', "--client", action = "store", nargs = 2, + dest = "servers", type = server_endpoint, + metavar = ("SERVER1", "SERVER2"), + help = 'run in client mode and do tests between these ' + 'two servers. Each server must be specified in following ' + 'format - ControlIP[:ControlPort][,TestIP[:TestPort]]. If ' + 'TestIP is omitted then ovs-test server will also use the ' + 'ControlIP for testing purposes. ControlPort is TCP port ' + 'where server will listen for incoming XML/RPC control ' + 'connections to schedule tests (by default 15531). TestPort ' + 'is port which will be used by server to send test traffic ' + '(by default 15532)') + return parser.parse_args() diff --git a/python/ovstest/rpcserver.py b/python/ovstest/rpcserver.py new file mode 100644 index 00000000..41d25694 --- /dev/null +++ b/python/ovstest/rpcserver.py @@ -0,0 +1,203 @@ +# Copyright (c) 2011 Nicira Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +rpcserver is an XML RPC server that allows RPC client to initiate tests +""" + +from twisted.internet import reactor +from twisted.web import xmlrpc, server +from twisted.internet.error import CannotListenError +import udp +import tcp +import args +import util + + +class TestArena(xmlrpc.XMLRPC): + """ + This class contains all the functions that ovstest will call + remotely. The caller is responsible to use designated handleIds + for designated methods (e.g. do not mix UDP and TCP handles). + """ + + def __init__(self): + xmlrpc.XMLRPC.__init__(self) + self.handle_id = 1 + self.handle_map = {} + + def __acquire_handle(self, value): + """ + Allocates new handle and assigns value object to it + """ + handle = self.handle_id + self.handle_map[handle] = value + self.handle_id += 1 + return handle + + def __get_handle_resources(self, handle): + """ + Return resources that were assigned to handle + """ + return self.handle_map[handle] + + def __delete_handle(self, handle): + """ + Releases handle from handle_map + """ + del self.handle_map[handle] + + + def xmlrpc_create_udp_listener(self, port): + """ + Creates a UDP listener that will receive packets from UDP sender + """ + try: + listener = udp.UdpListener() + reactor.listenUDP(port, listener) + handle_id = self.__acquire_handle(listener) + except CannotListenError: + return -1 + return handle_id + + def xmlrpc_create_udp_sender(self, host, count, size, duration): + """ + Send UDP datagrams to UDP listener + """ + sender = udp.UdpSender(tuple(host), count, size, duration) + reactor.listenUDP(0, sender) + handle_id = self.__acquire_handle(sender) + return handle_id + + def xmlrpc_get_udp_listener_results(self, handle): + """ + Returns number of datagrams that were received + """ + listener = self.__get_handle_resources(handle) + return listener.getResults() + + def xmlrpc_get_udp_sender_results(self, handle): + """ + Returns number of datagrams that were sent + """ + sender = self.__get_handle_resources(handle) + return sender.getResults() + + def xmlrpc_close_udp_listener(self, handle): + """ + Releases UdpListener and all its resources + """ + listener = self.__get_handle_resources(handle) + listener.transport.stopListening() + self.__delete_handle(handle) + return 0 + + def xmlrpc_close_udp_sender(self, handle): + """ + Releases UdpSender and all its resources + """ + sender = self.__get_handle_resources(handle) + sender.transport.stopListening() + self.__delete_handle(handle) + return 0 + + def xmlrpc_create_tcp_listener(self, port): + """ + Creates a TcpListener that will accept connection from TcpSender + """ + try: + listener = tcp.TcpListenerFactory() + port = reactor.listenTCP(port, listener) + handle_id = self.__acquire_handle((listener, port)) + return handle_id + except CannotListenError: + return -1 + + def xmlrpc_create_tcp_sender(self, his_ip, his_port, duration): + """ + Creates a TcpSender that will connect to TcpListener + """ + sender = tcp.TcpSenderFactory(duration) + connector = reactor.connectTCP(his_ip, his_port, sender) + handle_id = self.__acquire_handle((sender, connector)) + return handle_id + + def xmlrpc_get_tcp_listener_results(self, handle): + """ + Returns number of bytes received + """ + (listener, _) = self.__get_handle_resources(handle) + return listener.getResults() + + def xmlrpc_get_tcp_sender_results(self, handle): + """ + Returns number of bytes sent + """ + (sender, _) = self.__get_handle_resources(handle) + return sender.getResults() + + def xmlrpc_close_tcp_listener(self, handle): + """ + Releases TcpListener and all its resources + """ + try: + (_, port) = self.__get_handle_resources(handle) + port.loseConnection() + self.__delete_handle(handle) + except exceptions.KeyError: + return -1 + return 0 + + def xmlrpc_close_tcp_sender(self, handle): + """ + Releases TcpSender and all its resources + """ + try: + (_, connector) = self.__get_handle_resources(handle) + connector.disconnect() + self.__delete_handle(handle) + except exceptions.KeyError: + return -1 + return 0 + + + def xmlrpc_get_interface(self, address): + """ + Finds first interface that has given address + """ + return util.get_interface(address) + + def xmlrpc_get_interface_mtu(self, iface): + """ + Returns MTU of the given interface + """ + return util.get_interface_mtu(iface) + + def xmlrpc_uname(self): + """ + Return information about running kernel + """ + return util.uname() + + def xmlrpc_get_driver(self, iface): + """ + Returns driver version + """ + return util.get_driver(iface) + + +def start_rpc_server(port): + RPC_SERVER = TestArena() + reactor.listenTCP(port, server.Site(RPC_SERVER)) + reactor.run() diff --git a/python/ovstest/tcp.py b/python/ovstest/tcp.py new file mode 100644 index 00000000..33dc7192 --- /dev/null +++ b/python/ovstest/tcp.py @@ -0,0 +1,139 @@ +# Copyright (c) 2011 Nicira Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +tcp module contains listener and sender classes for TCP protocol +""" + +from twisted.internet.protocol import Factory, ClientFactory, Protocol +from twisted.internet import interfaces +from zope.interface import implements +import time + + +class TcpListenerConnection(Protocol): + """ + This per-connection class is instantiated each time sender connects + """ + def __init__(self): + self.stats = 0 + + def connectionMade(self): + print "Started TCP Listener connection" + + def dataReceived(self, data): + self.stats += len(data) + + def connectionLost(self, reason): + print "Stopped TCP Listener connection" + self.factory.stats += self.stats + + +class TcpListenerFactory(Factory): + """ + This per-listening socket class is used to + instantiate TcpListenerConnections + """ + protocol = TcpListenerConnection + + def __init__(self): + self.stats = 0 + + def startFactory(self): + print "Starting TCP listener factory" + + def stopFactory(self): + print "Stopping TCP listener factory" + + def getResults(self): + """ returns the number of bytes received as string""" + #XML RPC does not support 64bit int (http://bugs.python.org/issue2985) + #so we have to convert the amount of bytes into a string + return str(self.stats) + + +class Producer(object): + implements(interfaces.IPushProducer) + """ + This producer class generates infinite byte stream for a specified time + duration + """ + def __init__(self, proto, duration): + self.proto = proto + self.start = time.time() + self.produced = 0 + self.paused = False + self.data = "X" * 65535 + self.duration = duration + + def pauseProducing(self): + """This function is called whenever write() to socket would block""" + self.paused = True + + def resumeProducing(self): + """This function is called whenever socket becomes writable""" + self.paused = False + current = time.time() + while (not self.paused) and (current < self.start + self.duration): + self.proto.transport.write(self.data) + self.produced += len(self.data) + current = time.time() + if current >= self.start + self.duration: + self.proto.factory.stats += self.produced + self.proto.transport.unregisterProducer() + self.proto.transport.loseConnection() + + def stopProducing(self): + pass + + +class TcpSenderConnection(Protocol): + """ + TCP connection instance class that sends all traffic at full speed. + """ + + def connectionMade(self): + print "Started TCP sender connection" + producer = Producer(self, self.factory.duration) + self.transport.registerProducer(producer, True) + producer.resumeProducing() + + def dataReceived(self, data): + print "Sender received data!", data + self.transport.loseConnection() + + def connectionLost(self, reason): + print "Stopped TCP sender connection" + + +class TcpSenderFactory(ClientFactory): + """ + This factory is responsible to instantiate TcpSenderConnection classes + each time sender initiates connection + """ + protocol = TcpSenderConnection + + def __init__(self, duration): + self.duration = duration + self.stats = 0 + + def startFactory(self): + print "Starting TCP sender factory" + + def stopFactory(self): + print "Stopping TCP sender factory" + + def getResults(self): + """Returns amount of bytes sent to the Listener (as a string)""" + return str(self.stats) diff --git a/python/ovstest/udp.py b/python/ovstest/udp.py new file mode 100644 index 00000000..e09569db --- /dev/null +++ b/python/ovstest/udp.py @@ -0,0 +1,90 @@ +# Copyright (c) 2011 Nicira Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +ovsudp contains listener and sender classes for UDP protocol +""" + +from twisted.internet.protocol import DatagramProtocol +from twisted.internet.task import LoopingCall +import array, struct, time + + +class UdpListener(DatagramProtocol): + """ + Class that will listen for incoming UDP packets + """ + def __init__(self): + self.stats = [] + + def startProtocol(self): + print "Starting UDP listener" + + def stopProtocol(self): + print "Stopping UDP listener" + + def datagramReceived(self, data, (_1, _2)): + """This function is called each time datagram is received""" + try: + self.stats.append(struct.unpack_from("Q", data, 0)) + except struct.error: + pass #ignore packets that are less than 8 bytes of size + + def getResults(self): + """Returns number of packets that were actually received""" + return len(self.stats) + + +class UdpSender(DatagramProtocol): + """ + Class that will send UDP packets to UDP Listener + """ + def __init__(self, host, count, size, duration): + #LoopingCall does not know whether UDP socket is actually writable + self.looper = None + self.host = host + self.count = count + self.duration = duration + self.start = time.time() + self.sent = 0 + self.data = array.array('c', 'X' * size) + + def startProtocol(self): + print "Starting UDP sender" + self.looper = LoopingCall(self.sendData) + period = self.duration / float(self.count) + self.looper.start(period , now = False) + + def stopProtocol(self): + print "Stopping UDP sender" + if (self.looper is not None): + self.looper.stop() + self.looper = None + + def datagramReceived(self, data, (host, port)): + pass + + def sendData(self): + """This function is called from LoopingCall""" + if self.start + self.duration < time.time(): + self.looper.stop() + self.looper = None + + self.sent += 1 + struct.pack_into('Q', self.data, 0, self.sent) + self.transport.write(self.data, self.host) + + def getResults(self): + """Returns number of packets that were sent""" + return self.sent diff --git a/python/ovstest/util.py b/python/ovstest/util.py new file mode 100644 index 00000000..3321e693 --- /dev/null +++ b/python/ovstest/util.py @@ -0,0 +1,74 @@ +# Copyright (c) 2011 Nicira Networks +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +util module contains some helper function +""" +import socket, struct, fcntl, array, os, subprocess, exceptions + +def str_ip(ip): + (x1, x2, x3, x4) = struct.unpack("BBBB", ip) + return ("%u.%u.%u.%u") % (x1, x2, x3, x4) + +def get_interface_mtu(iface): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + indata = iface + ('\0' * (32 - len(iface))) + try: + outdata = fcntl.ioctl(s.fileno(), 0x8921, indata) # socket.SIOCGIFMTU + mtu = struct.unpack("16si12x", outdata)[1] + except: + return 0 + + return mtu + +def get_interface(address): + """ + Finds first interface that has given address + """ + bytes = 256 * 32 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + names = array.array('B', '\0' * bytes) + outbytes = struct.unpack('iL', fcntl.ioctl( + s.fileno(), + 0x8912, # SIOCGIFCONF + struct.pack('iL', bytes, names.buffer_info()[0]) + ))[0] + namestr = names.tostring() + + for i in range(0, outbytes, 40): + name = namestr[i:i + 16].split('\0', 1)[0] + if address == str_ip(namestr[i + 20:i + 24]): + return name + return "" # did not find interface we were looking for + +def uname(): + os_info = os.uname() + return os_info[2] #return only the kernel version number + +def get_driver(iface): + try: + p = subprocess.Popen( + ["ethtool", "-i", iface], + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + out, err = p.communicate() + if p.returncode == 0: + lines = out.split("\n") + driver = "%s(%s)" % (lines[0], lines[1]) #driver name + version + else: + driver = "no support for ethtool" + except exceptions.OSError: + driver = "" + return driver |