author    Mark Hamilton <mhamilton@nicira.com>    2013-09-13 15:50:48 -0700
committer Gurucharan Shetty <gshetty@nicira.com>  2013-09-17 14:56:24 -0700
commit    14b4c575c28421d1181b509dbeae6e4849c7da69 (patch)
tree      d32daa0d8acceb79786d403110d3163ceb5618b6 /utilities
parent    1394054ecbc5d7abd5b1af3798da4dff2cfb268a (diff)
utilities: a top like tool for ovs-dpctl dump-flows.
This Python script summarizes ovs-dpctl dump-flows content by aggregating
the number of packets, total bytes and occurrence of the following fields:

  - Datapath in_port
  - Ethernet type
  - Source and destination MAC addresses
  - IP protocol
  - Source and destination IPv4 addresses
  - Source and destination IPv6 addresses
  - UDP and TCP destination port
  - Tunnel source and destination addresses

Testing included confirming that both mega-flows and non-mega-flows are
properly parsed. Bit masks are applied in the case of mega-flows prior to
aggregation. The --script parameter, which runs in non-interactive mode, was
also tested. Syntax was tested against python 2.4.3, 2.6 and 2.7. Confirmed
the script passes pep8 and pylint run as:
  pylint --disable=I0011 --include-id=y --reports=n

This tool has been added to these distributions:
  - add ovs-dpctl-top to the Debian distribution
  - add ovs-dpctl-top to the RPM distribution
  - add ovs-dpctl-top to the XenServer RPM

Signed-off-by: Mark Hamilton <mhamilton@nicira.com>
Signed-off-by: Gurucharan Shetty <gshetty@nicira.com>
Diffstat (limited to 'utilities')
-rw-r--r--  utilities/automake.mk            6
-rw-r--r--  utilities/ovs-dpctl-top.8.in   140
-rwxr-xr-x  utilities/ovs-dpctl-top.in    1687
3 files changed, 1833 insertions, 0 deletions
diff --git a/utilities/automake.mk b/utilities/automake.mk
index 9f2bb634..ff50a343 100644
--- a/utilities/automake.mk
+++ b/utilities/automake.mk
@@ -7,6 +7,7 @@ bin_PROGRAMS += \
bin_SCRIPTS += utilities/ovs-pki
if HAVE_PYTHON
bin_SCRIPTS += \
+ utilities/ovs-dpctl-top \
utilities/ovs-l3ping \
utilities/ovs-parse-backtrace \
utilities/ovs-pcap \
@@ -24,6 +25,7 @@ EXTRA_DIST += \
utilities/ovs-check-dead-ifs.in \
utilities/ovs-ctl.in \
utilities/ovs-dev.py \
+ utilities/ovs-dpctl-top.in \
utilities/ovs-l3ping.in \
utilities/ovs-lib.in \
utilities/ovs-parse-backtrace.in \
@@ -39,6 +41,7 @@ MAN_ROOTS += \
utilities/ovs-controller.8.in \
utilities/ovs-ctl.8 \
utilities/ovs-dpctl.8.in \
+ utilities/ovs-dpctl-top.8.in \
utilities/ovs-l3ping.8.in \
utilities/ovs-ofctl.8.in \
utilities/ovs-parse-backtrace.8 \
@@ -57,6 +60,8 @@ DISTCLEANFILES += \
utilities/ovs-check-dead-ifs \
utilities/ovs-controller.8 \
utilities/ovs-dpctl.8 \
+ utilities/ovs-dpctl-top \
+ utilities/ovs-dpctl-top.8 \
utilities/ovs-l3ping \
utilities/ovs-l3ping.8 \
utilities/ovs-lib \
@@ -80,6 +85,7 @@ man_MANS += \
utilities/ovs-benchmark.1 \
utilities/ovs-controller.8 \
utilities/ovs-dpctl.8 \
+ utilities/ovs-dpctl-top.8 \
utilities/ovs-l3ping.8 \
utilities/ovs-ofctl.8 \
utilities/ovs-parse-backtrace.8 \
diff --git a/utilities/ovs-dpctl-top.8.in b/utilities/ovs-dpctl-top.8.in
new file mode 100644
index 00000000..410e9995
--- /dev/null
+++ b/utilities/ovs-dpctl-top.8.in
@@ -0,0 +1,140 @@
+.de IQ
+. br
+. ns
+. IP "\\$1"
+..
+.TH ovs\-dpctl\-top "8" "@VERSION@" "Open vSwitch" "Open vSwitch Manual"
+.
+.SH NAME
+\fBovs\-dpctl\-top\fR \- Top\-like behavior for ovs\-dpctl dump\-flows
+.
+.SH SYNOPSIS
+\fBovs\-dpctl\-top\fR [\-h] [\-v] [\-f FLOWFILES] [\-V] [\-s] [\-\-host HOST]
+[\-a | \-\-accumulate] [\-\-accumulate\-decay ACCUMULATEDECAY] [\-d DELAY]
+.
+.SH DESCRIPTION
+.PP
+This program summarizes \fBovs\-dpctl\fR flow content by aggregating the number
+of packets, total bytes and occurrence of the following fields:
+.IP
+\- Datapath in_port
+.IP
+\- Ethernet type
+.IP
+\- Source and destination MAC addresses
+.IP
+\- IP protocol
+.IP
+\- Source and destination IPv4 addresses
+.IP
+\- Source and destination IPv6 addresses
+.IP
+\- UDP and TCP destination port
+.IP
+\- Tunnel source and destination addresses
+.
+.SS "Output shows four values:"
+.IP
+\- FIELDS: the flow fields for example in_port(1).
+.IP
+\- COUNT: the number of lines in the dump\-flow output contain the flow field.
+.IP
+\- PACKETS: the total number of packets containing the flow field.
+.IP
+\- BYTES: the total number of bytes containing the flow field. If units are
+not present then values are in bytes.
+.IP
+\- AVERAGE: the average packets size (BYTES/PACKET).
+.PP
+.SS "Top Behavior"
+.PP
+While in top mode, the default behavior, the following single character commands
+are supported:
+.IP
+a \- toggles between accumulate and live mode. Accumulate mode is described
+below.
+.IP
+s \- toggles which column is used to sort content in decreasing order. A
+DESC title is placed over the column.
+.IP
+_ \- the space bar collects dump\-flow content again.
+.IP
+h \- halt output. Any character will restart sampling.
+.IP
+f \- cycle through the flow fields.
+.IP
+q \- quit.
+.PP
+.SS "Accumulate Mode"
+.PP
+There are two supported modes: live and accumulate. The default is live.
+The parameter \fB\-\-accumulate\fR or the 'a' character in top mode enables the
+latter. In live mode, only the most recent dump\-flow content is presented.
+Accumulate mode, on the other hand, keeps track of prior historical
+information until the flow is reset, not when the flow is purged. A flow is
+considered reset when its packet count has decreased from the previous
+sample. There is one caveat: to keep the system from eventually running out
+of memory, any flows that have not been refreshed within the
+accumulate\-decay period are purged. The goal is to free the memory of flows
+that are no longer active. Statistics are not decremented; their purpose is
+to reflect the overall history of the flow fields.
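+.PP
+For example, a hypothetical session that runs in accumulate mode and purges
+idle flows after ten minutes (600 seconds) could be started with:
+.PP
+$ ovs\-dpctl\-top \fB\-\-accumulate\fR \fB\-\-accumulate\-decay\fR 600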
+.PP
+.SS "Debugging Errors"
+.PP
+Parsing errors are counted and displayed in the status line at the beginning
+of the output. Use the \fB\-\-verbose\fR option with \fB\-\-script\fR to see
+what output was not parsed, like this:
+.PP
+$ ovs\-dpctl dump\-flows | ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-verbose\fR
+.PP
+Error messages will identify content that failed to parse.
+.PP
+.SS "Access Remote Hosts"
+.PP
+The \fB\-\-host\fR must follow the format user@hostname. This script simply
+calls \&'ssh user@hostname' without checking for login credentials;
+therefore, public keys should be installed on the system identified by
+hostname, for example with:
+.PP
+$ ssh\-copy\-id user@hostname
+.PP
+Consult ssh\-copy\-id man pages for more details.
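+.PP
+With keys in place, flows can then be collected from the remote host, for
+example:
+.PP
+$ ovs\-dpctl\-top \fB\-\-host\fR user@hostname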
+.PP
+.SS "Expected usage"
+.PP
+$ ovs\-dpctl\-top
+.PP
+or to run as a script:
+.PP
+$ ovs\-dpctl dump\-flows > dump\-flows.log
+.PP
+$ ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-flow\-file\fR dump\-flows.log
+.SS "OPTIONS"
+.TP
+\fB\-h\fR, \fB\-\-help\fR
+show this help message and exit.
+.TP
+\fB\-v\fR, \fB\-\-version\fR
+show program's version number and exit.
+.TP
+\fB\-f\fR FLOWFILES, \fB\-\-flow\-file\fR FLOWFILES
+file containing flows from ovs\-dpctl dump\-flows.
+.TP
+\fB\-V\fR, \fB\-\-verbose\fR
+enable debug level verbosity.
+.TP
+\fB\-s\fR, \fB\-\-script\fR
+Run from a script (no user interface).
+.TP
+\fB\-\-host\fR HOST
+Specify a user@host for retrieving flows; see the \fBAccess Remote Hosts\fR
+section for more information.
+.TP
+\fB\-a\fR, \fB\-\-accumulate\fR
+Accumulate dump\-flow content.
+.TP
+\fB\-\-accumulate\-decay\fR ACCUMULATEDECAY
+Decay old accumulated flows after the given number of seconds. The default
+is 5 minutes (300 seconds). A value of 0 disables decay.
+.TP
+\fB\-d\fR DELAY, \fB\-\-delay\fR DELAY
+Delay in milliseconds to collect dump\-flow content (sample rate).
diff --git a/utilities/ovs-dpctl-top.in b/utilities/ovs-dpctl-top.in
new file mode 100755
index 00000000..f43fdeb7
--- /dev/null
+++ b/utilities/ovs-dpctl-top.in
@@ -0,0 +1,1687 @@
+#! @PYTHON@
+#
+# Copyright (c) 2013 Nicira, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+# The approximate_size code was copied from
+# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
+# which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
+# used under a Creative Commons Attribution-Share-Alike license:
+# http://creativecommons.org/licenses/by-sa/3.0/
+#
+#
+
+"""Top like behavior for ovs-dpctl dump-flows output.
+
+This program summarizes ovs-dpctl flow content by aggregating the number
+of packets, total bytes and occurrence of the following fields:
+
+ - Datapath in_port
+
+ - Ethernet type
+
+ - Source and destination MAC addresses
+
+ - IP protocol
+
+ - Source and destination IPv4 addresses
+
+ - Source and destination IPv6 addresses
+
+ - UDP and TCP destination port
+
+ - Tunnel source and destination addresses
+
+
+Output shows five values:
+ - FIELDS: the flow fields, for example in_port(1).
+
+ - PACKETS: the total number of packets containing the flow field.
+
+ - BYTES: the total number of bytes from packets containing the flow field.
+ If no unit is shown then the value is in bytes.
+
+ - AVERAGE: the average packet size (BYTES/PACKET).
+
+ - COUNT: the number of lines in the dump-flow output that contain the flow
+ field.
+
+Top Behavior
+
+While in top mode, the default behavior, the following single character
+commands are supported:
+
+ a - toggles between accumulate and live mode. Accumulate mode is described
+ below.
+
+ s - toggles which column is used to sort content in decreasing order. A
+ DESC title is placed over the column.
+
+ _ - the space bar collects dump-flow content again.
+
+ h - halt output. Any character will restart sampling.
+
+ f - cycle through the flow fields.
+
+ q - quit.
+
+Accumulate Mode
+
+There are two supported modes: live and accumulate. The default is live.
+The parameter --accumulate or the 'a' character in top mode enables the
+latter. In live mode, only the most recent dump-flow content is presented.
+Accumulate mode, on the other hand, keeps track of prior historical
+information until the flow is reset, not when the flow is purged. A flow is
+considered reset when its packet count has decreased from the previous
+sample. There is one caveat: to keep the system from eventually running out
+of memory, any flows that have not been refreshed within the
+accumulate-decay period are purged. The goal is to free the memory of flows
+that are no longer active. Statistics are not decremented; their purpose is
+to reflect the overall history of the flow fields.
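+
+For example, a hypothetical session that runs in accumulate mode and purges
+idle flows after ten minutes (600 seconds) could be started with:
+
+$ ovs-dpctl-top --accumulate --accumulate-decay 600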
+
+
+Debugging Errors
+
+Parsing errors are counted and displayed in the status line at the beginning
+of the output. Use the --verbose option with --script to see what output
+was not parsed, like this:
+$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
+
+Error messages will identify content that failed to parse.
+
+
+Access Remote Hosts
+
+The --host must follow the format user@hostname. This script simply calls
+'ssh user@hostname' without checking for login credentials; therefore,
+public keys should be installed on the system identified by hostname,
+for example with:
+
+$ ssh-copy-id user@hostname
+
+Consult ssh-copy-id man pages for more details.
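+
+With keys in place, flows can then be collected from the remote host, for
+example:
+
+$ ovs-dpctl-top --host user@hostname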
+
+
+Expected usage
+
+$ ovs-dpctl-top
+
+or to run as a script:
+$ ovs-dpctl dump-flows > dump-flows.log
+$ ovs-dpctl-top --script --flow-file dump-flows.log
+
+"""
+
+# pylint: disable-msg=C0103
+# pylint: disable-msg=C0302
+# pylint: disable-msg=R0902
+# pylint: disable-msg=R0903
+# pylint: disable-msg=R0904
+# pylint: disable-msg=R0912
+# pylint: disable-msg=R0913
+# pylint: disable-msg=R0914
+
+import sys
+import os
+try:
+ ##
+ # Arg parse is not installed on older Python distributions.
+ # ovs ships with a version in the directory mentioned below.
+ import argparse
+except ImportError:
+ sys.path.append(os.path.join("@pkgdatadir@", "python"))
+ import argparse
+import logging
+import re
+import unittest
+import copy
+import curses
+import operator
+import subprocess
+import fcntl
+import struct
+import termios
+import datetime
+import threading
+import time
+import socket
+
+
+##
+# The following two definitions provide the necessary netaddr functionality.
+# Python netaddr module is not part of the core installation. Packaging
+# netaddr was involved and seems inappropriate given that only two
+# methods were used.
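+#
+# For example (values taken from the unit tests below):
+#   ipv4_to_network("10.24.106.0/255.255.252.0") returns "10.24.104.0"
+#   ipv6_to_network("1::192:168:0:1/1::ffff:0:0:0") returns "1::192:0:0:0"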
+def ipv4_to_network(ip_str):
+ """ Calculate the network given a ipv4/mask value.
+ If a mask is not present simply return ip_str.
+ """
+ pack_length = '!HH'
+ try:
+ (ip, mask) = ip_str.split("/")
+ except ValueError:
+ # just an ip address no mask.
+ return ip_str
+
+ ip_p = socket.inet_pton(socket.AF_INET, ip)
+ ip_t = struct.unpack(pack_length, ip_p)
+ mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
+ network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
+
+ return socket.inet_ntop(socket.AF_INET,
+ struct.pack('!HH', network_n[0], network_n[1]))
+
+
+def ipv6_to_network(ip_str):
+ """ Calculate the network given a ipv6/mask value.
+ If a mask is not present simply return ip_str.
+ """
+ pack_length = '!HHHHHHHH'
+ try:
+ (ip, mask) = ip_str.split("/")
+ except ValueError:
+ # just an ip address no mask.
+ return ip_str
+
+ ip_p = socket.inet_pton(socket.AF_INET6, ip)
+ ip_t = struct.unpack(pack_length, ip_p)
+ mask_t = struct.unpack(pack_length,
+ socket.inet_pton(socket.AF_INET6, mask))
+ network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
+
+ return socket.inet_ntop(socket.AF_INET6,
+ struct.pack(pack_length,
+ network_n[0], network_n[1],
+ network_n[2], network_n[3],
+ network_n[4], network_n[5],
+ network_n[6], network_n[7]))
+
+
+##
+# columns displayed
+##
+class Columns:
+ """ Holds column specific content.
+ Titles needs to be less than 8 characters.
+ """
+ VALUE_WIDTH = 9
+ FIELDS = "fields"
+ PACKETS = "packets"
+ COUNT = "count"
+ BYTES = "bytes"
+ AVERAGE = "average"
+
+ def __init__(self):
+ pass
+
+ @staticmethod
+ def assoc_list(obj):
+ """ Return a associated list. """
+ return [(Columns.FIELDS, repr(obj)),
+ (Columns.PACKETS, obj.packets),
+ (Columns.BYTES, obj.bytes),
+ (Columns.COUNT, obj.count),
+ (Columns.AVERAGE, obj.average),
+ ]
+
+
+def element_eth_get(field_type, element, stats_dict):
+ """ Extract eth frame src and dst from a dump-flow element."""
+ fmt = "%s(src=%s,dst=%s)"
+
+ element = fmt % (field_type, element["src"], element["dst"])
+ return SumData(field_type, element, stats_dict["packets"],
+ stats_dict["bytes"], element)
+
+
+def element_ipv4_get(field_type, element, stats_dict):
+ """ Extract src and dst from a dump-flow element."""
+ fmt = "%s(src=%s,dst=%s)"
+ element_show = fmt % (field_type, element["src"], element["dst"])
+
+ element_key = fmt % (field_type, ipv4_to_network(element["src"]),
+ ipv4_to_network(element["dst"]))
+
+ return SumData(field_type, element_show, stats_dict["packets"],
+ stats_dict["bytes"], element_key)
+
+
+def element_tunnel_get(field_type, element, stats_dict):
+ """ Extract src and dst from a tunnel."""
+ return element_ipv4_get(field_type, element, stats_dict)
+
+
+def element_ipv6_get(field_type, element, stats_dict):
+ """ Extract src and dst from a dump-flow element."""
+
+ fmt = "%s(src=%s,dst=%s)"
+ element_show = fmt % (field_type, element["src"], element["dst"])
+
+ element_key = fmt % (field_type, ipv6_to_network(element["src"]),
+ ipv6_to_network(element["dst"]))
+
+ return SumData(field_type, element_show, stats_dict["packets"],
+ stats_dict["bytes"], element_key)
+
+
+def element_dst_port_get(field_type, element, stats_dict):
+ """ Extract src and dst from a dump-flow element."""
+ element_key = "%s(dst=%s)" % (field_type, element["dst"])
+ return SumData(field_type, element_key, stats_dict["packets"],
+ stats_dict["bytes"], element_key)
+
+
+def element_passthrough_get(field_type, element, stats_dict):
+ """ Extract src and dst from a dump-flow element."""
+ element_key = "%s(%s)" % (field_type, element)
+ return SumData(field_type, element_key,
+ stats_dict["packets"], stats_dict["bytes"], element_key)
+
+
+# pylint: disable-msg=R0903
+class OutputFormat:
+ """ Holds field_type and function to extract element value. """
+ def __init__(self, field_type, generator):
+ self.field_type = field_type
+ self.generator = generator
+
+OUTPUT_FORMAT = [
+ OutputFormat("eth", element_eth_get),
+ OutputFormat("ipv4", element_ipv4_get),
+ OutputFormat("ipv6", element_ipv6_get),
+ OutputFormat("tunnel", element_tunnel_get),
+ OutputFormat("udp", element_dst_port_get),
+ OutputFormat("tcp", element_dst_port_get),
+ OutputFormat("eth_type", element_passthrough_get),
+ OutputFormat("in_port", element_passthrough_get)
+ ]
+
+
+ELEMENT_KEY = {
+ "udp": "udp.dst",
+ "tcp": "tcp.dst"
+ }
+
+
+def top_input_get(args):
+ """ Return subprocess stdout."""
+ cmd = []
+ if (args.host):
+ cmd += ["ssh", args.host]
+ cmd += ["ovs-dpctl", "dump-flows"]
+
+ return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE).stdout
+
+
+def args_get():
+ """ read program parameters handle any necessary validation of input. """
+
+ parser = argparse.ArgumentParser(
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description=__doc__)
+ ##
+ # None is a special value indicating to read flows from stdin.
+ # This handles the case
+ # ovs-dpctl dump-flows | ovs-dpctl-top
+ parser.add_argument("-v", "--version", version="@VERSION@",
+ action="version", help="show version")
+ parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
+ action="append",
+ help="file containing flows from ovs-dpctl dump-flow")
+ parser.add_argument("-V", "--verbose", dest="verbose",
+ default=logging.CRITICAL,
+ action="store_const", const=logging.DEBUG,
+ help="enable debug level verbosity")
+ parser.add_argument("-s", "--script", dest="top", action="store_false",
+ help="Run from a script (no user interface)")
+ parser.add_argument("--host", dest="host",
+ help="Specify a user@host for retrieving flows see"
+ "Accessing Remote Hosts for more information")
+
+ parser.add_argument("-a", "--accumulate", dest="accumulate",
+ action="store_true", default=False,
+ help="Accumulate dump-flow content")
+ parser.add_argument("--accumulate-decay", dest="accumulateDecay",
+ default=5.0 * 60, type=float,
+ help="Decay old accumulated flows. "
+ "The default is 5 minutes. "
+ "A value of 0 disables decay.")
+ parser.add_argument("-d", "--delay", dest="delay", type=int,
+ default=1000,
+ help="Delay in milliseconds to collect dump-flow "
+ "content (sample rate).")
+
+ args = parser.parse_args()
+
+ logging.basicConfig(level=args.verbose)
+
+ return args
+
+###
+# Code to parse a single line in dump-flow
+###
+# key(values)
+FIELDS_CMPND = re.compile("([\w]+)\((.+)\)")
+# key:value
+FIELDS_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)")
+FIELDS_ELEMENT = re.compile("([\w]+):([-\.\w]+)")
+
+
+def flow_line_iter(line):
+ """ iterate over flow dump elements.
+ return tuples of (true, element) or (false, remaining element)
+ """
+ # Split on ',' except when inside parentheses. The actions element is
+ # not split properly, but we do not need it.
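+ # For example (from the unit tests below):
+ #   flow_line_iter("tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)")
+ # returns
+ #   ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))", "in_port(2)"]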
+ rc = []
+
+ element = ""
+ paren_count = 0
+
+ for ch in line:
+ if (ch == '('):
+ paren_count += 1
+ elif (ch == ')'):
+ paren_count -= 1
+
+ if (ch == ' '):
+ # ignore white space.
+ continue
+ elif ((ch == ',') and (paren_count == 0)):
+ rc.append(element)
+ element = ""
+ else:
+ element += ch
+
+ if (paren_count):
+ raise ValueError(line)
+ else:
+ if (len(element) > 0):
+ rc.append(element)
+ return rc
+
+
+def flow_line_compound_parse(compound):
+ """ Parse compound element
+ for example
+ src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
+ which is in
+ eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
+ """
+ result = {}
+ for element in flow_line_iter(compound):
+ match = FIELDS_CMPND_ELEMENT.search(element)
+ if (match):
+ key = match.group(1)
+ value = match.group(2)
+ result[key] = value
+
+ match = FIELDS_CMPND.search(element)
+ if (match):
+ key = match.group(1)
+ value = match.group(2)
+ result[key] = flow_line_compound_parse(value)
+ continue
+
+ if (len(result.keys()) == 0):
+ return compound
+ return result
+
+
+def flow_line_split(line):
+ """ Convert a flow dump line into ([fields], [stats], actions) tuple.
+ Where fields and stats are lists.
+ This function relies on the following ovs-dpctl dump-flow
+ output characteristics:
+ 1. The dump-flow line consists of a list of frame fields, a list of stats
+ and the actions.
+ 2. The list of frame fields, each stat and the actions field are delimited
+ by ', '.
+ 3. All other (non-stat) fields are not delimited by ', '.
+
+ """
+
+ results = re.split(', ', line)
+
+ (field, stats, action) = (results[0], results[1:-1], results[-1])
+
+ fields = flow_line_iter(field)
+ return (fields, stats, action)
+
+
+def elements_to_dict(elements):
+ """ Convert line to a hierarchy of dictionaries. """
+ result = {}
+ for element in elements:
+ match = FIELDS_CMPND.search(element)
+ if (match):
+ key = match.group(1)
+ value = match.group(2)
+ result[key] = flow_line_compound_parse(value)
+ continue
+
+ match = FIELDS_ELEMENT.search(element)
+ if (match):
+ key = match.group(1)
+ value = match.group(2)
+ result[key] = value
+ else:
+ raise ValueError("can't parse >%s<" % element)
+ return result
+
+
+# pylint: disable-msg=R0903
+class SumData(object):
+ """ Interface that all data going into SumDb must implement.
+ Holds the flow field and its corresponding count, total packets,
+ total bytes and calculates average.
+
+ __repr__ is used as key into SumData singleton.
+ __str__ is used as human readable output.
+ """
+
+ def __init__(self, field_type, field, packets, flow_bytes, key):
+ # Count is the number of lines in the dump-flow log.
+ self.field_type = field_type
+ self.field = field
+ self.count = 1
+ self.packets = int(packets)
+ self.bytes = int(flow_bytes)
+ self.key = key
+
+ def decrement(self, decr_packets, decr_bytes, decr_count):
+ """ Decrement content to calculate delta from previous flow sample."""
+ self.packets -= decr_packets
+ self.bytes -= decr_bytes
+ self.count -= decr_count
+
+ def __iadd__(self, other):
+ """ Add two objects. """
+
+ if (self.key != other.key):
+ raise ValueError("adding two unrelated types")
+
+ self.count += other.count
+ self.packets += other.packets
+ self.bytes += other.bytes
+ return self
+
+ def __isub__(self, other):
+ """ Decrement two objects. """
+
+ if (self.key != other.key):
+ raise ValueError("adding two unrelated types")
+
+ self.count -= other.count
+ self.packets -= other.packets
+ self.bytes -= other.bytes
+ return self
+
+ def __getattr__(self, name):
+ """ Handle average. """
+ if (name == "average"):
+ if (self.packets == 0):
+ return float(0.0)
+ else:
+ return float(self.bytes) / float(self.packets)
+ raise AttributeError(name)
+
+ def __str__(self):
+ """ Used for debugging. """
+ return "%s %s %s %s" % (self.field, self.count,
+ self.packets, self.bytes)
+
+ def __repr__(self):
+ """ Used as key in the FlowDB table. """
+ return self.key
+
+
+def flow_aggregate(fields_dict, stats_dict):
+ """ Search for content in a line.
+ Passed the flow port of the dump-flows plus the current stats consisting
+ of packets, bytes, etc
+ """
+ result = []
+
+ for output_format in OUTPUT_FORMAT:
+ field = fields_dict.get(output_format.field_type, None)
+ if (field):
+ obj = output_format.generator(output_format.field_type,
+ field, stats_dict)
+ result.append(obj)
+
+ return result
+
+
+def flows_read(ihdl, flow_db):
+ """ read flow content from ihdl and insert into flow_db. """
+
+ done = False
+ while (not done):
+ line = ihdl.readline()
+ if (len(line) == 0):
+ # end of input
+ break
+
+ try:
+ flow_db.flow_line_add(line)
+ except ValueError, arg:
+ logging.error(arg)
+
+ return flow_db
+
+
+def get_terminal_size():
+ """
+ return column width and height of the terminal
+ """
+ for fd_io in [0, 1, 2]:
+ try:
+ result = struct.unpack('hh',
+ fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
+ '1234'))
+ except IOError:
+ result = None
+ continue
+
+ if (result is None or result == (0, 0)):
+ # Maybe we can't get the width. In that case assume (25, 80)
+ result = (25, 80)
+
+ return result
+
+##
+# Content derived from:
+# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
+##
+SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
+ 1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}
+
+
+def approximate_size(size, a_kilobyte_is_1024_bytes=True):
+ """Convert a file size to human-readable form.
+
+ Keyword arguments:
+ size -- file size in bytes
+ a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
+ if False, use multiples of 1000
+
+ Returns: string
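+
+ Examples (matching the unit tests below):
+ approximate_size(1024) returns '1.0 KiB'
+ approximate_size(1024 * 1024) returns '1.0 MiB'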
+
+ """
+ size = float(size)
+ if size < 0:
+ raise ValueError('number must be non-negative')
+
+ if (a_kilobyte_is_1024_bytes):
+ multiple = 1024
+ else:
+ multiple = 1000
+ for suffix in SUFFIXES[multiple]:
+ size /= multiple
+ if size < multiple:
+ return "%.1f %s" % (size, suffix)
+
+ raise ValueError('number too large')
+
+
+##
+# End copied content
+##
+class ColMeta:
+ """ Concepts about columns. """
+ def __init__(self, sortable, width):
+ self.sortable = sortable
+ self.width = width
+
+
+class RowMeta:
+ """ How to render rows. """
+ def __init__(self, label, fmt):
+ self.label = label
+ self.fmt = fmt
+
+
+def fmt_packet(obj, width):
+ """ Provide a string for packets that is appropriate for output."""
+ return str(obj.packets).rjust(width)
+
+
+def fmt_count(obj, width):
+ """ Provide a string for average that is appropriate for output."""
+ return str(obj.count).rjust(width)
+
+
+def fmt_avg(obj, width):
+ """ Provide a string for average that is appropriate for output."""
+ return str(int(obj.average)).rjust(width)
+
+
+def fmt_field(obj, width):
+ """ truncate really long flow and insert ellipses to help make it
+ clear.
+ """
+
+ ellipses = " ... "
+ value = obj.field
+ if (len(obj.field) > width):
+ value = value[:(width - len(ellipses))] + ellipses
+ return value.ljust(width)
+
+
+def fmt_bytes(obj, width):
+ """ Provide a string for average that is appropriate for output."""
+ if (len(str(obj.bytes)) <= width):
+ value = str(obj.bytes)
+ else:
+ value = approximate_size(obj.bytes)
+ return value.rjust(width)
+
+
+def title_center(value, width):
+ """ Center a column title."""
+ return value.upper().center(width)
+
+
+def title_rjust(value, width):
+ """ Right justify a column title. """
+ return value.upper().rjust(width)
+
+
+def column_picker(order, obj):
+ """ return the column as specified by order. """
+ if (order == 1):
+ return obj.count
+ elif (order == 2):
+ return obj.packets
+ elif (order == 3):
+ return obj.bytes
+ elif (order == 4):
+ return obj.average
+ else:
+ raise ValueError("order outside of range %s" % order)
+
+
+class Render:
+ """ Renders flow data. """
+ def __init__(self, console_width):
+ """ Calculate column widths taking into account changes in format."""
+
+ self._start_time = datetime.datetime.now()
+
+ self._cols = [ColMeta(False, 0),
+ ColMeta(True, Columns.VALUE_WIDTH),
+ ColMeta(True, Columns.VALUE_WIDTH),
+ ColMeta(True, Columns.VALUE_WIDTH),
+ ColMeta(True, Columns.VALUE_WIDTH)]
+ self._console_width = console_width
+ self.console_width_set(console_width)
+
+ # The order in this array dictates the order of the columns.
+ # The 0 width for the first entry is a place holder. It is
+ # dynamically calculated. The first column is special. We need a
+ # way to indicate which fields are presented.
+ self._descs = [RowMeta("", title_rjust),
+ RowMeta("", title_rjust),
+ RowMeta("", title_rjust),
+ RowMeta("", title_rjust),
+ RowMeta("", title_rjust)]
+ self._column_sort_select = 0
+ self.column_select_event()
+
+ self._titles = [
+ RowMeta(Columns.FIELDS, title_center),
+ RowMeta(Columns.COUNT, title_rjust),
+ RowMeta(Columns.PACKETS, title_rjust),
+ RowMeta(Columns.BYTES, title_rjust),
+ RowMeta(Columns.AVERAGE, title_rjust)
+ ]
+
+ self._datas = [
+ RowMeta(None, fmt_field),
+ RowMeta(None, fmt_count),
+ RowMeta(None, fmt_packet),
+ RowMeta(None, fmt_bytes),
+ RowMeta(None, fmt_avg)
+ ]
+
+ ##
+ # _field_types holds which fields are displayed in the field
+ # column, with the keyword all implying all fields.
+ ##
+ self._field_types = ["all"] + [ii.field_type for ii in OUTPUT_FORMAT]
+
+ ##
+ # The default is to show all field types.
+ ##
+ self._field_type_select = -1
+ self.field_type_toggle()
+
+ def _field_type_select_get(self):
+ """ Return which field type to display. """
+ return self._field_types[self._field_type_select]
+
+ def field_type_toggle(self):
+ """ toggle which field types to show. """
+ self._field_type_select += 1
+ if (self._field_type_select >= len(self._field_types)):
+ self._field_type_select = 0
+ value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
+ self._titles[0].label = value
+
+ def column_select_event(self):
+ """ Handles column select toggle. """
+
+ self._descs[self._column_sort_select].label = ""
+ for _ in range(len(self._cols)):
+ self._column_sort_select += 1
+ if (self._column_sort_select >= len(self._cols)):
+ self._column_sort_select = 0
+
+ # Now look for the next sortable column
+ if (self._cols[self._column_sort_select].sortable):
+ break
+ self._descs[self._column_sort_select].label = "DESC"
+
+ def console_width_set(self, console_width):
+ """ Adjust the output given the new console_width. """
+ self._console_width = console_width
+
+ spaces = len(self._cols) - 1
+ ##
+ # Calculating column widths can be tedious but is important. The
+ # flow field value can be long. The goal here is to dedicate
+ # fixed column space for packets, bytes, average and counts, and to
+ # give the remaining space to the flow column. When numbers get large,
+ # the output transitions to the form generated by approximate_size,
+ # which limits output to ###.# XiB, in other words 9 characters.
+ ##
+ # At this point, we know the maximum length values. We may
+ # truncate the flow column to get everything to fit.
+ self._cols[0].width = 0
+ values_max_length = sum([ii.width for ii in self._cols]) + spaces
+ flow_max_length = console_width - values_max_length
+ self._cols[0].width = flow_max_length
+
+ def format(self, flow_db):
+ """ shows flows based on --script parameter."""
+
+ rc = []
+ ##
+ # Top output consists of
+ # Title
+ # Column title (2 rows)
+ # data
+ # statistics and status
+
+ ##
+ # Title
+ ##
+ rc.append("Flow Summary".center(self._console_width))
+
+ stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
+ flow_db.flow_stats_get()
+ accumulate = flow_db.accumulate_get()
+ if (accumulate):
+ stats += "Accumulate: on "
+ else:
+ stats += "Accumulate: off "
+
+ duration = datetime.datetime.now() - self._start_time
+ stats += "Duration: %s " % str(duration)
+ rc.append(stats.ljust(self._console_width))
+
+ ##
+ # 2 rows for columns.
+ ##
+ # Indicate which column is in descending order.
+ rc.append(" ".join([ii.fmt(ii.label, col.width)
+ for (ii, col) in zip(self._descs, self._cols)]))
+
+ rc.append(" ".join([ii.fmt(ii.label, col.width)
+ for (ii, col) in zip(self._titles, self._cols)]))
+
+ ##
+ # Data.
+ ##
+ for dd in flow_db.field_values_in_order(self._field_type_select_get(),
+ self._column_sort_select):
+ rc.append(" ".join([ii.fmt(dd, col.width)
+ for (ii, col) in zip(self._datas,
+ self._cols)]))
+
+ return rc
+
+
+def curses_screen_begin():
+ """ begin curses screen control. """
+ stdscr = curses.initscr()
+ curses.cbreak()
+ curses.noecho()
+ stdscr.keypad(1)
+ return stdscr
+
+
+def curses_screen_end(stdscr):
+ """ end curses screen control. """
+ curses.nocbreak()
+ stdscr.keypad(0)
+ curses.echo()
+ curses.endwin()
+
+
+class FlowDB:
+ """ Implements live vs accumulate mode.
+
+ Flows are stored as key value pairs. The key consists of the content
+ prior to stat fields. The value portion consists of stats in a dictionary
+ form.
+
+ @ \todo future add filtering here.
+ """
+ def __init__(self, accumulate):
+ self._accumulate = accumulate
+ self._error_count = 0
+ # Values are (stats, last update time.)
+ # The last update time is used for aging.
+ self._flow_lock = threading.Lock()
+ # This dictionary holds individual flows.
+ self._flows = {}
+ # This dictionary holds aggregate of flow fields.
+ self._fields = {}
+
+ def accumulate_get(self):
+ """ Return the current accumulate state. """
+ return self._accumulate
+
+ def accumulate_toggle(self):
+ """ toggle accumulate flow behavior. """
+ self._accumulate = not self._accumulate
+
+ def begin(self):
+ """ Indicate the beginning of processing flow content.
+ If accumulate is false, clear the current set of flows. """
+
+ if (not self._accumulate):
+ self._flow_lock.acquire()
+ try:
+ self._flows.clear()
+ finally:
+ self._flow_lock.release()
+ self._fields.clear()
+
+ def flow_line_add(self, line):
+ """ Split a line from a ovs-dpctl dump-flow into key and stats.
+ The order of the content in the flow should be:
+ - flow content
+ - stats for the flow
+ - actions
+
+ This method also assumes that the dump flow output does not
+ change order of fields of the same flow.
+ """
+
+ line = line.rstrip("\n")
+ (fields, stats, _) = flow_line_split(line)
+
+ try:
+ fields_dict = elements_to_dict(fields)
+
+ if (len(fields_dict) == 0):
+ raise ValueError("flow fields are missing %s", line)
+
+ stats_dict = elements_to_dict(stats)
+ if (len(stats_dict) == 0):
+ raise ValueError("statistics are missing %s.", line)
+
+ ##
+ # In accumulate mode, the flow database can reach 10,000's of
+ # persistent flows. The interaction of the script with this many
+ # flows is too slow. Instead, deltas are sent to the flow_db
+ # database, allowing incremental changes to be made in O(m) time,
+ # where m is the size of the current flow list, instead of iterating
+ # over all flows in O(n) time, where n is the entire history of flows.
+ key = ",".join(fields)
+
+ self._flow_lock.acquire()
+ try:
+ (stats_old_dict, _) = self._flows.get(key, (None, None))
+ finally:
+ self._flow_lock.release()
+
+ self.flow_event(fields_dict, stats_old_dict, stats_dict)
+
+ except ValueError, arg:
+ logging.error(arg)
+ self._error_count += 1
+ raise
+
+ self._flow_lock.acquire()
+ try:
+ self._flows[key] = (stats_dict, datetime.datetime.now())
+ finally:
+ self._flow_lock.release()
+
+ def decay(self, decayTimeInSeconds):
+ """ Decay content. """
+ now = datetime.datetime.now()
+ for (key, value) in self._flows.items():
+ (stats_dict, updateTime) = value
+ delta = now - updateTime
+
+ if (delta.seconds > decayTimeInSeconds):
+ self._flow_lock.acquire()
+ try:
+ del self._flows[key]
+
+ fields_dict = elements_to_dict(flow_line_iter(key))
+ matches = flow_aggregate(fields_dict, stats_dict)
+ for match in matches:
+ self.field_dec(match)
+
+ finally:
+ self._flow_lock.release()
+
+ def flow_stats_get(self):
+ """ Return statistics in a form of a dictionary. """
+ rc = None
+ self._flow_lock.acquire()
+ try:
+ rc = {"flow_total": len(self._flows),
+ "flow_errors": self._error_count}
+ finally:
+ self._flow_lock.release()
+ return rc
+
+ def field_types_get(self):
+ """ Return the set of types stored in the singleton. """
+ types = set((ii.field_type for ii in self._fields.values()))
+ return types
+
+ def field_add(self, data):
+ """ Collect dump-flow data to sum number of times item appears. """
+ current = self._fields.get(repr(data), None)
+ if (current is None):
+ current = copy.copy(data)
+ else:
+ current += data
+ self._fields[repr(current)] = current
+
+ def field_dec(self, data):
+ """ Collect dump-flow data to sum number of times item appears. """
+ current = self._fields.get(repr(data), None)
+ if (current is None):
+ raise ValueError("decrementing field missing %s" % repr(data))
+
+ current -= data
+ self._fields[repr(current)] = current
+ if (current.count == 0):
+ del self._fields[repr(current)]
+
+ def field_values_in_order(self, field_type_select, column_order):
+ """ Return a list of items in order maximum first. """
+ values = self._fields.values()
+ if (field_type_select != "all"):
+ # If a field type other than "all" then reduce the list.
+ values = [ii for ii in values
+ if (ii.field_type == field_type_select)]
+ values = [(column_picker(column_order, ii), ii) for ii in values]
+ values.sort(key=operator.itemgetter(0))
+ values.reverse()
+ values = [ii[1] for ii in values]
+ return values
+
+ def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
+ """ Receives new flow information. """
+
+ # In order to avoid processing every flow at every sample
+ # period, the change in a flow's packet count is used to determine
+ # the delta in the flow statistics. This delta is applied via
+ # decrement() prior to calling self.field_add().
+
+ if (stats_old_dict is None):
+ # This is a new flow
+ matches = flow_aggregate(fields_dict, stats_new_dict)
+ for match in matches:
+ self.field_add(match)
+ else:
+ old_packets = int(stats_old_dict.get("packets", 0))
+ new_packets = int(stats_new_dict.get("packets", 0))
+ if (old_packets == new_packets):
+ # ignore. same data.
+ pass
+ else:
+ old_bytes = stats_old_dict.get("bytes", 0)
+ # old_packets != new_packets
+ # if old_packets > new_packets then we end up decrementing
+ # packets and bytes.
+ matches = flow_aggregate(fields_dict, stats_new_dict)
+ for match in matches:
+ match.decrement(int(old_packets), int(old_bytes), 1)
+ self.field_add(match)
+
+
+class DecayThread(threading.Thread):
+ """ Periodically call flow database to see if any flows are old. """
+ def __init__(self, flow_db, interval):
+ """ Start decay thread. """
+ threading.Thread.__init__(self)
+
+ self._interval = max(1, interval)
+ self._min_interval = min(1, interval / 10)
+ self._flow_db = flow_db
+ self._event = threading.Event()
+ self._running = True
+
+ self.daemon = True
+
+ def run(self):
+ """ Worker thread which handles decaying accumulated flows. """
+
+ while(self._running):
+ self._event.wait(self._min_interval)
+ if (self._running):
+ self._flow_db.decay(self._interval)
+
+ def stop(self):
+ """ Stop thread. """
+ self._running = False
+ self._event.set()
+ ##
+ # Give this thread time to terminate, but not too long.
+ # This thread is a daemon, so the application will terminate even if
+ # we time out during the join. This is just a cleaner way to
+ # release resources.
+ self.join(2.0)
+
+
+def flow_top_command(stdscr, render, flow_db):
+ """ Handle input while in top mode. """
+ ch = stdscr.getch()
+ ##
+ # Any character will restart sampling.
+ if (ch == ord('h')):
+ # halt output.
+ ch = stdscr.getch()
+ while (ch == -1):
+ ch = stdscr.getch()
+
+ if (ch == ord('s')):
+ # toggle which column sorts data in descending order.
+ render.column_select_event()
+ elif (ch == ord('a')):
+ flow_db.accumulate_toggle()
+ elif (ch == ord('f')):
+ render.field_type_toggle()
+ elif (ch == ord(' ')):
+ # resample
+ pass
+
+ return ch
+
+
+def decay_timer_start(flow_db, accumulateDecay):
+ """ If accumulateDecay greater than zero then start timer. """
+ if (accumulateDecay > 0):
+ decay_timer = DecayThread(flow_db, accumulateDecay)
+ decay_timer.start()
+ return decay_timer
+ else:
+ return None
+
+
+def flows_top(args):
+ """ handles top like behavior when --script is not specified. """
+
+ flow_db = FlowDB(args.accumulate)
+ render = Render(0)
+
+ decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
+ lines = []
+
+ try:
+ stdscr = curses_screen_begin()
+ try:
+ ch = 'X'
+ #stdscr.nodelay(1)
+ stdscr.timeout(args.delay)
+
+ while (ch != ord('q')):
+ flow_db.begin()
+
+ try:
+ ihdl = top_input_get(args)
+ try:
+ flows_read(ihdl, flow_db)
+ finally:
+ ihdl.close()
+ except OSError, arg:
+ logging.critical(arg)
+ break
+
+ (console_height, console_width) = stdscr.getmaxyx()
+ render.console_width_set(console_width)
+
+ output_height = console_height - 1
+ line_count = range(output_height)
+ line_output = render.format(flow_db)
+ lines = zip(line_count, line_output[:output_height])
+
+ stdscr.erase()
+ for (count, line) in lines:
+ stdscr.addstr(count, 0, line[:console_width])
+ stdscr.refresh()
+
+ ch = flow_top_command(stdscr, render, flow_db)
+
+ finally:
+ curses_screen_end(stdscr)
+ except KeyboardInterrupt:
+ pass
+ if (decay_timer):
+ decay_timer.stop()
+
+ # repeat output
+ for (count, line) in lines:
+ print line
+
+
+def flows_script(args):
+ """ handles --script option. """
+
+ flow_db = FlowDB(args.accumulate)
+ flow_db.begin()
+
+ if (args.flowFiles is None):
+ logging.info("reading flows from stdin")
+ ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
+ try:
+ flow_db = flows_read(ihdl, flow_db)
+ finally:
+ ihdl.close()
+ else:
+ for flowFile in args.flowFiles:
+ logging.info("reading flows from %s", flowFile)
+ ihdl = open(flowFile, "r")
+ try:
+ flow_db = flows_read(ihdl, flow_db)
+ finally:
+ ihdl.close()
+
+ (_, console_width) = get_terminal_size()
+ render = Render(console_width)
+
+ for line in render.format(flow_db):
+ print line
+
+
+def main():
+ """ Return 0 on success or 1 on failure.
+
+ Algorithm
+ There are four stages to processing ovs-dpctl dump-flow content:
+ 1. Retrieve the current input.
+ 2. Store it in FlowDB and maintain history.
+ 3. Iterate over FlowDB, aggregating stats for each flow field.
+ 4. Present the data.
+
+ Retrieving the current input is currently trivial: ovs-dpctl dump-flows
+ is simply called. Future versions will have more elaborate means of
+ collecting dump-flow content. FlowDB returns all data in the form of a
+ hierarchical dictionary. Input will vary.
+
+ In accumulate mode, flows are not purged from the FlowDB manager; at the
+ very least, the latest statistics are kept. In the case of live output,
+ the FlowDB is purged prior to sampling data.
+
+ Aggregating results requires identifying the flow fields to aggregate out
+ of the flow and summing their stats.
+
+ """
+ args = args_get()
+
+ try:
+ if (args.top):
+ flows_top(args)
+ else:
+ flows_script(args)
+ except KeyboardInterrupt:
+ return 1
+ return 0
+
+if __name__ == '__main__':
+ sys.exit(main())
+elif __name__ == 'ovs-dpctl-top':
+ # pylint: disable-msg=R0915
+
+ ##
+ # Test case beyond this point.
+ # pylint: disable-msg=R0904
+ class TestsuiteFlowParse(unittest.TestCase):
+ """
+ parse flow into hierarchy of dictionaries.
+ """
+ def test_flow_parse(self):
+ """ test_flow_parse. """
+ line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
+ "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
+ "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
+ "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
+ "udp(src=61252,dst=5355), packets:1, bytes:92, "\
+ "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
+ "38,41,44,47,50,53,56,59,62,65"
+
+ (fields, stats, _) = flow_line_split(line)
+ flow_dict = elements_to_dict(fields + stats)
+ self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
+ self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
+ self.assertEqual(flow_dict["ipv6"]["src"],
+ "fe80::55bf:fe42:bc96:2812")
+ self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
+ self.assertEqual(flow_dict["packets"], "1")
+ self.assertEqual(flow_dict["bytes"], "92")
+
+ line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
+ "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
+ "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
+ "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
+ "udp(src=61252,dst=5355), packets:1, bytes:92, "\
+ "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
+ "38,41,44,47,50,53,56,59,62,65"
+
+ (fields, stats, _) = flow_line_split(line)
+ flow_dict = elements_to_dict(fields + stats)
+ self.assertEqual(flow_dict["used"], "-0.703s")
+ self.assertEqual(flow_dict["packets"], "1")
+ self.assertEqual(flow_dict["bytes"], "92")
+
+ def test_flow_sum(self):
+ """ test_flow_sum. """
+ line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
+ "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
+ "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
+ "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
+ "udp(src=61252,dst=5355), packets:2, bytes:92, "\
+ "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
+ "38,41,44,47,50,53,56,59,62,65"
+
+ (fields, stats, _) = flow_line_split(line)
+ stats_dict = elements_to_dict(stats)
+ fields_dict = elements_to_dict(fields)
+ ##
+ # Test simple case of one line.
+ flow_db = FlowDB(False)
+ matches = flow_aggregate(fields_dict, stats_dict)
+ for match in matches:
+ flow_db.field_add(match)
+
+ flow_types = flow_db.field_types_get()
+ expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
+ self.assert_(len(flow_types) == len(expected_flow_types))
+ for flow_type in flow_types:
+ self.assertTrue(flow_type in expected_flow_types)
+
+ for flow_type in flow_types:
+ sum_value = flow_db.field_values_in_order("all", 1)
+ self.assert_(len(sum_value) == 5)
+ self.assert_(sum_value[0].packets == 2)
+ self.assert_(sum_value[0].count == 1)
+ self.assert_(sum_value[0].bytes == 92)
+
+ ##
+ # Add line again just to see counts go up.
+ matches = flow_aggregate(fields_dict, stats_dict)
+ for match in matches:
+ flow_db.field_add(match)
+
+ flow_types = flow_db.field_types_get()
+ self.assert_(len(flow_types) == len(expected_flow_types))
+ for flow_type in flow_types:
+ self.assertTrue(flow_type in expected_flow_types)
+
+ for flow_type in flow_types:
+ sum_value = flow_db.field_values_in_order("all", 1)
+ self.assert_(len(sum_value) == 5)
+ self.assert_(sum_value[0].packets == 4)
+ self.assert_(sum_value[0].count == 2)
+ self.assert_(sum_value[0].bytes == 2 * 92)
+
+ def test_assoc_list(self):
+ """ test_assoc_list. """
+ line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
+ "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
+ "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
+ "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
+ "udp(src=61252,dst=5355), packets:2, bytes:92, "\
+ "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
+ "38,41,44,47,50,53,56,59,62,65"
+
+ valid_flows = [
+ 'eth_type(0x86dd)',
+ 'udp(dst=5355)',
+ 'in_port(4)',
+ 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
+ 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
+ ]
+
+ (fields, stats, _) = flow_line_split(line)
+ stats_dict = elements_to_dict(stats)
+ fields_dict = elements_to_dict(fields)
+
+ ##
+ # Test simple case of one line.
+ flow_db = FlowDB(False)
+ matches = flow_aggregate(fields_dict, stats_dict)
+ for match in matches:
+ flow_db.field_add(match)
+
+ for sum_value in flow_db.field_values_in_order("all", 1):
+ assoc_list = Columns.assoc_list(sum_value)
+ for item in assoc_list:
+ if (item[0] == "fields"):
+ self.assertTrue(item[1] in valid_flows)
+ elif (item[0] == "packets"):
+ self.assertTrue(item[1] == 2)
+ elif (item[0] == "count"):
+ self.assertTrue(item[1] == 1)
+ elif (item[0] == "average"):
+ self.assertTrue(item[1] == 46.0)
+ elif (item[0] == "bytes"):
+ self.assertTrue(item[1] == 92)
+ else:
+ raise ValueError("unknown %s", item[0])
+
+ def test_human_format(self):
+ """ test_assoc_list. """
+
+ self.assertEqual(approximate_size(0.0), "0.0 KiB")
+ self.assertEqual(approximate_size(1024), "1.0 KiB")
+ self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
+ self.assertEqual(approximate_size((1024 * 1024) + 100000),
+ "1.1 MiB")
+ value = (1024 * 1024 * 1024) + 100000000
+ self.assertEqual(approximate_size(value), "1.1 GiB")
+
+ def test_flow_line_split(self):
+ """ Splitting a flow line is not trivial.
+ There is no clear delimiter. Comma is used liberally."""
+ expected_fields = ["in_port(4)",
+ "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
+ "eth_type(0x86dd)",
+ "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
+ "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
+ "udp(src=61252,dst=5355)"]
+ expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
+ expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
+ "38,41,44,47,50,53,56,59,62,65"
+
+ line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
+ "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
+ "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
+ "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
+ "udp(src=61252,dst=5355), packets:2, bytes:92, "\
+ "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
+ "38,41,44,47,50,53,56,59,62,65"
+
+ (fields, stats, actions) = flow_line_split(line)
+
+ self.assertEqual(fields, expected_fields)
+ self.assertEqual(stats, expected_stats)
+ self.assertEqual(actions, expected_actions)
+
+ def test_accumulate_decay(self):
+ """ test_accumulate_decay: test accumulated decay. """
+ lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
+ "dst=ff:ff:ff:ff:ff:ff),"
+ "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
+ "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
+ "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
+ "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
+ "packets:1, bytes:120, used:0.004s, actions:1"]
+
+ flow_db = FlowDB(True)
+ flow_db.begin()
+ flow_db.flow_line_add(lines[0])
+
+ # Make sure we decay
+ time.sleep(4)
+ self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
+ flow_db.decay(1)
+ self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
+
+ flow_db.flow_line_add(lines[0])
+ self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
+ flow_db.decay(30)
+ # Should not be deleted.
+ self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
+
+ flow_db.flow_line_add(lines[0])
+ self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
+ timer = decay_timer_start(flow_db, 2)
+ time.sleep(10)
+ self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
+ timer.stop()
+
+ def test_accumulate(self):
+ """ test_accumulate test that FlowDB supports accumulate. """
+
+ lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
+ "dst=ff:ff:ff:ff:ff:ff),"
+ "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
+ "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
+ "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
+ "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
+ "packets:1, bytes:120, used:0.004s, actions:1",
+ "in_port(2),"
+ "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
+ "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
+ "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
+ "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
+ "packets:2, bytes:5026, used:0.348s, actions:1",
+ "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
+ "dst=ff:ff:ff:ff:ff:ff),"
+ "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
+ "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
+ "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
+ "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
+ "bytes:240, used:0.004s, actions:1"]
+
+ lines = [
+ "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
+ "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
+ "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
+ "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
+ "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
+ "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
+ ]
+
+ # Turn on accumulate.
+ flow_db = FlowDB(True)
+ flow_db.begin()
+
+ flow_db.flow_line_add(lines[0])
+
+ # Test one flow exist.
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 1)
+ self.assertEqual(in_ports[0].bytes, 120)
+ self.assertEqual(in_ports[0].count, 1)
+
+ # simulate another sample
+ # Test two different flows exist.
+ flow_db.begin()
+ flow_db.flow_line_add(lines[1])
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 1)
+ self.assertEqual(in_ports[0].bytes, 120)
+ self.assertEqual(in_ports[0].count, 1)
+
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 2)
+ self.assertEqual(in_ports[0].bytes, 126)
+ self.assertEqual(in_ports[0].count, 1)
+
+ # Test first flow increments packets.
+ flow_db.begin()
+ flow_db.flow_line_add(lines[2])
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 2)
+ self.assertEqual(in_ports[0].bytes, 240)
+ self.assertEqual(in_ports[0].count, 1)
+
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 2)
+ self.assertEqual(in_ports[0].bytes, 126)
+ self.assertEqual(in_ports[0].count, 1)
+
+ # Test third flow but with the same in_port(1) as the first flow.
+ flow_db.begin()
+ flow_db.flow_line_add(lines[3])
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 3)
+ self.assertEqual(in_ports[0].bytes, 360)
+ self.assertEqual(in_ports[0].count, 2)
+
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 2)
+ self.assertEqual(in_ports[0].bytes, 126)
+ self.assertEqual(in_ports[0].count, 1)
+
+ # Third flow has changes.
+ flow_db.begin()
+ flow_db.flow_line_add(lines[4])
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 4)
+ self.assertEqual(in_ports[0].bytes, 480)
+ self.assertEqual(in_ports[0].count, 2)
+
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 2)
+ self.assertEqual(in_ports[0].bytes, 126)
+ self.assertEqual(in_ports[0].count, 1)
+
+ # First flow reset.
+ flow_db.begin()
+ flow_db.flow_line_add(lines[5])
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 3)
+ self.assertEqual(in_ports[0].bytes, 360)
+ self.assertEqual(in_ports[0].count, 2)
+
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 2)
+ self.assertEqual(in_ports[0].bytes, 126)
+ self.assertEqual(in_ports[0].count, 1)
+
+ def test_parse_character_errors(self):
+ """ test_parsing errors.
+ The flow parser is purposely loose. It is not designed to validate
+ input; it merely pulls out what it can, but there are situations
+ in which a parse error can be detected.
+ """
+
+ lines = ["complete garbage",
+ "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
+ "dst=33:33:00:00:00:66),"
+ "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
+ "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
+ "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
+ "packets:2,bytes:5026,actions:1"]
+
+ flow_db = FlowDB(False)
+ flow_db.begin()
+ for line in lines:
+ try:
+ flow_db.flow_line_add(line)
+ except ValueError:
+ # We want an exception. That is how we know we have
+ # correctly found a simple parsing error. We are not
+ # looking to validate flow output just catch simple issues.
+ continue
+ self.assertTrue(False)
+
+ def test_tunnel_parsing(self):
+ """ test_tunnel_parsing test parse flows with tunnel. """
+ lines = [
+ "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
+ "tos=0x0,ttl=64,flags(key)),in_port(1),"
+ "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
+ "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
+ "actions:userspace(pid=4294962691,slow_path(cfm))"
+ ]
+ flow_db = FlowDB(False)
+ flow_db.begin()
+ flow_db.flow_line_add(lines[0])
+ sum_values = flow_db.field_values_in_order("all", 1)
+ in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
+ self.assertEqual(len(in_ports), 1)
+ self.assertEqual(in_ports[0].packets, 6)
+ self.assertEqual(in_ports[0].bytes, 534)
+ self.assertEqual(in_ports[0].count, 1)
+
+ def test_flow_multiple_paren(self):
+ """ test_flow_multiple_paren. """
+ line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
+ valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
+ "in_port(2)"]
+ rc = flow_line_iter(line)
+ self.assertEqual(valid, rc)
+
+ def test_to_network(self):
+ """ test_to_network test ipv4_to_network and ipv6_to_network. """
+ ipv4s = [
+ ("192.168.0.1", "192.168.0.1"),
+ ("192.168.0.1/255.255.255.255", "192.168.0.1"),
+ ("192.168.0.1/255.255.255.0", "192.168.0.0"),
+ ("192.168.0.1/255.255.0.0", "192.168.0.0"),
+ ("192.168.0.1/255.0.0.0", "192.0.0.0"),
+ ("192.168.0.1/0.0.0.0", "0.0.0.0"),
+ ("10.24.106.230/255.255.255.255", "10.24.106.230"),
+ ("10.24.106.230/255.255.255.0", "10.24.106.0"),
+ ("10.24.106.0/255.255.255.0", "10.24.106.0"),
+ ("10.24.106.0/255.255.252.0", "10.24.104.0")
+ ]
+
+ ipv6s = [
+ ("1::192:168:0:1", "1::192:168:0:1"),
+ ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
+ ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
+ ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
+ ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
+ ("1::192:168:0:1/1::0:0:0:0", "1::"),
+ ("1::192:168:0:1/::", "::")
+ ]
+
+ for (ipv4_test, ipv4_check) in ipv4s:
+ self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
+
+ for (ipv6_test, ipv6_check) in ipv6s:
+ self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)