aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Pfaff <blp@nicira.com>2010-01-25 10:52:28 -0800
committerBen Pfaff <blp@nicira.com>2010-01-25 10:52:28 -0800
commit49c36903d6d65bed96cba31f05534510a21a68d7 (patch)
treeffd8b53b6da72e0bc9aa7265eb296cd43669e57d
parentb3080599f6b280c63b9b6f4ca2d3c6006bcd9590 (diff)
parent56fd8edf80b6098289f9ddd94a6a4be3be648472 (diff)
Merge "sflow" into "master".
No conflicts, but lib/dpif.c needed a few changes since struct dpif's member "class" was renamed to "dpif_class" in master since sflow was branched off.
-rw-r--r--COPYING4
-rw-r--r--README7
-rw-r--r--acinclude.m410
-rw-r--r--configure.ac1
-rw-r--r--datapath/actions.c35
-rw-r--r--datapath/datapath.c17
-rw-r--r--datapath/datapath.h60
-rw-r--r--include/openvswitch/datapath-protocol.h57
-rw-r--r--lib/automake.mk13
-rw-r--r--lib/dpif-linux.c21
-rw-r--r--lib/dpif-netdev.c2
-rw-r--r--lib/dpif-provider.h21
-rw-r--r--lib/dpif.c40
-rw-r--r--lib/dpif.h2
-rw-r--r--lib/netdev.c30
-rw-r--r--lib/netdev.h2
-rw-r--r--lib/sflow.h548
-rw-r--r--lib/sflow_agent.c492
-rw-r--r--lib/sflow_api.h340
-rw-r--r--lib/sflow_poller.c142
-rw-r--r--lib/sflow_receiver.c832
-rw-r--r--lib/sflow_sampler.c183
-rw-r--r--lib/vlog-modules.def1
-rw-r--r--ofproto/automake.mk2
-rw-r--r--ofproto/collectors.c6
-rw-r--r--ofproto/collectors.h2
-rw-r--r--ofproto/ofproto-sflow.c607
-rw-r--r--ofproto/ofproto-sflow.h46
-rw-r--r--ofproto/ofproto.c115
-rw-r--r--ofproto/ofproto.h16
-rw-r--r--utilities/automake.mk1
-rw-r--r--vswitchd/automake.mk1
-rw-r--r--vswitchd/bridge.c40
-rw-r--r--vswitchd/ovs-vswitchd.8.in3
-rw-r--r--vswitchd/ovs-vswitchd.conf.5.in34
35 files changed, 3683 insertions, 50 deletions
diff --git a/COPYING b/COPYING
index 134f02b0..375efeca 100644
--- a/COPYING
+++ b/COPYING
@@ -22,3 +22,7 @@ Public License, version 2.
Files under the xenserver directory are licensed on a file-by-file
basis. Some files are under an uncertain license that may not be
DFSG-compliant or GPL-compatible. Refer to each file for details.
+
+Files lib/sflow*.[ch] are licensed under the terms of the InMon sFlow
+licence that is available at:
+ http://www.inmon.com/technology/sflowlicense.txt
diff --git a/README b/README
index 7871c763..a0c9a2e1 100644
--- a/README
+++ b/README
@@ -6,8 +6,8 @@ What is Open vSwitch?
Open vSwitch is a multilayer software switch licensed under the open
source Apache 2 license. Our goal is to implement a production
quality switch platform that supports standard management interfaces
-(e.g. NetFlow, RSPAN, ERSPAN, IOS-like CLI), and opens the forwarding
-functions to programmatic extension and control.
+(e.g. NetFlow, sFlow(R), RSPAN, ERSPAN, IOS-like CLI), and opens the
+forwarding functions to programmatic extension and control.
Open vSwitch is well suited to function as a virtual switch in VM
environments. In addition to exposing standard control and visibility
@@ -20,7 +20,8 @@ The bulk of the code is written in platform-independent C and is
easily ported to other environments. The current release of Open
vSwitch supports the following features:
- * Visibility into inter-VM communication via NetFlow, SPAN, and RSPAN
+ * Visibility into inter-VM communication via NetFlow, sFlow, SPAN,
+ and RSPAN
* Standard 802.1Q VLAN model with trunking
* Per VM policing
* NIC bonding with source-MAC load balancing
diff --git a/acinclude.m4 b/acinclude.m4
index e37a316d..e074c7d4 100644
--- a/acinclude.m4
+++ b/acinclude.m4
@@ -235,4 +235,14 @@ dnl Example: OVS_ENABLE_OPTION([-Wdeclaration-after-statement])
AC_DEFUN([OVS_ENABLE_OPTION],
[OVS_CHECK_CC_OPTION([$1], [WARNING_FLAGS="$WARNING_FLAGS $1"])
AC_SUBST([WARNING_FLAGS])])
+
+dnl OVS_CONDITIONAL_CC_OPTION([OPTION], [CONDITIONAL])
+dnl Check whether the given C compiler OPTION is accepted.
+dnl If so, enable the given Automake CONDITIONAL.
+
+dnl Example: OVS_CONDITIONAL_CC_OPTION([-Wno-unused], [HAVE_WNO_UNUSED])
+AC_DEFUN([OVS_CONDITIONAL_CC_OPTION],
+ [OVS_CHECK_CC_OPTION(
+ [$1], [ovs_have_cc_option=yes], [ovs_have_cc_option=no])
+ AM_CONDITIONAL([$2], [test $ovs_have_cc_option = yes])])
dnl ----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 1e56c462..5b0b3c65 100644
--- a/configure.ac
+++ b/configure.ac
@@ -75,6 +75,7 @@ OVS_ENABLE_OPTION([-Wold-style-definition])
OVS_ENABLE_OPTION([-Wmissing-prototypes])
OVS_ENABLE_OPTION([-Wmissing-field-initializers])
OVS_ENABLE_OPTION([-Wno-override-init])
+OVS_CONDITIONAL_CC_OPTION([-Wno-unused], [HAVE_WNO_UNUSED])
AC_ARG_VAR(KARCH, [Kernel Architecture String])
AC_SUBST(KARCH)
diff --git a/datapath/actions.c b/datapath/actions.c
index cadab05f..8b32de47 100644
--- a/datapath/actions.c
+++ b/datapath/actions.c
@@ -1,6 +1,6 @@
/*
* Distributed under the terms of the GNU GPL version 2.
- * Copyright (c) 2007, 2008, 2009 Nicira Networks.
+ * Copyright (c) 2007, 2008, 2009, 2010 Nicira Networks.
*
* Significant portions of this file may be copied from parts of the Linux
* kernel, by Linus Torvalds and others.
@@ -366,6 +366,28 @@ output_control(struct datapath *dp, struct sk_buff *skb, u32 arg, gfp_t gfp)
return dp_output_control(dp, skb, _ODPL_ACTION_NR, arg);
}
+/* Send a copy of this packet up to the sFlow agent, along with extra
+ * information about what happened to it. */
+static void sflow_sample(struct datapath *dp, struct sk_buff *skb,
+ const union odp_action *a, int n_actions,
+ gfp_t gfp, struct net_bridge_port *nbp)
+{
+ struct odp_sflow_sample_header *hdr;
+ unsigned int actlen = n_actions * sizeof(union odp_action);
+ unsigned int hdrlen = sizeof(struct odp_sflow_sample_header);
+ struct sk_buff *nskb;
+
+ nskb = skb_copy_expand(skb, actlen + hdrlen, 0, gfp);
+ if (!nskb)
+ return;
+
+ memcpy(__skb_push(nskb, actlen), a, actlen);
+ hdr = (struct odp_sflow_sample_header*)__skb_push(nskb, hdrlen);
+ hdr->n_actions = n_actions;
+ hdr->sample_pool = atomic_read(&nbp->sflow_pool);
+ dp_output_control(dp, nskb, _ODPL_SFLOW_NR, 0);
+}
+
/* Execute a list of actions against 'skb'. */
int execute_actions(struct datapath *dp, struct sk_buff *skb,
struct odp_flow_key *key,
@@ -378,6 +400,17 @@ int execute_actions(struct datapath *dp, struct sk_buff *skb,
* is slightly obscure just to avoid that. */
int prev_port = -1;
int err;
+
+ if (dp->sflow_probability) {
+ struct net_bridge_port *p = skb->dev->br_port;
+ if (p) {
+ atomic_inc(&p->sflow_pool);
+ if (dp->sflow_probability == UINT_MAX ||
+ net_random() < dp->sflow_probability)
+ sflow_sample(dp, skb, a, n_actions, gfp, p);
+ }
+ }
+
for (; n_actions > 0; a++, n_actions--) {
WARN_ON_ONCE(skb_shared(skb));
if (prev_port != -1) {
diff --git a/datapath/datapath.c b/datapath/datapath.c
index 12798958..ba363fb7 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2007, 2008, 2009 Nicira Networks.
+ * Copyright (c) 2007, 2008, 2009, 2010 Nicira Networks.
* Distributed under the terms of the GNU GPL version 2.
*
* Significant portions of this file may be copied from parts of the Linux
@@ -349,6 +349,7 @@ static int new_nbp(struct datapath *dp, struct net_device *dev, int port_no)
p->port_no = port_no;
p->dp = dp;
p->dev = dev;
+ atomic_set(&p->sflow_pool, 0);
if (!is_dp_dev(dev))
rcu_assign_pointer(dev->br_port, p);
else {
@@ -713,8 +714,7 @@ dp_output_control(struct datapath *dp, struct sk_buff *skb, int queue_no,
int err;
WARN_ON_ONCE(skb_shared(skb));
- BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR);
-
+ BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR && queue_no != _ODPL_SFLOW_NR);
queue = &dp->queues[queue_no];
err = -ENOBUFS;
if (skb_queue_len(queue) >= DP_MAX_QUEUE_LEN)
@@ -1391,6 +1391,7 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd,
int dp_idx = iminor(f->f_dentry->d_inode);
struct datapath *dp;
int drop_frags, listeners, port_no;
+ unsigned int sflow_probability;
int err;
/* Handle commands with special locking requirements up front. */
@@ -1454,6 +1455,16 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd,
set_listen_mask(f, listeners);
break;
+ case ODP_GET_SFLOW_PROBABILITY:
+ err = put_user(dp->sflow_probability, (unsigned int __user *)argp);
+ break;
+
+ case ODP_SET_SFLOW_PROBABILITY:
+ err = get_user(sflow_probability, (unsigned int __user *)argp);
+ if (!err)
+ dp->sflow_probability = sflow_probability;
+ break;
+
case ODP_PORT_QUERY:
err = query_port(dp, (struct odp_port __user *)argp);
break;
diff --git a/datapath/datapath.h b/datapath/datapath.h
index 643c91ac..3b5a67b1 100644
--- a/datapath/datapath.h
+++ b/datapath/datapath.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2009 Nicira Networks.
+ * Copyright (c) 2009, 2010 Nicira Networks.
* Distributed under the terms of the GNU GPL version 2.
*
* Significant portions of this file may be copied from parts of the Linux
@@ -79,9 +79,22 @@ struct dp_bucket {
struct sw_flow *flows[];
};
-#define DP_N_QUEUES 2
+#define DP_N_QUEUES 3
#define DP_MAX_QUEUE_LEN 100
+/**
+ * struct dp_stats_percpu - per-cpu packet processing statistics for a given
+ * datapath.
+ * @n_frags: Number of IP fragments processed by datapath.
+ * @n_hit: Number of received packets for which a matching flow was found in
+ * the flow table.
+ * @n_miss: Number of received packets that had no matching flow in the flow
+ * table. The sum of @n_hit and @n_miss is the number of packets that have
+ * been received by the datapath.
+ * @n_lost: Number of received packets that had no matching flow in the flow
+ * table that could not be sent to userspace (normally due to an overflow in
+ * one of the datapath's queues).
+ */
struct dp_stats_percpu {
u64 n_frags;
u64 n_hit;
@@ -95,10 +108,29 @@ struct dp_port_group {
u16 ports[];
};
+/**
+ * struct datapath - datapath for flow-based packet switching
+ * @mutex: Mutual exclusion for ioctls.
+ * @dp_idx: Datapath number (index into the dps[] array in datapath.c).
+ * @ifobj: Represents /sys/class/net/<devname>/brif.
+ * @drop_frags: Drop all IP fragments if nonzero.
+ * @queues: %DP_N_QUEUES sets of queued packets for userspace to handle.
+ * @waitqueue: Waitqueue, for waiting for new packets in @queues.
+ * @n_flows: Number of flows currently in flow table.
+ * @table: Current flow table (RCU protected).
+ * @groups: Port groups, used by ODPAT_OUTPUT_GROUP action (RCU protected).
+ * @n_ports: Number of ports currently in @ports.
+ * @ports: Map from port number to &struct net_bridge_port. %ODPP_LOCAL port
+ * always exists, other ports may be %NULL.
+ * @port_list: List of all ports in @ports in arbitrary order.
+ * @stats_percpu: Per-CPU datapath statistics.
+ * @sflow_probability: Number of packets out of UINT_MAX to sample to the
+ * %ODPL_SFLOW queue, e.g. (@sflow_probability/UINT_MAX) is the probability of
+ * sampling a given packet.
+ */
struct datapath {
struct mutex mutex;
int dp_idx;
-
struct kobject ifobj;
int drop_frags;
@@ -117,19 +149,37 @@ struct datapath {
/* Switch ports. */
unsigned int n_ports;
struct net_bridge_port *ports[DP_MAX_PORTS];
- struct list_head port_list; /* All ports, including local_port. */
+ struct list_head port_list;
/* Stats. */
struct dp_stats_percpu *stats_percpu;
+
+ /* sFlow Sampling */
+ unsigned int sflow_probability;
};
+/**
+ * struct net_bridge_port - one port within a datapath
+ * @port_no: Index into @dp's @ports array.
+ * @dp: Datapath to which this port belongs.
+ * @dev: The network device attached to this port. The @br_port member in @dev
+ * points back to this &struct net_bridge_port.
+ * @kobj: Represents /sys/class/net/<devname>/brport.
+ * @linkname: The name of the link from /sys/class/net/<datapath>/brif to this
+ * &struct net_bridge_port. (We keep this around so that we can delete it
+ * if @dev gets renamed.) Set to the null string when no link exists.
+ * @node: Element in @dp's @port_list.
+ * @sflow_pool: Number of packets that were candidates for sFlow sampling,
+ * regardless of whether they were actually chosen and sent down to userspace.
+ */
struct net_bridge_port {
u16 port_no;
struct datapath *dp;
struct net_device *dev;
struct kobject kobj;
char linkname[IFNAMSIZ];
- struct list_head node; /* Element in datapath.ports. */
+ struct list_head node;
+ atomic_t sflow_pool;
};
extern struct notifier_block dp_device_notifier;
diff --git a/include/openvswitch/datapath-protocol.h b/include/openvswitch/datapath-protocol.h
index ab7eb9e3..b079f529 100644
--- a/include/openvswitch/datapath-protocol.h
+++ b/include/openvswitch/datapath-protocol.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2009 Nicira Networks.
+ * Copyright (c) 2009, 2010 Nicira Networks.
*
* This file is offered under your choice of two licenses: Apache 2.0 or GNU
* GPL 2.0 or later. The permission statements for each of these licenses is
@@ -77,6 +77,9 @@
#define ODP_EXECUTE _IOR('O', 18, struct odp_execute)
+#define ODP_SET_SFLOW_PROBABILITY _IOR('O', 19, int)
+#define ODP_GET_SFLOW_PROBABILITY _IOW('O', 20, int)
+
struct odp_stats {
/* Flows. */
__u32 n_flows; /* Number of flows in flow table. */
@@ -98,6 +101,7 @@ struct odp_stats {
/* Queues. */
__u16 max_miss_queue; /* Max length of ODPL_MISS queue. */
__u16 max_action_queue; /* Max length of ODPL_ACTION queue. */
+ __u16 max_sflow_queue; /* Max length of ODPL_SFLOW queue. */
};
/* Logical ports. */
@@ -109,16 +113,51 @@ struct odp_stats {
#define ODPL_MISS (1 << _ODPL_MISS_NR)
#define _ODPL_ACTION_NR 1 /* Packet output to ODPP_CONTROLLER. */
#define ODPL_ACTION (1 << _ODPL_ACTION_NR)
-#define ODPL_ALL (ODPL_MISS | ODPL_ACTION)
-
-/* Format of messages read from datapath fd. */
+#define _ODPL_SFLOW_NR 2 /* sFlow samples. */
+#define ODPL_SFLOW (1 << _ODPL_SFLOW_NR)
+#define ODPL_ALL (ODPL_MISS | ODPL_ACTION | ODPL_SFLOW)
+
+/**
+ * struct odp_msg - format of messages read from datapath fd.
+ * @type: One of the %_ODPL_* constants.
+ * @length: Total length of message, including this header.
+ * @port: Port that received the packet embedded in this message.
+ * @reserved: Not currently used. Should be set to 0.
+ * @arg: Argument value whose meaning depends on @type.
+ *
+ * For @type == %_ODPL_MISS_NR, the header is followed by packet data. The
+ * @arg member is unused and set to 0.
+ *
+ * For @type == %_ODPL_ACTION_NR, the header is followed by packet data. The
+ * @arg member is copied from the &struct odp_action_controller that caused
+ * the &struct odp_msg to be composed.
+ *
+ * For @type == %_ODPL_SFLOW_NR, the header is followed by &struct
+ * odp_sflow_sample_header, then by an array of &union odp_action (the number
+ * of which is specified in &struct odp_sflow_sample_header), then by packet
+ * data.
+ */
struct odp_msg {
- __u32 type; /* _ODPL_MISS_NR or _ODPL_ACTION_NR. */
- __u32 length; /* Message length, including header. */
- __u16 port; /* Port on which frame was received. */
+ __u32 type;
+ __u32 length;
+ __u16 port;
__u16 reserved;
- __u32 arg; /* Argument value specified in action. */
- /* Followed by packet data. */
+ __u32 arg;
+};
+
+/**
+ * struct odp_sflow_sample_header - header added to sFlow sampled packet.
+ * @sample_pool: Number of packets that were candidates for sFlow sampling,
+ * regardless of whether they were actually chosen and sent down to userspace.
+ * @n_actions: Number of "union odp_action"s immediately following this header.
+ *
+ * This header follows &struct odp_msg when that structure's @type is
+ * %_ODPL_SFLOW_NR, and it is itself followed by an array of &union odp_action
+ * (the number of which is specified in @n_actions) and then by packet data.
+ */
+struct odp_sflow_sample_header {
+ __u32 sample_pool;
+ __u32 n_actions;
};
#define ODP_PORT_INTERNAL (1 << 0) /* This port is simulated. */
diff --git a/lib/automake.mk b/lib/automake.mk
index 5265eb77..67100fc1 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -126,6 +126,19 @@ nodist_lib_libopenvswitch_a_SOURCES = \
lib/dirs.c
CLEANFILES += $(nodist_lib_libopenvswitch_a_SOURCES)
+noinst_LIBRARIES += lib/libsflow.a
+lib_libsflow_a_SOURCES = \
+ lib/sflow_api.h \
+ lib/sflow.h \
+ lib/sflow_agent.c \
+ lib/sflow_sampler.c \
+ lib/sflow_poller.c \
+ lib/sflow_receiver.c
+lib_libsflow_a_CFLAGS = $(AM_CFLAGS)
+if HAVE_WNO_UNUSED
+lib_libsflow_a_CFLAGS += -Wno-unused
+endif
+
if HAVE_NETLINK
lib_libopenvswitch_a_SOURCES += \
lib/netlink-protocol.h \
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index 2bf329f4..e7d43811 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -196,6 +196,7 @@ dpif_linux_delete(struct dpif *dpif_)
static int
dpif_linux_get_stats(const struct dpif *dpif_, struct odp_stats *stats)
{
+ memset(stats, 0, sizeof *stats);
return do_ioctl(dpif_, ODP_DP_STATS, stats);
}
@@ -396,6 +397,19 @@ dpif_linux_recv_set_mask(struct dpif *dpif_, int listen_mask)
}
static int
+dpif_linux_get_sflow_probability(const struct dpif *dpif_,
+ uint32_t *probability)
+{
+ return do_ioctl(dpif_, ODP_GET_SFLOW_PROBABILITY, probability);
+}
+
+static int
+dpif_linux_set_sflow_probability(struct dpif *dpif_, uint32_t probability)
+{
+ return do_ioctl(dpif_, ODP_SET_SFLOW_PROBABILITY, &probability);
+}
+
+static int
dpif_linux_recv(struct dpif *dpif_, struct ofpbuf **bufp)
{
struct dpif_linux *dpif = dpif_linux_cast(dpif_);
@@ -475,6 +489,8 @@ const struct dpif_class dpif_linux_class = {
dpif_linux_execute,
dpif_linux_recv_get_mask,
dpif_linux_recv_set_mask,
+ dpif_linux_get_sflow_probability,
+ dpif_linux_set_sflow_probability,
dpif_linux_recv,
dpif_linux_recv_wait,
};
@@ -555,13 +571,14 @@ make_openvswitch_device(int minor, char **fnp)
struct stat s;
char fn[128];
+ *fnp = NULL;
+
major = get_openvswitch_major();
if (major < 0) {
return -major;
}
dev = makedev(major, minor);
- *fnp = NULL;
sprintf(fn, "%s/dp%d", dirname, minor);
if (!stat(fn, &s)) {
if (!S_ISCHR(s.st_mode)) {
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index c4b5a994..720e8cb0 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -1333,6 +1333,8 @@ const struct dpif_class dpif_netdev_class = {
dpif_netdev_execute,
dpif_netdev_recv_get_mask,
dpif_netdev_recv_set_mask,
+ NULL, /* get_sflow_probability */
+ NULL, /* set_sflow_probability */
dpif_netdev_recv,
dpif_netdev_recv_wait,
};
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index 1d41eafd..39c66e11 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2009 Nicira Networks.
+ * Copyright (c) 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -282,6 +282,25 @@ struct dpif_class {
* corresponding type when it calls the recv member function. */
int (*recv_set_mask)(struct dpif *dpif, int listen_mask);
+ /* Retrieves 'dpif''s sFlow sampling probability into '*probability'.
+ * Return value is 0 or a positive errno value. EOPNOTSUPP indicates that
+ * the datapath does not support sFlow, as does a null pointer.
+ *
+ * '*probability' is expressed as the number of packets out of UINT_MAX to
+ * sample, e.g. probability/UINT_MAX is the probability of sampling a given
+ * packet. */
+ int (*get_sflow_probability)(const struct dpif *dpif,
+ uint32_t *probability);
+
+ /* Sets 'dpif''s sFlow sampling probability to 'probability'. Return value
+ * is 0 or a positive errno value. EOPNOTSUPP indicates that the datapath
+ * does not support sFlow, as does a null pointer.
+ *
+ * 'probability' is expressed as the number of packets out of UINT_MAX to
+ * sample, e.g. probability/UINT_MAX is the probability of sampling a given
+ * packet. */
+ int (*set_sflow_probability)(struct dpif *dpif, uint32_t probability);
+
/* Attempts to receive a message from 'dpif'. If successful, stores the
* message into '*packetp'. The message, if one is received, must begin
* with 'struct odp_msg' as a header. Only messages of the types selected
diff --git a/lib/dpif.c b/lib/dpif.c
index 72184c84..7edaf31b 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -845,6 +845,41 @@ dpif_recv_set_mask(struct dpif *dpif, int listen_mask)
return error;
}
+/* Retrieve the sFlow sampling probability. '*probability' is expressed as the
+ * number of packets out of UINT_MAX to sample, e.g. probability/UINT_MAX is
+ * the probability of sampling a given packet.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. EOPNOTSUPP
+ * indicates that 'dpif' does not support sFlow sampling. */
+int
+dpif_get_sflow_probability(const struct dpif *dpif, uint32_t *probability)
+{
+ int error = (dpif->dpif_class->get_sflow_probability
+ ? dpif->dpif_class->get_sflow_probability(dpif, probability)
+ : EOPNOTSUPP);
+ if (error) {
+ *probability = 0;
+ }
+ log_operation(dpif, "get_sflow_probability", error);
+ return error;
+}
+
+/* Set the sFlow sampling probability. 'probability' is expressed as the
+ * number of packets out of UINT_MAX to sample, e.g. probability/UINT_MAX is
+ * the probability of sampling a given packet.
+ *
+ * Returns 0 if successful, otherwise a positive errno value. EOPNOTSUPP
+ * indicates that 'dpif' does not support sFlow sampling. */
+int
+dpif_set_sflow_probability(struct dpif *dpif, uint32_t probability)
+{
+ int error = (dpif->dpif_class->set_sflow_probability
+ ? dpif->dpif_class->set_sflow_probability(dpif, probability)
+ : EOPNOTSUPP);
+ log_operation(dpif, "set_sflow_probability", error);
+ return error;
+}
+
/* Attempts to receive a message from 'dpif'. If successful, stores the
* message into '*packetp'. The message, if one is received, will begin with
* 'struct odp_msg' as a header. Only messages of the types selected with
@@ -868,6 +903,7 @@ dpif_recv(struct dpif *dpif, struct ofpbuf **packetp)
"%zu on port %"PRIu16": %s", dpif_name(dpif),
(msg->type == _ODPL_MISS_NR ? "miss"
: msg->type == _ODPL_ACTION_NR ? "action"
+ : msg->type == _ODPL_SFLOW_NR ? "sFlow"
: "<unknown>"),
payload_len, msg->port, s);
free(s);
@@ -894,7 +930,7 @@ dpif_recv_purge(struct dpif *dpif)
return error;
}
- for (i = 0; i < stats.max_miss_queue + stats.max_action_queue; i++) {
+ for (i = 0; i < stats.max_miss_queue + stats.max_action_queue + stats.max_sflow_queue; i++) {
struct ofpbuf *buf;
error = dpif_recv(dpif, &buf);
if (error) {
diff --git a/lib/dpif.h b/lib/dpif.h
index 1d109c2d..bf3c6481 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -84,6 +84,8 @@ int dpif_execute(struct dpif *, uint16_t in_port,
int dpif_recv_get_mask(const struct dpif *, int *listen_mask);
int dpif_recv_set_mask(struct dpif *, int listen_mask);
+int dpif_get_sflow_probability(const struct dpif *, uint32_t *probability);
+int dpif_set_sflow_probability(struct dpif *, uint32_t probability);
int dpif_recv(struct dpif *, struct ofpbuf **);
int dpif_recv_purge(struct dpif *);
void dpif_recv_wait(struct dpif *);
diff --git a/lib/netdev.c b/lib/netdev.c
index f5089c1d..804050fc 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -31,6 +31,7 @@
#include "list.h"
#include "netdev-provider.h"
#include "ofpbuf.h"
+#include "openflow/openflow.h"
#include "packets.h"
#include "poll-loop.h"
#include "shash.h"
@@ -525,6 +526,35 @@ netdev_get_features(struct netdev *netdev,
return error;
}
+/* Returns the maximum speed of a network connection that has the "enum
+ * ofp_port_features" bits in 'features', in bits per second. If no bits that
+ * indicate a speed are set in 'features', assumes 100Mbps. */
+uint64_t
+netdev_features_to_bps(uint32_t features)
+{
+ enum {
+ F_10000MB = OFPPF_10GB_FD,
+ F_1000MB = OFPPF_1GB_HD | OFPPF_1GB_FD,
+ F_100MB = OFPPF_100MB_HD | OFPPF_100MB_FD,
+ F_10MB = OFPPF_10MB_HD | OFPPF_10MB_FD
+ };
+
+ return ( features & F_10000MB ? UINT64_C(10000000000)
+ : features & F_1000MB ? UINT64_C(1000000000)
+ : features & F_100MB ? UINT64_C(100000000)
+ : features & F_10MB ? UINT64_C(10000000)
+ : UINT64_C(100000000));
+}
+
+/* Returns true if any of the "enum ofp_port_features" bits that indicate a
+ * full-duplex link are set in 'features', otherwise false. */
+bool
+netdev_features_is_full_duplex(uint32_t features)
+{
+ return (features & (OFPPF_10MB_FD | OFPPF_100MB_FD | OFPPF_1GB_FD
+ | OFPPF_10GB_FD)) != 0;
+}
+
/* Set the features advertised by 'netdev' to 'advertise'. Returns 0 if
* successful, otherwise a positive errno value. */
int
diff --git a/lib/netdev.h b/lib/netdev.h
index b8c7dfb4..8060ddb9 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -112,6 +112,8 @@ int netdev_get_carrier(const struct netdev *, bool *carrier);
int netdev_get_features(struct netdev *,
uint32_t *current, uint32_t *advertised,
uint32_t *supported, uint32_t *peer);
+uint64_t netdev_features_to_bps(uint32_t features);
+bool netdev_features_is_full_duplex(uint32_t features);
int netdev_set_advertisements(struct netdev *, uint32_t advertise);
int netdev_get_in4(const struct netdev *, struct in_addr *address,
diff --git a/lib/sflow.h b/lib/sflow.h
new file mode 100644
index 00000000..397ae2da
--- /dev/null
+++ b/lib/sflow.h
@@ -0,0 +1,548 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#ifndef SFLOW_H
+#define SFLOW_H 1
+
+enum SFLAddress_type {
+ SFLADDRESSTYPE_IP_V4 = 1,
+ SFLADDRESSTYPE_IP_V6 = 2
+};
+
+typedef struct {
+ u_int32_t addr;
+} SFLIPv4;
+
+typedef struct {
+ u_char addr[16];
+} SFLIPv6;
+
+typedef union _SFLAddress_value {
+ SFLIPv4 ip_v4;
+ SFLIPv6 ip_v6;
+} SFLAddress_value;
+
+typedef struct _SFLAddress {
+ u_int32_t type; /* enum SFLAddress_type */
+ SFLAddress_value address;
+} SFLAddress;
+
+/* Packet header data */
+
+#define SFL_DEFAULT_HEADER_SIZE 128
+#define SFL_DEFAULT_COLLECTOR_PORT 6343
+#define SFL_DEFAULT_SAMPLING_RATE 400
+#define SFL_DEFAULT_POLLING_INTERVAL 30
+
+/* The header protocol describes the format of the sampled header */
+enum SFLHeader_protocol {
+ SFLHEADER_ETHERNET_ISO8023 = 1,
+ SFLHEADER_ISO88024_TOKENBUS = 2,
+ SFLHEADER_ISO88025_TOKENRING = 3,
+ SFLHEADER_FDDI = 4,
+ SFLHEADER_FRAME_RELAY = 5,
+ SFLHEADER_X25 = 6,
+ SFLHEADER_PPP = 7,
+ SFLHEADER_SMDS = 8,
+ SFLHEADER_AAL5 = 9,
+ SFLHEADER_AAL5_IP = 10, /* e.g. Cisco AAL5 mux */
+ SFLHEADER_IPv4 = 11,
+ SFLHEADER_IPv6 = 12,
+ SFLHEADER_MPLS = 13
+};
+
+/* raw sampled header */
+
+typedef struct _SFLSampled_header {
+ u_int32_t header_protocol; /* (enum SFLHeader_protocol) */
+ u_int32_t frame_length; /* Original length of packet before sampling */
+ u_int32_t stripped; /* header/trailer bytes stripped by sender */
+ u_int32_t header_length; /* length of sampled header bytes to follow */
+ u_int8_t *header_bytes; /* Header bytes */
+} SFLSampled_header;
+
+/* decoded ethernet header */
+
+typedef struct _SFLSampled_ethernet {
+ u_int32_t eth_len; /* The length of the MAC packet excluding
+ lower layer encapsulations */
+ u_int8_t src_mac[8]; /* 6 bytes + 2 pad */
+ u_int8_t dst_mac[8];
+ u_int32_t eth_type;
+} SFLSampled_ethernet;
+
+/* decoded IP version 4 header */
+
+typedef struct _SFLSampled_ipv4 {
+ u_int32_t length; /* The length of the IP packet
+ excluding lower layer encapsulations */
+ u_int32_t protocol; /* IP Protocol type (for example, TCP = 6, UDP = 17) */
+ SFLIPv4 src_ip; /* Source IP Address */
+ SFLIPv4 dst_ip; /* Destination IP Address */
+ u_int32_t src_port; /* TCP/UDP source port number or equivalent */
+ u_int32_t dst_port; /* TCP/UDP destination port number or equivalent */
+ u_int32_t tcp_flags; /* TCP flags */
+ u_int32_t tos; /* IP type of service */
+} SFLSampled_ipv4;
+
+/* decoded IP version 6 data */
+
+typedef struct _SFLSampled_ipv6 {
+ u_int32_t length; /* The length of the IP packet
+ excluding lower layer encapsulations */
+ u_int32_t protocol; /* IP Protocol type (for example, TCP = 6, UDP = 17) */
+ SFLIPv6 src_ip; /* Source IP Address */
+ SFLIPv6 dst_ip; /* Destination IP Address */
+ u_int32_t src_port; /* TCP/UDP source port number or equivalent */
+ u_int32_t dst_port; /* TCP/UDP destination port number or equivalent */
+ u_int32_t tcp_flags; /* TCP flags */
+ u_int32_t priority; /* IP priority */
+} SFLSampled_ipv6;
+
+/* Extended data types */
+
+/* Extended switch data */
+
+typedef struct _SFLExtended_switch {
+ u_int32_t src_vlan; /* The 802.1Q VLAN id of incomming frame */
+ u_int32_t src_priority; /* The 802.1p priority */
+ u_int32_t dst_vlan; /* The 802.1Q VLAN id of outgoing frame */
+ u_int32_t dst_priority; /* The 802.1p priority */
+} SFLExtended_switch;
+
+/* Extended router data */
+
+typedef struct _SFLExtended_router {
+ SFLAddress nexthop; /* IP address of next hop router */
+ u_int32_t src_mask; /* Source address prefix mask bits */
+ u_int32_t dst_mask; /* Destination address prefix mask bits */
+} SFLExtended_router;
+
+/* Extended gateway data */
+enum SFLExtended_as_path_segment_type {
+ SFLEXTENDED_AS_SET = 1, /* Unordered set of ASs */
+ SFLEXTENDED_AS_SEQUENCE = 2 /* Ordered sequence of ASs */
+};
+
+typedef struct _SFLExtended_as_path_segment {
+ u_int32_t type; /* enum SFLExtended_as_path_segment_type */
+ u_int32_t length; /* number of AS numbers in set/sequence */
+ union {
+ u_int32_t *set;
+ u_int32_t *seq;
+ } as;
+} SFLExtended_as_path_segment;
+
+typedef struct _SFLExtended_gateway {
+ SFLAddress nexthop; /* Address of the border router that should
+ be used for the destination network */
+ u_int32_t as; /* AS number for this gateway */
+ u_int32_t src_as; /* AS number of source (origin) */
+ u_int32_t src_peer_as; /* AS number of source peer */
+ u_int32_t dst_as_path_segments; /* number of segments in path */
+ SFLExtended_as_path_segment *dst_as_path; /* list of seqs or sets */
+ u_int32_t communities_length; /* number of communities */
+ u_int32_t *communities; /* set of communities */
+ u_int32_t localpref; /* LocalPref associated with this route */
+} SFLExtended_gateway;
+
+typedef struct _SFLString {
+ u_int32_t len;
+ char *str;
+} SFLString;
+
+/* Extended user data */
+
+typedef struct _SFLExtended_user {
+ u_int32_t src_charset; /* MIBEnum value of character set used to encode a string - See RFC 2978
+ Where possible UTF-8 encoding (MIBEnum=106) should be used. A value
+ of zero indicates an unknown encoding. */
+ SFLString src_user;
+ u_int32_t dst_charset;
+ SFLString dst_user;
+} SFLExtended_user;
+
+/* Extended URL data */
+
+enum SFLExtended_url_direction {
+ SFLEXTENDED_URL_SRC = 1, /* URL is associated with source address */
+ SFLEXTENDED_URL_DST = 2 /* URL is associated with destination address */
+};
+
+typedef struct _SFLExtended_url {
+ u_int32_t direction; /* enum SFLExtended_url_direction */
+ SFLString url; /* URL associated with the packet flow.
+ Must be URL encoded */
+ SFLString host; /* The host field from the HTTP header */
+} SFLExtended_url;
+
+/* Extended MPLS data */
+
+typedef struct _SFLLabelStack {
+ u_int32_t depth;
+ u_int32_t *stack; /* first entry is top of stack - see RFC 3032 for encoding */
+} SFLLabelStack;
+
+typedef struct _SFLExtended_mpls {
+ SFLAddress nextHop; /* Address of the next hop */
+ SFLLabelStack in_stack;
+ SFLLabelStack out_stack;
+} SFLExtended_mpls;
+
+/* Extended NAT data
+ Packet header records report addresses as seen at the sFlowDataSource.
+ The extended_nat structure reports on translated source and/or destination
+ addesses for this packet. If an address was not translated it should
+ be equal to that reported for the header. */
+
+typedef struct _SFLExtended_nat {
+ SFLAddress src; /* Source address */
+ SFLAddress dst; /* Destination address */
+} SFLExtended_nat;
+
+/* additional Extended MPLS stucts */
+
+typedef struct _SFLExtended_mpls_tunnel {
+ SFLString tunnel_lsp_name; /* Tunnel name */
+ u_int32_t tunnel_id; /* Tunnel ID */
+ u_int32_t tunnel_cos; /* Tunnel COS value */
+} SFLExtended_mpls_tunnel;
+
+typedef struct _SFLExtended_mpls_vc {
+ SFLString vc_instance_name; /* VC instance name */
+ u_int32_t vll_vc_id; /* VLL/VC instance ID */
+ u_int32_t vc_label_cos; /* VC Label COS value */
+} SFLExtended_mpls_vc;
+
+/* Extended MPLS FEC
+ - Definitions from MPLS-FTN-STD-MIB mplsFTNTable */
+
+typedef struct _SFLExtended_mpls_FTN {
+ SFLString mplsFTNDescr;
+ u_int32_t mplsFTNMask;
+} SFLExtended_mpls_FTN;
+
+/* Extended MPLS LVP FEC
+ - Definition from MPLS-LDP-STD-MIB mplsFecTable
+ Note: mplsFecAddrType, mplsFecAddr information available
+ from packet header */
+
+typedef struct _SFLExtended_mpls_LDP_FEC {
+ u_int32_t mplsFecAddrPrefixLength;
+} SFLExtended_mpls_LDP_FEC;
+
+/* Extended VLAN tunnel information
+ Record outer VLAN encapsulations that have
+ been stripped. extended_vlantunnel information
+ should only be reported if all the following conditions are satisfied:
+ 1. The packet has nested vlan tags, AND
+ 2. The reporting device is VLAN aware, AND
+ 3. One or more VLAN tags have been stripped, either
+ because they represent proprietary encapsulations, or
+ because switch hardware automatically strips the outer VLAN
+ encapsulation.
+ Reporting extended_vlantunnel information is not a substitute for
+ reporting extended_switch information. extended_switch data must
+ always be reported to describe the ingress/egress VLAN information
+ for the packet. The extended_vlantunnel information only applies to
+ nested VLAN tags, and then only when one or more tags has been
+ stripped. */
+
+typedef SFLLabelStack SFLVlanStack;
+typedef struct _SFLExtended_vlan_tunnel {
+ SFLVlanStack stack; /* List of stripped 802.1Q TPID/TCI layers. Each
+ TPID,TCI pair is represented as a single 32 bit
+ integer. Layers listed from outermost to
+ innermost. */
+} SFLExtended_vlan_tunnel;
+
+enum SFLFlow_type_tag {
+ /* enterprise = 0, format = ... */
+ SFLFLOW_HEADER = 1, /* Packet headers are sampled */
+ SFLFLOW_ETHERNET = 2, /* MAC layer information */
+ SFLFLOW_IPV4 = 3, /* IP version 4 data */
+ SFLFLOW_IPV6 = 4, /* IP version 6 data */
+ SFLFLOW_EX_SWITCH = 1001, /* Extended switch information */
+ SFLFLOW_EX_ROUTER = 1002, /* Extended router information */
+ SFLFLOW_EX_GATEWAY = 1003, /* Extended gateway router information */
+ SFLFLOW_EX_USER = 1004, /* Extended TACAS/RADIUS user information */
+ SFLFLOW_EX_URL = 1005, /* Extended URL information */
+ SFLFLOW_EX_MPLS = 1006, /* Extended MPLS information */
+ SFLFLOW_EX_NAT = 1007, /* Extended NAT information */
+ SFLFLOW_EX_MPLS_TUNNEL = 1008, /* additional MPLS information */
+ SFLFLOW_EX_MPLS_VC = 1009,
+ SFLFLOW_EX_MPLS_FTN = 1010,
+ SFLFLOW_EX_MPLS_LDP_FEC = 1011,
+ SFLFLOW_EX_VLAN_TUNNEL = 1012, /* VLAN stack */
+};
+
+typedef union _SFLFlow_type {
+ SFLSampled_header header;
+ SFLSampled_ethernet ethernet;
+ SFLSampled_ipv4 ipv4;
+ SFLSampled_ipv6 ipv6;
+ SFLExtended_switch sw;
+ SFLExtended_router router;
+ SFLExtended_gateway gateway;
+ SFLExtended_user user;
+ SFLExtended_url url;
+ SFLExtended_mpls mpls;
+ SFLExtended_nat nat;
+ SFLExtended_mpls_tunnel mpls_tunnel;
+ SFLExtended_mpls_vc mpls_vc;
+ SFLExtended_mpls_FTN mpls_ftn;
+ SFLExtended_mpls_LDP_FEC mpls_ldp_fec;
+ SFLExtended_vlan_tunnel vlan_tunnel;
+} SFLFlow_type;
+
+typedef struct _SFLFlow_sample_element {
+ struct _SFLFlow_sample_element *nxt;
+ u_int32_t tag; /* SFLFlow_type_tag */
+ u_int32_t length;
+ SFLFlow_type flowType;
+} SFLFlow_sample_element;
+
+enum SFL_sample_tag {
+ SFLFLOW_SAMPLE = 1, /* enterprise = 0 : format = 1 */
+ SFLCOUNTERS_SAMPLE = 2, /* enterprise = 0 : format = 2 */
+ SFLFLOW_SAMPLE_EXPANDED = 3, /* enterprise = 0 : format = 3 */
+ SFLCOUNTERS_SAMPLE_EXPANDED = 4 /* enterprise = 0 : format = 4 */
+};
+
+/* Format of a single flow sample */
+
+typedef struct _SFLFlow_sample {
+ /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 1 */
+ /* u_int32_t length; */
+ u_int32_t sequence_number; /* Incremented with each flow sample
+ generated */
+ u_int32_t source_id; /* fsSourceId */
+ u_int32_t sampling_rate; /* fsPacketSamplingRate */
+ u_int32_t sample_pool; /* Total number of packets that could have been
+ sampled (i.e. packets skipped by sampling
+ process + total number of samples) */
+ u_int32_t drops; /* Number of times a packet was dropped due to
+ lack of resources */
+ u_int32_t input; /* SNMP ifIndex of input interface.
+ 0 if interface is not known. */
+ u_int32_t output; /* SNMP ifIndex of output interface,
+ 0 if interface is not known.
+ Set most significant bit to indicate
+ multiple destination interfaces
+ (i.e. in case of broadcast or multicast)
+ and set lower order bits to indicate
+ number of destination interfaces.
+ Examples:
+ 0x00000002 indicates ifIndex = 2
+ 0x00000000 ifIndex unknown.
+ 0x80000007 indicates a packet sent
+ to 7 interfaces.
+ 0x80000000 indicates a packet sent to
+ an unknown number of
+ interfaces greater than 1.*/
+ u_int32_t num_elements;
+ SFLFlow_sample_element *elements;
+} SFLFlow_sample;
+
+/* same thing, but the expanded version (for full 32-bit ifIndex numbers) */
+
+typedef struct _SFLFlow_sample_expanded {
+ /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 1 */
+ /* u_int32_t length; */
+ u_int32_t sequence_number; /* Incremented with each flow sample
+ generated */
+ u_int32_t ds_class; /* EXPANDED */
+ u_int32_t ds_index; /* EXPANDED */
+ u_int32_t sampling_rate; /* fsPacketSamplingRate */
+ u_int32_t sample_pool; /* Total number of packets that could have been
+ sampled (i.e. packets skipped by sampling
+ process + total number of samples) */
+ u_int32_t drops; /* Number of times a packet was dropped due to
+ lack of resources */
+ u_int32_t inputFormat; /* EXPANDED */
+ u_int32_t input; /* SNMP ifIndex of input interface.
+ 0 if interface is not known. */
+ u_int32_t outputFormat; /* EXPANDED */
+ u_int32_t output; /* SNMP ifIndex of output interface,
+ 0 if interface is not known. */
+ u_int32_t num_elements;
+ SFLFlow_sample_element *elements;
+} SFLFlow_sample_expanded;
+
+/* Counter types */
+
+/* Generic interface counters - see RFC 1573, 2233 */
+
+typedef struct _SFLIf_counters {
+ u_int32_t ifIndex;
+ u_int32_t ifType;
+ u_int64_t ifSpeed;
+ u_int32_t ifDirection; /* Derived from MAU MIB (RFC 2668)
+ 0 = unknown, 1 = full-duplex,
+ 2 = half-duplex, 3 = in, 4 = out */
+ u_int32_t ifStatus; /* bit field with the following bits assigned:
+ bit 0 = ifAdminStatus (0 = down, 1 = up)
+ bit 1 = ifOperStatus (0 = down, 1 = up) */
+ u_int64_t ifInOctets;
+ u_int32_t ifInUcastPkts;
+ u_int32_t ifInMulticastPkts;
+ u_int32_t ifInBroadcastPkts;
+ u_int32_t ifInDiscards;
+ u_int32_t ifInErrors;
+ u_int32_t ifInUnknownProtos;
+ u_int64_t ifOutOctets;
+ u_int32_t ifOutUcastPkts;
+ u_int32_t ifOutMulticastPkts;
+ u_int32_t ifOutBroadcastPkts;
+ u_int32_t ifOutDiscards;
+ u_int32_t ifOutErrors;
+ u_int32_t ifPromiscuousMode;
+} SFLIf_counters;
+
+/* Ethernet interface counters - see RFC 2358 */
+typedef struct _SFLEthernet_counters {
+ u_int32_t dot3StatsAlignmentErrors;
+ u_int32_t dot3StatsFCSErrors;
+ u_int32_t dot3StatsSingleCollisionFrames;
+ u_int32_t dot3StatsMultipleCollisionFrames;
+ u_int32_t dot3StatsSQETestErrors;
+ u_int32_t dot3StatsDeferredTransmissions;
+ u_int32_t dot3StatsLateCollisions;
+ u_int32_t dot3StatsExcessiveCollisions;
+ u_int32_t dot3StatsInternalMacTransmitErrors;
+ u_int32_t dot3StatsCarrierSenseErrors;
+ u_int32_t dot3StatsFrameTooLongs;
+ u_int32_t dot3StatsInternalMacReceiveErrors;
+ u_int32_t dot3StatsSymbolErrors;
+} SFLEthernet_counters;
+
+/* Token ring counters - see RFC 1748 */
+
+typedef struct _SFLTokenring_counters {
+ u_int32_t dot5StatsLineErrors;
+ u_int32_t dot5StatsBurstErrors;
+ u_int32_t dot5StatsACErrors;
+ u_int32_t dot5StatsAbortTransErrors;
+ u_int32_t dot5StatsInternalErrors;
+ u_int32_t dot5StatsLostFrameErrors;
+ u_int32_t dot5StatsReceiveCongestions;
+ u_int32_t dot5StatsFrameCopiedErrors;
+ u_int32_t dot5StatsTokenErrors;
+ u_int32_t dot5StatsSoftErrors;
+ u_int32_t dot5StatsHardErrors;
+ u_int32_t dot5StatsSignalLoss;
+ u_int32_t dot5StatsTransmitBeacons;
+ u_int32_t dot5StatsRecoverys;
+ u_int32_t dot5StatsLobeWires;
+ u_int32_t dot5StatsRemoves;
+ u_int32_t dot5StatsSingles;
+ u_int32_t dot5StatsFreqErrors;
+} SFLTokenring_counters;
+
+/* 100 BaseVG interface counters - see RFC 2020 */
+
+typedef struct _SFLVg_counters {
+ u_int32_t dot12InHighPriorityFrames;
+ u_int64_t dot12InHighPriorityOctets;
+ u_int32_t dot12InNormPriorityFrames;
+ u_int64_t dot12InNormPriorityOctets;
+ u_int32_t dot12InIPMErrors;
+ u_int32_t dot12InOversizeFrameErrors;
+ u_int32_t dot12InDataErrors;
+ u_int32_t dot12InNullAddressedFrames;
+ u_int32_t dot12OutHighPriorityFrames;
+ u_int64_t dot12OutHighPriorityOctets;
+ u_int32_t dot12TransitionIntoTrainings;
+ u_int64_t dot12HCInHighPriorityOctets;
+ u_int64_t dot12HCInNormPriorityOctets;
+ u_int64_t dot12HCOutHighPriorityOctets;
+} SFLVg_counters;
+
+typedef struct _SFLVlan_counters {
+ u_int32_t vlan_id;
+ u_int64_t octets;
+ u_int32_t ucastPkts;
+ u_int32_t multicastPkts;
+ u_int32_t broadcastPkts;
+ u_int32_t discards;
+} SFLVlan_counters;
+
+/* Counters data */
+
+enum SFLCounters_type_tag {
+ /* enterprise = 0, format = ... */
+ SFLCOUNTERS_GENERIC = 1,
+ SFLCOUNTERS_ETHERNET = 2,
+ SFLCOUNTERS_TOKENRING = 3,
+ SFLCOUNTERS_VG = 4,
+ SFLCOUNTERS_VLAN = 5
+};
+
+typedef union _SFLCounters_type {
+ SFLIf_counters generic;
+ SFLEthernet_counters ethernet;
+ SFLTokenring_counters tokenring;
+ SFLVg_counters vg;
+ SFLVlan_counters vlan;
+} SFLCounters_type;
+
+typedef struct _SFLCounters_sample_element {
+ struct _SFLCounters_sample_element *nxt; /* linked list */
+ u_int32_t tag; /* SFLCounters_type_tag */
+ u_int32_t length;
+ SFLCounters_type counterBlock;
+} SFLCounters_sample_element;
+
+typedef struct _SFLCounters_sample {
+ /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 2 */
+ /* u_int32_t length; */
+ u_int32_t sequence_number; /* Incremented with each counters sample
+ generated by this source_id */
+ u_int32_t source_id; /* fsSourceId */
+ u_int32_t num_elements;
+ SFLCounters_sample_element *elements;
+} SFLCounters_sample;
+
+/* same thing, but the expanded version, so ds_index can be a full 32 bits */
+typedef struct _SFLCounters_sample_expanded {
+ /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 2 */
+ /* u_int32_t length; */
+ u_int32_t sequence_number; /* Incremented with each counters sample
+ generated by this source_id */
+ u_int32_t ds_class; /* EXPANDED */
+ u_int32_t ds_index; /* EXPANDED */
+ u_int32_t num_elements;
+ SFLCounters_sample_element *elements;
+} SFLCounters_sample_expanded;
+
+#define SFLADD_ELEMENT(_sm, _el) do { (_el)->nxt = (_sm)->elements; (_sm)->elements = (_el); } while(0)
+
+/* Format of a sample datagram */
+
+enum SFLDatagram_version {
+ SFLDATAGRAM_VERSION2 = 2,
+ SFLDATAGRAM_VERSION4 = 4,
+ SFLDATAGRAM_VERSION5 = 5
+};
+
+typedef struct _SFLSample_datagram_hdr {
+ u_int32_t datagram_version; /* (enum SFLDatagram_version) = VERSION5 = 5 */
+ SFLAddress agent_address; /* IP address of sampling agent */
+ u_int32_t sub_agent_id; /* Used to distinguishing between datagram
+ streams from separate agent sub entities
+ within an device. */
+ u_int32_t sequence_number; /* Incremented with each sample datagram
+ generated */
+ u_int32_t uptime; /* Current time (in milliseconds since device
+ last booted). Should be set as close to
+ datagram transmission time as possible.*/
+ u_int32_t num_records; /* Number of tag-len-val flow/counter records to follow */
+} SFLSample_datagram_hdr;
+
+#define SFL_MAX_DATAGRAM_SIZE 1500
+#define SFL_MIN_DATAGRAM_SIZE 200
+#define SFL_DEFAULT_DATAGRAM_SIZE 1400
+
+#define SFL_DATA_PAD 400
+
+#endif /* SFLOW_H */
diff --git a/lib/sflow_agent.c b/lib/sflow_agent.c
new file mode 100644
index 00000000..4b25c25a
--- /dev/null
+++ b/lib/sflow_agent.c
@@ -0,0 +1,492 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#include "sflow_api.h"
+
+static void * sflAlloc(SFLAgent *agent, size_t bytes);
+static void sflFree(SFLAgent *agent, void *obj);
+static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler);
+static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler);
+
+/*________________--------------------------__________________
+ ________________ sfl_agent_init __________________
+ ----------------__________________________------------------
+*/
+
+void sfl_agent_init(SFLAgent *agent,
+ SFLAddress *myIP, /* IP address of this agent in net byte order */
+ u_int32_t subId, /* agent_sub_id */
+ time_t bootTime, /* agent boot time */
+ time_t now, /* time now */
+ void *magic, /* ptr to pass back in logging and alloc fns */
+ allocFn_t allocFn,
+ freeFn_t freeFn,
+ errorFn_t errorFn,
+ sendFn_t sendFn)
+{
+ /* first clear everything */
+ memset(agent, 0, sizeof(*agent));
+ /* now copy in the parameters */
+ agent->myIP = *myIP; /* structure copy */
+ agent->subId = subId;
+ agent->bootTime = bootTime;
+ agent->now = now;
+ agent->magic = magic;
+ agent->allocFn = allocFn;
+ agent->freeFn = freeFn;
+ agent->errorFn = errorFn;
+ agent->sendFn = sendFn;
+
+#ifdef SFLOW_DO_SOCKET
+ if(sendFn == NULL) {
+ /* open the socket - really need one for v4 and another for v6? */
+ if((agent->receiverSocket4 = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+ sfl_agent_sysError(agent, "agent", "IPv4 socket open failed");
+ if((agent->receiverSocket6 = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) == -1)
+ sfl_agent_sysError(agent, "agent", "IPv6 socket open failed");
+ }
+#endif
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_release __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_agent_release(SFLAgent *agent)
+{
+ /* release and free the samplers, pollers and receivers */
+ SFLSampler *sm = agent->samplers;
+ SFLPoller *pl = agent->pollers;
+ SFLReceiver *rcv = agent->receivers;
+
+ for(; sm != NULL; ) {
+ SFLSampler *nextSm = sm->nxt;
+ sflFree(agent, sm);
+ sm = nextSm;
+ }
+ agent->samplers = NULL;
+
+ for(; pl != NULL; ) {
+ SFLPoller *nextPl = pl->nxt;
+ sflFree(agent, pl);
+ pl = nextPl;
+ }
+ agent->pollers = NULL;
+
+ for(; rcv != NULL; ) {
+ SFLReceiver *nextRcv = rcv->nxt;
+ sflFree(agent, rcv);
+ rcv = nextRcv;
+ }
+ agent->receivers = NULL;
+
+#ifdef SFLOW_DO_SOCKET
+ /* close the sockets */
+ if(agent->receiverSocket4 > 0) close(agent->receiverSocket4);
+ if(agent->receiverSocket6 > 0) close(agent->receiverSocket6);
+#endif
+}
+
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_set_* __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_agent_set_agentAddress(SFLAgent *agent, SFLAddress *addr)
+{
+ if(addr && memcmp(addr, &agent->myIP, sizeof(agent->myIP)) != 0) {
+ /* change of address */
+ agent->myIP = *addr; /* structure copy */
+ /* reset sequence numbers here? */
+ }
+}
+
+void sfl_agent_set_agentSubId(SFLAgent *agent, u_int32_t subId)
+{
+ if(subId != agent->subId) {
+ /* change of subId */
+ agent->subId = subId;
+ /* reset sequence numbers here? */
+ }
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_tick __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_agent_tick(SFLAgent *agent, time_t now)
+{
+ SFLReceiver *rcv = agent->receivers;
+ SFLSampler *sm = agent->samplers;
+ SFLPoller *pl = agent->pollers;
+ agent->now = now;
+ /* receivers use ticks to flush send data */
+ for(; rcv != NULL; rcv = rcv->nxt) sfl_receiver_tick(rcv, now);
+ /* samplers use ticks to decide when they are sampling too fast */
+ for(; sm != NULL; sm = sm->nxt) sfl_sampler_tick(sm, now);
+ /* pollers use ticks to decide when to ask for counters */
+ for(; pl != NULL; pl = pl->nxt) sfl_poller_tick(pl, now);
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_addReceiver __________________
+ -----------------___________________________------------------
+*/
+
+SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent)
+{
+ SFLReceiver *rcv = (SFLReceiver *)sflAlloc(agent, sizeof(SFLReceiver));
+ sfl_receiver_init(rcv, agent);
+ /* add to end of list - to preserve the receiver index numbers for existing receivers */
+ {
+ SFLReceiver *r, *prev = NULL;
+ for(r = agent->receivers; r != NULL; prev = r, r = r->nxt);
+ if(prev) prev->nxt = rcv;
+ else agent->receivers = rcv;
+ rcv->nxt = NULL;
+ }
+ return rcv;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_dsi_compare __________________
+ -----------------___________________________------------------
+
+ Note that if there is a mixture of ds_classes for this agent, then
+ the simple numeric comparison may not be correct - the sort order (for
+ the purposes of the SNMP MIB) should really be determined by the OID
+ that these numeric ds_class numbers are a shorthand for. For example,
+ ds_class == 0 means ifIndex, which is the oid "1.3.6.1.2.1.2.2.1"
+*/
+
+static inline int sfl_dsi_compare(SFLDataSource_instance *pdsi1, SFLDataSource_instance *pdsi2) {
+ /* could have used just memcmp(), but not sure if that would
+ give the right answer on little-endian platforms. Safer to be explicit... */
+ int cmp = pdsi2->ds_class - pdsi1->ds_class;
+ if(cmp == 0) cmp = pdsi2->ds_index - pdsi1->ds_index;
+ if(cmp == 0) cmp = pdsi2->ds_instance - pdsi1->ds_instance;
+ return cmp;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_addSampler __________________
+ -----------------___________________________------------------
+*/
+
+SFLSampler *sfl_agent_addSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* Keep the list sorted. */
+ SFLSampler *prev = NULL, *sm = agent->samplers;
+ for(; sm != NULL; prev = sm, sm = sm->nxt) {
+ int64_t cmp = sfl_dsi_compare(pdsi, &sm->dsi);
+ if(cmp == 0) return sm; /* found - return existing one */
+ if(cmp < 0) break; /* insert here */
+ }
+ /* either we found the insert point, or reached the end of the list...*/
+
+ {
+ SFLSampler *newsm = (SFLSampler *)sflAlloc(agent, sizeof(SFLSampler));
+ sfl_sampler_init(newsm, agent, pdsi);
+ if(prev) prev->nxt = newsm;
+ else agent->samplers = newsm;
+ newsm->nxt = sm;
+
+ /* see if we should go in the ifIndex jumpTable */
+ if(SFL_DS_CLASS(newsm->dsi) == 0) {
+ SFLSampler *test = sfl_agent_getSamplerByIfIndex(agent, SFL_DS_INDEX(newsm->dsi));
+ if(test && (SFL_DS_INSTANCE(newsm->dsi) < SFL_DS_INSTANCE(test->dsi))) {
+ /* replace with this new one because it has a lower ds_instance number */
+ sfl_agent_jumpTableRemove(agent, test);
+ test = NULL;
+ }
+ if(test == NULL) sfl_agent_jumpTableAdd(agent, newsm);
+ }
+ return newsm;
+ }
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_addPoller __________________
+ -----------------___________________________------------------
+*/
+
+SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
+ SFLDataSource_instance *pdsi,
+ void *magic, /* ptr to pass back in getCountersFn() */
+ getCountersFn_t getCountersFn)
+{
+ /* keep the list sorted */
+ SFLPoller *prev = NULL, *pl = agent->pollers;
+ for(; pl != NULL; prev = pl, pl = pl->nxt) {
+ int64_t cmp = sfl_dsi_compare(pdsi, &pl->dsi);
+ if(cmp == 0) return pl; /* found - return existing one */
+ if(cmp < 0) break; /* insert here */
+ }
+ /* either we found the insert point, or reached the end of the list... */
+ {
+ SFLPoller *newpl = (SFLPoller *)sflAlloc(agent, sizeof(SFLPoller));
+ sfl_poller_init(newpl, agent, pdsi, magic, getCountersFn);
+ if(prev) prev->nxt = newpl;
+ else agent->pollers = newpl;
+ newpl->nxt = pl;
+ return newpl;
+ }
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_removeSampler __________________
+ -----------------___________________________------------------
+*/
+
+int sfl_agent_removeSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* find it, unlink it and free it */
+ SFLSampler *prev = NULL, *sm = agent->samplers;
+ for(; sm != NULL; prev = sm, sm = sm->nxt) {
+ if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) {
+ if(prev == NULL) agent->samplers = sm->nxt;
+ else prev->nxt = sm->nxt;
+ sfl_agent_jumpTableRemove(agent, sm);
+ sflFree(agent, sm);
+ return 1;
+ }
+ }
+ /* not found */
+ return 0;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_removePoller __________________
+ -----------------___________________________------------------
+*/
+
+int sfl_agent_removePoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* find it, unlink it and free it */
+ SFLPoller *prev = NULL, *pl = agent->pollers;
+ for(; pl != NULL; prev = pl, pl = pl->nxt) {
+ if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) {
+ if(prev == NULL) agent->pollers = pl->nxt;
+ else prev->nxt = pl->nxt;
+ sflFree(agent, pl);
+ return 1;
+ }
+ }
+ /* not found */
+ return 0;
+}
+
+/*_________________--------------------------------__________________
+ _________________ sfl_agent_jumpTableAdd __________________
+ -----------------________________________________------------------
+*/
+
+static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler)
+{
+ u_int32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
+ sampler->hash_nxt = agent->jumpTable[hashIndex];
+ agent->jumpTable[hashIndex] = sampler;
+}
+
+/*_________________--------------------------------__________________
+ _________________ sfl_agent_jumpTableRemove __________________
+ -----------------________________________________------------------
+*/
+
+static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler)
+{
+ u_int32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ;
+ SFLSampler *search = agent->jumpTable[hashIndex], *prev = NULL;
+ for( ; search != NULL; prev = search, search = search->hash_nxt) if(search == sampler) break;
+ if(search) {
+ // found - unlink
+ if(prev) prev->hash_nxt = search->hash_nxt;
+ else agent->jumpTable[hashIndex] = search->hash_nxt;
+ search->hash_nxt = NULL;
+ }
+}
+
+/*_________________--------------------------------__________________
+ _________________ sfl_agent_getSamplerByIfIndex __________________
+ -----------------________________________________------------------
+ fast lookup (pointers cached in hash table). If there are multiple
+ sampler instances for a given ifIndex, then this fn will return
+ the one with the lowest instance number. Since the samplers
+ list is sorted, this means the other instances will be accesible
+ by following the sampler->nxt pointer (until the ds_class
+ or ds_index changes). This is helpful if you need to offer
+ the same flowSample to multiple samplers.
+*/
+
+SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, u_int32_t ifIndex)
+{
+ SFLSampler *search = agent->jumpTable[ifIndex % SFL_HASHTABLE_SIZ];
+ for( ; search != NULL; search = search->hash_nxt) if(SFL_DS_INDEX(search->dsi) == ifIndex) break;
+ return search;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_getSampler __________________
+ -----------------___________________________------------------
+*/
+
+SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* find it and return it */
+ SFLSampler *sm = agent->samplers;
+ for(; sm != NULL; sm = sm->nxt)
+ if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) return sm;
+ /* not found */
+ return NULL;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_getPoller __________________
+ -----------------___________________________------------------
+*/
+
+SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* find it and return it */
+ SFLPoller *pl = agent->pollers;
+ for(; pl != NULL; pl = pl->nxt)
+ if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) return pl;
+ /* not found */
+ return NULL;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_getReceiver __________________
+ -----------------___________________________------------------
+*/
+
+SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, u_int32_t receiverIndex)
+{
+ u_int32_t rcvIdx = 0;
+ SFLReceiver *rcv = agent->receivers;
+ for(; rcv != NULL; rcv = rcv->nxt)
+ if(receiverIndex == ++rcvIdx) return rcv;
+
+ /* not found - ran off the end of the table */
+ return NULL;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_getNextSampler __________________
+ -----------------___________________________------------------
+*/
+
+SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* return the one lexograpically just after it - assume they are sorted
+ correctly according to the lexographical ordering of the object ids */
+ SFLSampler *sm = sfl_agent_getSampler(agent, pdsi);
+ return sm ? sm->nxt : NULL;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_getNextPoller __________________
+ -----------------___________________________------------------
+*/
+
+SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* return the one lexograpically just after it - assume they are sorted
+ correctly according to the lexographical ordering of the object ids */
+ SFLPoller *pl = sfl_agent_getPoller(agent, pdsi);
+ return pl ? pl->nxt : NULL;
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_getNextReceiver __________________
+ -----------------___________________________------------------
+*/
+
+SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, u_int32_t receiverIndex)
+{
+ return sfl_agent_getReceiver(agent, receiverIndex + 1);
+}
+
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_resetReceiver __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver)
+{
+ /* tell samplers and pollers to stop sending to this receiver */
+ /* first get his receiverIndex */
+ u_int32_t rcvIdx = 0;
+ SFLReceiver *rcv = agent->receivers;
+ for(; rcv != NULL; rcv = rcv->nxt) {
+ rcvIdx++; /* thanks to Diego Valverde for pointing out this bugfix */
+ if(rcv == receiver) {
+ /* now tell anyone that is using it to stop */
+ SFLSampler *sm = agent->samplers;
+ SFLPoller *pl = agent->pollers;
+
+ for(; sm != NULL; sm = sm->nxt)
+ if(sfl_sampler_get_sFlowFsReceiver(sm) == rcvIdx) sfl_sampler_set_sFlowFsReceiver(sm, 0);
+
+ for(; pl != NULL; pl = pl->nxt)
+ if(sfl_poller_get_sFlowCpReceiver(pl) == rcvIdx) sfl_poller_set_sFlowCpReceiver(pl, 0);
+
+ break;
+ }
+ }
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_error __________________
+ -----------------___________________________------------------
+*/
+#define MAX_ERRMSG_LEN 1000
+
+void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
+{
+ char errm[MAX_ERRMSG_LEN];
+ sprintf(errm, "sfl_agent_error: %s: %s\n", modName, msg);
+ if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
+ else {
+ fprintf(stderr, "%s\n", errm);
+ fflush(stderr);
+ }
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_agent_sysError __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg)
+{
+ char errm[MAX_ERRMSG_LEN];
+ sprintf(errm, "sfl_agent_sysError: %s: %s (errno = %d - %s)\n", modName, msg, errno, strerror(errno));
+ if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
+ else {
+ fprintf(stderr, "%s\n", errm);
+ fflush(stderr);
+ }
+}
+
+
+/*_________________---------------------------__________________
+ _________________ alloc and free __________________
+ -----------------___________________________------------------
+*/
+
+static void * sflAlloc(SFLAgent *agent, size_t bytes)
+{
+ if(agent->allocFn) return (*agent->allocFn)(agent->magic, agent, bytes);
+ else return SFL_ALLOC(bytes);
+}
+
+static void sflFree(SFLAgent *agent, void *obj)
+{
+ if(agent->freeFn) (*agent->freeFn)(agent->magic, agent, obj);
+ else SFL_FREE(obj);
+}
diff --git a/lib/sflow_api.h b/lib/sflow_api.h
new file mode 100644
index 00000000..be8d9977
--- /dev/null
+++ b/lib/sflow_api.h
@@ -0,0 +1,340 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#ifndef SFLOW_API_H
+#define SFLOW_API_H 1
+
+/* define SFLOW_DO_SOCKET to 1 if you want the agent
+ to send the packets itself, otherwise set the sendFn
+ callback in sfl_agent_init.*/
+/* #define SFLOW_DO_SOCKET */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <arpa/inet.h> /* for htonl */
+
+#ifdef SFLOW_DO_SOCKET
+#include <sys/socket.h>
+#include <netinet/in_systm.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#endif
+
+#include "sflow.h"
+
+/* define SFLOW_SOFTWARE_SAMPLING to 1 if you need to use the
+ sfl_sampler_takeSample routine and give it every packet */
+/* #define SFLOW_SOFTWARE_SAMPLING */
+
+/*
+ uncomment this preprocessor flag (or compile with -DSFL_USE_32BIT_INDEX)
+ if your ds_index numbers can ever be >= 2^30-1 (i.e. >= 0x3FFFFFFF)
+*/
+/* #define SFL_USE_32BIT_INDEX */
+
+
+/* Used to combine ds_class, ds_index and instance into
+ a single 64-bit number like this:
+ __________________________________
+ | cls| index | instance |
+ ----------------------------------
+
+ but now is opened up to a 12-byte struct to ensure
+ that ds_index has a full 32-bit field, and to make
+ accessing the components simpler. The macros have
+ the same behavior as before, so this change should
+ be transparent. The only difference is that these
+ objects are now passed around by reference instead
+ of by value, and the comparison is done using a fn.
+*/
+
+typedef struct _SFLDataSource_instance {
+ u_int32_t ds_class;
+ u_int32_t ds_index;
+ u_int32_t ds_instance;
+} SFLDataSource_instance;
+
+#ifdef SFL_USE_32BIT_INDEX
+#define SFL_FLOW_SAMPLE_TYPE SFLFlow_sample_expanded
+#define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample_expanded
+#else
+#define SFL_FLOW_SAMPLE_TYPE SFLFlow_sample
+#define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample
+/* if index numbers are not going to use all 32 bits, then we can use
+ the more compact encoding, with the dataSource class and index merged */
+#define SFL_DS_DATASOURCE(dsi) (((dsi).ds_class << 24) + (dsi).ds_index)
+#endif
+
+#define SFL_DS_INSTANCE(dsi) (dsi).ds_instance
+#define SFL_DS_CLASS(dsi) (dsi).ds_class
+#define SFL_DS_INDEX(dsi) (dsi).ds_index
+#define SFL_DS_SET(dsi,clss,indx,inst) \
+ do { \
+ (dsi).ds_class = (clss); \
+ (dsi).ds_index = (indx); \
+ (dsi).ds_instance = (inst); \
+ } while(0)
+
+typedef struct _SFLSampleCollector {
+ u_int32_t data[(SFL_MAX_DATAGRAM_SIZE + SFL_DATA_PAD) / sizeof(u_int32_t)];
+ u_int32_t *datap; /* packet fill pointer */
+ u_int32_t pktlen; /* accumulated size */
+ u_int32_t packetSeqNo;
+ u_int32_t numSamples;
+} SFLSampleCollector;
+
+struct _SFLAgent; /* forward decl */
+
+typedef struct _SFLReceiver {
+ struct _SFLReceiver *nxt;
+ /* MIB fields */
+ char *sFlowRcvrOwner;
+ time_t sFlowRcvrTimeout;
+ u_int32_t sFlowRcvrMaximumDatagramSize;
+ SFLAddress sFlowRcvrAddress;
+ u_int32_t sFlowRcvrPort;
+ u_int32_t sFlowRcvrDatagramVersion;
+ /* public fields */
+ struct _SFLAgent *agent; /* pointer to my agent */
+ /* private fields */
+ SFLSampleCollector sampleCollector;
+#ifdef SFLOW_DO_SOCKET
+ struct sockaddr_in receiver4;
+ struct sockaddr_in6 receiver6;
+#endif
+} SFLReceiver;
+
+typedef struct _SFLSampler {
+ /* for linked list */
+ struct _SFLSampler *nxt;
+ /* for hash lookup table */
+ struct _SFLSampler *hash_nxt;
+ /* MIB fields */
+ SFLDataSource_instance dsi;
+ u_int32_t sFlowFsReceiver;
+ u_int32_t sFlowFsPacketSamplingRate;
+ u_int32_t sFlowFsMaximumHeaderSize;
+ /* public fields */
+ struct _SFLAgent *agent; /* pointer to my agent */
+ /* private fields */
+ SFLReceiver *myReceiver;
+ u_int32_t skip;
+ u_int32_t samplePool;
+ u_int32_t flowSampleSeqNo;
+ /* rate checking */
+ u_int32_t samplesThisTick;
+ u_int32_t samplesLastTick;
+ u_int32_t backoffThreshold;
+} SFLSampler;
+
+/* declare */
+struct _SFLPoller;
+
+typedef void (*getCountersFn_t)(void *magic, /* callback to get counters */
+ struct _SFLPoller *sampler, /* called with self */
+ SFL_COUNTERS_SAMPLE_TYPE *cs); /* struct to fill in */
+
+typedef struct _SFLPoller {
+ /* for linked list */
+ struct _SFLPoller *nxt;
+ /* MIB fields */
+ SFLDataSource_instance dsi;
+ u_int32_t sFlowCpReceiver;
+ time_t sFlowCpInterval;
+ /* public fields */
+ struct _SFLAgent *agent; /* pointer to my agent */
+ void *magic; /* ptr to pass back in getCountersFn() */
+ getCountersFn_t getCountersFn;
+ u_int32_t bridgePort; /* port number local to bridge */
+ /* private fields */
+ SFLReceiver *myReceiver;
+ time_t countersCountdown;
+ u_int32_t countersSampleSeqNo;
+} SFLPoller;
+
+typedef void *(*allocFn_t)(void *magic, /* callback to allocate space on heap */
+ struct _SFLAgent *agent, /* called with self */
+ size_t bytes); /* bytes requested */
+
+typedef int (*freeFn_t)(void *magic, /* callback to free space on heap */
+ struct _SFLAgent *agent, /* called with self */
+ void *obj); /* obj to free */
+
+typedef void (*errorFn_t)(void *magic, /* callback to log error message */
+ struct _SFLAgent *agent, /* called with self */
+ char *msg); /* error message */
+
+typedef void (*sendFn_t)(void *magic, /* optional override fn to send packet */
+ struct _SFLAgent *agent,
+ SFLReceiver *receiver,
+ u_char *pkt,
+ u_int32_t pktLen);
+
+
+/* prime numbers are good for hash tables */
+#define SFL_HASHTABLE_SIZ 199
+
+typedef struct _SFLAgent {
+ SFLSampler *jumpTable[SFL_HASHTABLE_SIZ]; /* fast lookup table for samplers (by ifIndex) */
+ SFLSampler *samplers; /* the list of samplers */
+ SFLPoller *pollers; /* the list of samplers */
+ SFLReceiver *receivers; /* the array of receivers */
+ time_t bootTime; /* time when we booted or started */
+ time_t now; /* time now */
+ SFLAddress myIP; /* IP address of this node */
+ u_int32_t subId; /* sub_agent_id */
+ void *magic; /* ptr to pass back in logging and alloc fns */
+ allocFn_t allocFn;
+ freeFn_t freeFn;
+ errorFn_t errorFn;
+ sendFn_t sendFn;
+#ifdef SFLOW_DO_SOCKET
+ int receiverSocket4;
+ int receiverSocket6;
+#endif
+} SFLAgent;
+
+/* call this at the start with a newly created agent */
+void sfl_agent_init(SFLAgent *agent,
+ SFLAddress *myIP, /* IP address of this agent */
+ u_int32_t subId, /* agent_sub_id */
+ time_t bootTime, /* agent boot time */
+ time_t now, /* time now */
+ void *magic, /* ptr to pass back in logging and alloc fns */
+ allocFn_t allocFn,
+ freeFn_t freeFn,
+ errorFn_t errorFn,
+ sendFn_t sendFn);
+
+/* call this to create samplers */
+SFLSampler *sfl_agent_addSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
+
+/* call this to create pollers */
+SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
+ SFLDataSource_instance *pdsi,
+ void *magic, /* ptr to pass back in getCountersFn() */
+ getCountersFn_t getCountersFn);
+
+/* call this to create receivers */
+SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent);
+
+/* call this to remove samplers */
+int sfl_agent_removeSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
+
+/* call this to remove pollers */
+int sfl_agent_removePoller(SFLAgent *agent, SFLDataSource_instance *pdsi);
+
+/* note: receivers should not be removed. Typically the receivers
+ list will be created at init time and never changed */
+
+/* call these fns to retrieve sampler, poller or receiver (e.g. for SNMP GET or GETNEXT operation) */
+SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
+SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi);
+SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi);
+SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi);
+SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, u_int32_t receiverIndex);
+SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, u_int32_t receiverIndex);
+
+/* jump table access - for performance */
+SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, u_int32_t ifIndex);
+
+/* call these functions to GET and SET MIB values */
+
+/* receiver */
+char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver);
+void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner);
+time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver);
+void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout);
+u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver);
+void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize);
+SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver);
+void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress);
+u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver);
+void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort);
+/* sampler */
+u_int32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler);
+void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, u_int32_t sFlowFsReceiver);
+u_int32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler);
+void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, u_int32_t sFlowFsPacketSamplingRate);
+u_int32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler);
+void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, u_int32_t sFlowFsMaximumHeaderSize);
+u_int32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler);
+/* poller */
+u_int32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller);
+void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, u_int32_t sFlowCpReceiver);
+u_int32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller);
+void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, u_int32_t sFlowCpInterval);
+
+/* fns to set the sflow agent address or sub-id */
+void sfl_agent_set_agentAddress(SFLAgent *agent, SFLAddress *addr);
+void sfl_agent_set_agentSubId(SFLAgent *agent, u_int32_t subId);
+
+/* The poller may need a separate number to reference the local bridge port
+ to get counters if it is not the same as the global ifIndex */
+void sfl_poller_set_bridgePort(SFLPoller *poller, u_int32_t port_no);
+u_int32_t sfl_poller_get_bridgePort(SFLPoller *poller);
+
+/* call this to indicate a discontinuity with a counter like samplePool so that the
+ sflow collector will ignore the next delta */
+void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler);
+
+/* call this to indicate a discontinuity with one or more of the counters so that the
+ sflow collector will ignore the next delta */
+void sfl_poller_resetCountersSeqNo(SFLPoller *poller);
+
+#ifdef SFLOW_SOFTWARE_SAMLING
+/* software sampling: call this with every packet - returns non-zero if the packet
+ should be sampled (in which case you then call sfl_sampler_writeFlowSample()) */
+int sfl_sampler_takeSample(SFLSampler *sampler);
+#endif
+
+/* call this to set a maximum samples-per-second threshold. If the sampler reaches this
+ threshold it will automatically back off the sampling rate. A value of 0 disables the
+ mechanism */
+void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, u_int32_t samplesPerSecond);
+u_int32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler);
+
+/* call this once per second (N.B. not on interrupt stack i.e. not hard real-time) */
+void sfl_agent_tick(SFLAgent *agent, time_t now);
+
+/* call this with each flow sample */
+void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs);
+
+/* call this to push counters samples (usually done in the getCountersFn callback) */
+void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs);
+
+/* call this to deallocate resources */
+void sfl_agent_release(SFLAgent *agent);
+
+
+/* internal fns */
+
+void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent);
+void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi);
+void sfl_poller_init(SFLPoller *poller, SFLAgent *agent, SFLDataSource_instance *pdsi, void *magic, getCountersFn_t getCountersFn);
+
+
+void sfl_receiver_tick(SFLReceiver *receiver, time_t now);
+void sfl_poller_tick(SFLPoller *poller, time_t now);
+void sfl_sampler_tick(SFLSampler *sampler, time_t now);
+
+int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs);
+int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs);
+
+void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver);
+
+void sfl_agent_error(SFLAgent *agent, char *modName, char *msg);
+void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg);
+
+u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver);
+
+#define SFL_ALLOC malloc
+#define SFL_FREE free
+
+#endif /* SFLOW_API_H */
+
+
diff --git a/lib/sflow_poller.c b/lib/sflow_poller.c
new file mode 100644
index 00000000..ffd09d3c
--- /dev/null
+++ b/lib/sflow_poller.c
@@ -0,0 +1,142 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#include "sflow_api.h"
+
+/*_________________--------------------------__________________
+ _________________ sfl_poller_init __________________
+ -----------------__________________________------------------
+*/
+
+void sfl_poller_init(SFLPoller *poller,
+ SFLAgent *agent,
+ SFLDataSource_instance *pdsi,
+ void *magic, /* ptr to pass back in getCountersFn() */
+ getCountersFn_t getCountersFn)
+{
+ /* copy the dsi in case it points to poller->dsi, which we are about to clear */
+ SFLDataSource_instance dsi = *pdsi;
+
+ /* preserve the *nxt pointer too, in case we are resetting this poller and it is
+ already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */
+ SFLPoller *nxtPtr = poller->nxt;
+
+ /* clear everything */
+ memset(poller, 0, sizeof(*poller));
+
+ /* restore the linked list ptr */
+ poller->nxt = nxtPtr;
+
+ /* now copy in the parameters */
+ poller->agent = agent;
+ poller->dsi = dsi; /* structure copy */
+ poller->magic = magic;
+ poller->getCountersFn = getCountersFn;
+}
+
+/*_________________--------------------------__________________
+ _________________ reset __________________
+ -----------------__________________________------------------
+*/
+
+static void reset(SFLPoller *poller)
+{
+ SFLDataSource_instance dsi = poller->dsi;
+ sfl_poller_init(poller, poller->agent, &dsi, poller->magic, poller->getCountersFn);
+}
+
+/*_________________---------------------------__________________
+ _________________ MIB access __________________
+ -----------------___________________________------------------
+*/
+u_int32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller) {
+ return poller->sFlowCpReceiver;
+}
+
+void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, u_int32_t sFlowCpReceiver) {
+ poller->sFlowCpReceiver = sFlowCpReceiver;
+ if(sFlowCpReceiver == 0) reset(poller);
+ else {
+ /* retrieve and cache a direct pointer to my receiver */
+ poller->myReceiver = sfl_agent_getReceiver(poller->agent, poller->sFlowCpReceiver);
+ }
+}
+
+u_int32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller) {
+ return poller->sFlowCpInterval;
+}
+
+void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, u_int32_t sFlowCpInterval) {
+ poller->sFlowCpInterval = sFlowCpInterval;
+ /* Set the countersCountdown to be a randomly selected value between 1 and
+ sFlowCpInterval. That way the counter polling would be desynchronised
+ (on a 200-port switch, polling all the counters in one second could be harmful). */
+ poller->countersCountdown = 1 + (random() % sFlowCpInterval);
+}
+
+/*_________________---------------------------------__________________
+ _________________ bridge port __________________
+ -----------------_________________________________------------------
+ May need a separate number to reference the local bridge port
+ to get counters if it is not the same as the global ifIndex.
+*/
+
+void sfl_poller_set_bridgePort(SFLPoller *poller, u_int32_t port_no) {
+ poller->bridgePort = port_no;
+}
+
+u_int32_t sfl_poller_get_bridgePort(SFLPoller *poller) {
+ return poller->bridgePort;
+}
+
+/*_________________---------------------------------__________________
+ _________________ sequence number reset __________________
+ -----------------_________________________________------------------
+ Used to indicate a counter discontinuity
+ so that the sflow collector will know to ignore the next delta.
+*/
+void sfl_poller_resetCountersSeqNo(SFLPoller *poller) { poller->countersSampleSeqNo = 0; }
+
+/*_________________---------------------------__________________
+ _________________ sfl_poller_tick __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_poller_tick(SFLPoller *poller, time_t now)
+{
+ if(poller->countersCountdown == 0) return; /* counters retrieval was not enabled */
+ if(poller->sFlowCpReceiver == 0) return;
+
+ if(--poller->countersCountdown == 0) {
+ if(poller->getCountersFn != NULL) {
+ /* call out for counters */
+ SFL_COUNTERS_SAMPLE_TYPE cs;
+ memset(&cs, 0, sizeof(cs));
+ poller->getCountersFn(poller->magic, poller, &cs);
+ /* this countersFn is expected to fill in some counter block elements
+ and then call sfl_poller_writeCountersSample(poller, &cs); */
+ }
+ /* reset the countdown */
+ poller->countersCountdown = poller->sFlowCpInterval;
+ }
+}
+
+/*_________________---------------------------------__________________
+ _________________ sfl_poller_writeCountersSample __________________
+ -----------------_________________________________------------------
+*/
+
+void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+ /* fill in the rest of the header fields, and send to the receiver */
+ cs->sequence_number = ++poller->countersSampleSeqNo;
+#ifdef SFL_USE_32BIT_INDEX
+ cs->ds_class = SFL_DS_CLASS(poller->dsi);
+ cs->ds_index = SFL_DS_INDEX(poller->dsi);
+#else
+ cs->source_id = SFL_DS_DATASOURCE(poller->dsi);
+#endif
+ /* sent to my receiver */
+ if(poller->myReceiver) sfl_receiver_writeCountersSample(poller->myReceiver, cs);
+}
+
diff --git a/lib/sflow_receiver.c b/lib/sflow_receiver.c
new file mode 100644
index 00000000..7fccab30
--- /dev/null
+++ b/lib/sflow_receiver.c
@@ -0,0 +1,832 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#include <assert.h>
+#include "sflow_api.h"
+
+static void resetSampleCollector(SFLReceiver *receiver);
+static void sendSample(SFLReceiver *receiver);
+static void sflError(SFLReceiver *receiver, char *errm);
+inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
+inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
+#ifdef SFLOW_DO_SOCKET
+static void initSocket(SFLReceiver *receiver);
+#endif
+
+/*_________________--------------------------__________________
+ _________________ sfl_receiver_init __________________
+ -----------------__________________________------------------
+*/
+
+void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
+{
+ /* first clear everything */
+ memset(receiver, 0, sizeof(*receiver));
+
+ /* now copy in the parameters */
+ receiver->agent = agent;
+
+ /* set defaults */
+ receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
+ receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
+
+#ifdef SFLOW_DO_SOCKET
+ /* initialize the socket address */
+ initSocket(receiver);
+#endif
+
+ /* preset some of the header fields */
+ receiver->sampleCollector.datap = receiver->sampleCollector.data;
+ putNet32(receiver, SFLDATAGRAM_VERSION5);
+ putAddress(receiver, &agent->myIP);
+ putNet32(receiver, agent->subId);
+
+ /* prepare to receive the first sample */
+ resetSampleCollector(receiver);
+}
+
+/*_________________---------------------------__________________
+ _________________ reset __________________
+ -----------------___________________________------------------
+
+ called on timeout, or when owner string is cleared
+*/
+
+static void reset(SFLReceiver *receiver) {
+ // ask agent to tell samplers and pollers to stop sending samples
+ sfl_agent_resetReceiver(receiver->agent, receiver);
+ // reinitialize
+ sfl_receiver_init(receiver, receiver->agent);
+}
+
+#ifdef SFLOW_DO_SOCKET
+/*_________________---------------------------__________________
+ _________________ initSocket __________________
+ -----------------___________________________------------------
+*/
+
+static void initSocket(SFLReceiver *receiver) {
+ if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
+ struct sockaddr_in6 *sa6 = &receiver->receiver6;
+ sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
+ sa6->sin6_family = AF_INET6;
+ sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
+ }
+ else {
+ struct sockaddr_in *sa4 = &receiver->receiver4;
+ sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
+ sa4->sin_family = AF_INET;
+ sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
+ }
+}
+#endif
+
+/*_________________----------------------------------------_____________
+ _________________ MIB Vars _____________
+ -----------------________________________________________-------------
+*/
+
+char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrOwner;
+}
+void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
+ receiver->sFlowRcvrOwner = sFlowRcvrOwner;
+ if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
+ // reset condition! owner string was cleared
+ reset(receiver);
+ }
+}
+time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrTimeout;
+}
+void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
+ receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
+}
+u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrMaximumDatagramSize;
+}
+void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
+ u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
+ if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
+ receiver->sFlowRcvrMaximumDatagramSize = mdz;
+}
+SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
+ return &receiver->sFlowRcvrAddress;
+}
+void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
+ if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
+#ifdef SFLOW_DO_SOCKET
+ initSocket(receiver);
+#endif
+}
+u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrPort;
+}
+void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
+ receiver->sFlowRcvrPort = sFlowRcvrPort;
+ // update the socket structure
+#ifdef SFLOW_DO_SOCKET
+ initSocket(receiver);
+#endif
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_receiver_tick __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
+{
+ // if there are any samples to send, flush them now
+ if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
+ // check the timeout
+ if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
+ // count down one tick and reset if we reach 0
+ if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
+ }
+}
+
+/*_________________-----------------------------__________________
+ _________________ receiver write utilities __________________
+ -----------------_____________________________------------------
+*/
+
+inline static void put32(SFLReceiver *receiver, u_int32_t val)
+{
+ *receiver->sampleCollector.datap++ = val;
+}
+
+inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
+{
+ *receiver->sampleCollector.datap++ = htonl(val);
+}
+
+inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
+{
+ u_int32_t *from = (u_int32_t *)obj;
+ while(quads--) putNet32(receiver, *from++);
+}
+
+inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
+{
+ u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
+ // first copy the bytes in
+ memcpy((u_char *)firstQuadPtr, &val64, 8);
+ if(htonl(1) != 1) {
+ // swap the bytes, and reverse the quads too
+ u_int32_t tmp = *receiver->sampleCollector.datap++;
+ *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
+ *receiver->sampleCollector.datap++ = htonl(tmp);
+ }
+ else receiver->sampleCollector.datap += 2;
+}
+
+inline static void put128(SFLReceiver *receiver, u_char *val)
+{
+ memcpy(receiver->sampleCollector.datap, val, 16);
+ receiver->sampleCollector.datap += 4;
+}
+
+inline static void putString(SFLReceiver *receiver, SFLString *s)
+{
+ putNet32(receiver, s->len);
+ memcpy(receiver->sampleCollector.datap, s->str, s->len);
+ receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
+}
+
+inline static u_int32_t stringEncodingLength(SFLString *s) {
+ // answer in bytes, so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
+ return 4 + (((s->len + 3) / 4) * 4);
+}
+
+inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
+{
+ // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
+ if(addr->type == 0) {
+ putNet32(receiver, SFLADDRESSTYPE_IP_V4);
+ put32(receiver, 0);
+ }
+ else {
+ putNet32(receiver, addr->type);
+ if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
+ else put128(receiver, addr->address.ip_v6.addr);
+ }
+}
+
+inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
+ return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8; // type + address (unspecified == IPV4)
+}
+
+inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac)
+{
+ memcpy(receiver->sampleCollector.datap, mac, 6);
+ receiver->sampleCollector.datap += 2;
+}
+
+inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
+{
+ putNet32(receiver, sw->src_vlan);
+ putNet32(receiver, sw->src_priority);
+ putNet32(receiver, sw->dst_vlan);
+ putNet32(receiver, sw->dst_priority);
+}
+
+inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
+{
+ putAddress(receiver, &router->nexthop);
+ putNet32(receiver, router->src_mask);
+ putNet32(receiver, router->dst_mask);
+}
+
+inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
+ return addressEncodingLength(&router->nexthop) + 8;
+}
+
+inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
+{
+ putAddress(receiver, &gw->nexthop);
+ putNet32(receiver, gw->as);
+ putNet32(receiver, gw->src_as);
+ putNet32(receiver, gw->src_peer_as);
+ putNet32(receiver, gw->dst_as_path_segments);
+ {
+ u_int32_t seg = 0;
+ for(; seg < gw->dst_as_path_segments; seg++) {
+ putNet32(receiver, gw->dst_as_path[seg].type);
+ putNet32(receiver, gw->dst_as_path[seg].length);
+ putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
+ }
+ }
+ putNet32(receiver, gw->communities_length);
+ putNet32_run(receiver, gw->communities, gw->communities_length);
+ putNet32(receiver, gw->localpref);
+}
+
+inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
+ u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
+ u_int32_t seg = 0;
+ elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
+ for(; seg < gw->dst_as_path_segments; seg++) {
+ elemSiz += 8; // type, length
+ elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
+ }
+ elemSiz += 4; // communities_length
+ elemSiz += 4 * gw->communities_length; // communities
+ elemSiz += 4; // localpref
+ return elemSiz;
+}
+
+inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
+{
+ putNet32(receiver, user->src_charset);
+ putString(receiver, &user->src_user);
+ putNet32(receiver, user->dst_charset);
+ putString(receiver, &user->dst_user);
+}
+
+inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
+ return 4
+ + stringEncodingLength(&user->src_user)
+ + 4
+ + stringEncodingLength(&user->dst_user);
+}
+
+inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
+{
+ putNet32(receiver, url->direction);
+ putString(receiver, &url->url);
+ putString(receiver, &url->host);
+}
+
+inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
+ return 4
+ + stringEncodingLength(&url->url)
+ + stringEncodingLength(&url->host);
+}
+
+inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
+{
+ putNet32(receiver, labelStack->depth);
+ putNet32_run(receiver, labelStack->stack, labelStack->depth);
+}
+
+inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
+ return 4 + (4 * labelStack->depth);
+}
+
+inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
+{
+ putAddress(receiver, &mpls->nextHop);
+ putLabelStack(receiver, &mpls->in_stack);
+ putLabelStack(receiver, &mpls->out_stack);
+}
+
+inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
+ return addressEncodingLength(&mpls->nextHop)
+ + labelStackEncodingLength(&mpls->in_stack)
+ + labelStackEncodingLength(&mpls->out_stack);
+}
+
+inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
+{
+ putAddress(receiver, &nat->src);
+ putAddress(receiver, &nat->dst);
+}
+
+inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
+ return addressEncodingLength(&nat->src)
+ + addressEncodingLength(&nat->dst);
+}
+
+inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
+{
+ putString(receiver, &tunnel->tunnel_lsp_name);
+ putNet32(receiver, tunnel->tunnel_id);
+ putNet32(receiver, tunnel->tunnel_cos);
+}
+
+inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
+ return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
+}
+
+inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
+{
+ putString(receiver, &vc->vc_instance_name);
+ putNet32(receiver, vc->vll_vc_id);
+ putNet32(receiver, vc->vc_label_cos);
+}
+
+inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
+ return stringEncodingLength( &vc->vc_instance_name) + 8;
+}
+
+inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
+{
+ putString(receiver, &ftn->mplsFTNDescr);
+ putNet32(receiver, ftn->mplsFTNMask);
+}
+
+inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
+ return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
+}
+
+inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
+{
+ putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
+}
+
+inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
+ return 4;
+}
+
+inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
+{
+ putLabelStack(receiver, &vlanTunnel->stack);
+}
+
+inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
+ return labelStackEncodingLength(&vlanTunnel->stack);
+}
+
+
+inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
+{
+ putNet32(receiver, counters->ifIndex);
+ putNet32(receiver, counters->ifType);
+ putNet64(receiver, counters->ifSpeed);
+ putNet32(receiver, counters->ifDirection);
+ putNet32(receiver, counters->ifStatus);
+ putNet64(receiver, counters->ifInOctets);
+ putNet32(receiver, counters->ifInUcastPkts);
+ putNet32(receiver, counters->ifInMulticastPkts);
+ putNet32(receiver, counters->ifInBroadcastPkts);
+ putNet32(receiver, counters->ifInDiscards);
+ putNet32(receiver, counters->ifInErrors);
+ putNet32(receiver, counters->ifInUnknownProtos);
+ putNet64(receiver, counters->ifOutOctets);
+ putNet32(receiver, counters->ifOutUcastPkts);
+ putNet32(receiver, counters->ifOutMulticastPkts);
+ putNet32(receiver, counters->ifOutBroadcastPkts);
+ putNet32(receiver, counters->ifOutDiscards);
+ putNet32(receiver, counters->ifOutErrors);
+ putNet32(receiver, counters->ifPromiscuousMode);
+}
+
+
+/*_________________-----------------------------__________________
+ _________________ computeFlowSampleSize __________________
+ -----------------_____________________________------------------
+*/
+
+static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
+{
+ SFLFlow_sample_element *elem = fs->elements;
+#ifdef SFL_USE_32BIT_INDEX
+ u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
+ sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
+#else
+ u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
+ sample_pool, drops, input, output, number of elements */
+#endif
+
+ fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
+ for(; elem != NULL; elem = elem->nxt) {
+ u_int elemSiz = 0;
+ fs->num_elements++;
+ siz += 8; /* tag, length */
+ switch(elem->tag) {
+ case SFLFLOW_HEADER:
+ elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
+ elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
+ break;
+ case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
+ case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
+ case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
+ case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
+ case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
+ case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
+ case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
+ case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
+ case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
+ case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
+ case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
+ case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
+ case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
+ case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
+ case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
+ default:
+ sflError(receiver, "unexpected packet_data_tag");
+ return -1;
+ break;
+ }
+ // cache the element size, and accumulate it into the overall FlowSample size
+ elem->length = elemSiz;
+ siz += elemSiz;
+ }
+
+ return siz;
+}
+
+/*_________________-------------------------------__________________
+ _________________ sfl_receiver_writeFlowSample __________________
+ -----------------_______________________________------------------
+*/
+
+int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
+{
+ int packedSize;
+ if(fs == NULL) return -1;
+ if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
+
+ // check in case this one sample alone is too big for the datagram
+ // in fact - if it is even half as big then we should ditch it. Very
+ // important to avoid overruning the packet buffer.
+ if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
+ sflError(receiver, "flow sample too big for datagram");
+ return -1;
+ }
+
+ // if the sample pkt is full enough so that this sample might put
+ // it over the limit, then we should send it now before going on.
+ if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
+ sendSample(receiver);
+
+ receiver->sampleCollector.numSamples++;
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
+#else
+ putNet32(receiver, SFLFLOW_SAMPLE);
+#endif
+
+ putNet32(receiver, packedSize - 8); // don't include tag and len
+ putNet32(receiver, fs->sequence_number);
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, fs->ds_class);
+ putNet32(receiver, fs->ds_index);
+#else
+ putNet32(receiver, fs->source_id);
+#endif
+
+ putNet32(receiver, fs->sampling_rate);
+ putNet32(receiver, fs->sample_pool);
+ putNet32(receiver, fs->drops);
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, fs->inputFormat);
+ putNet32(receiver, fs->input);
+ putNet32(receiver, fs->outputFormat);
+ putNet32(receiver, fs->output);
+#else
+ putNet32(receiver, fs->input);
+ putNet32(receiver, fs->output);
+#endif
+
+ putNet32(receiver, fs->num_elements);
+
+ {
+ SFLFlow_sample_element *elem = fs->elements;
+ for(; elem != NULL; elem = elem->nxt) {
+
+ putNet32(receiver, elem->tag);
+ putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
+
+ switch(elem->tag) {
+ case SFLFLOW_HEADER:
+ putNet32(receiver, elem->flowType.header.header_protocol);
+ putNet32(receiver, elem->flowType.header.frame_length);
+ putNet32(receiver, elem->flowType.header.stripped);
+ putNet32(receiver, elem->flowType.header.header_length);
+ /* the header */
+ memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
+ /* round up to multiple of 4 to preserve alignment */
+ receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
+ break;
+ case SFLFLOW_ETHERNET:
+ putNet32(receiver, elem->flowType.ethernet.eth_len);
+ putMACAddress(receiver, elem->flowType.ethernet.src_mac);
+ putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
+ putNet32(receiver, elem->flowType.ethernet.eth_type);
+ break;
+ case SFLFLOW_IPV4:
+ putNet32(receiver, elem->flowType.ipv4.length);
+ putNet32(receiver, elem->flowType.ipv4.protocol);
+ put32(receiver, elem->flowType.ipv4.src_ip.addr);
+ put32(receiver, elem->flowType.ipv4.dst_ip.addr);
+ putNet32(receiver, elem->flowType.ipv4.src_port);
+ putNet32(receiver, elem->flowType.ipv4.dst_port);
+ putNet32(receiver, elem->flowType.ipv4.tcp_flags);
+ putNet32(receiver, elem->flowType.ipv4.tos);
+ break;
+ case SFLFLOW_IPV6:
+ putNet32(receiver, elem->flowType.ipv6.length);
+ putNet32(receiver, elem->flowType.ipv6.protocol);
+ put128(receiver, elem->flowType.ipv6.src_ip.addr);
+ put128(receiver, elem->flowType.ipv6.dst_ip.addr);
+ putNet32(receiver, elem->flowType.ipv6.src_port);
+ putNet32(receiver, elem->flowType.ipv6.dst_port);
+ putNet32(receiver, elem->flowType.ipv6.tcp_flags);
+ putNet32(receiver, elem->flowType.ipv6.priority);
+ break;
+ case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
+ case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
+ case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
+ case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
+ case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
+ case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
+ case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
+ case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
+ case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
+ case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
+ case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
+ case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
+ default:
+ sflError(receiver, "unexpected packet_data_tag");
+ return -1;
+ break;
+ }
+ }
+ }
+
+ // sanity check
+ assert(((u_char *)receiver->sampleCollector.datap
+ - (u_char *)receiver->sampleCollector.data
+ - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
+
+ // update the pktlen
+ receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
+ return packedSize;
+}
+
+/*_________________-----------------------------__________________
+ _________________ computeCountersSampleSize __________________
+ -----------------_____________________________------------------
+*/
+
+static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+ SFLCounters_sample_element *elem = cs->elements;
+#ifdef SFL_USE_32BIT_INDEX
+ u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
+#else
+ u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
+#endif
+
+ cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
+ for(; elem != NULL; elem = elem->nxt) {
+ u_int elemSiz = 0;
+ cs->num_elements++;
+ siz += 8; /* tag, length */
+ switch(elem->tag) {
+ case SFLCOUNTERS_GENERIC: elemSiz = sizeof(elem->counterBlock.generic); break;
+ case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break;
+ case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
+ case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
+ case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
+ default:
+ sflError(receiver, "unexpected counters_tag");
+ return -1;
+ break;
+ }
+ // cache the element size, and accumulate it into the overall FlowSample size
+ elem->length = elemSiz;
+ siz += elemSiz;
+ }
+ return siz;
+}
+
+/*_________________----------------------------------__________________
+ _________________ sfl_receiver_writeCountersSample __________________
+ -----------------__________________________________------------------
+*/
+
+int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+ int packedSize;
+ if(cs == NULL) return -1;
+ // if the sample pkt is full enough so that this sample might put
+ // it over the limit, then we should send it now.
+ if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
+
+ // check in case this one sample alone is too big for the datagram
+ // in fact - if it is even half as big then we should ditch it. Very
+ // important to avoid overruning the packet buffer.
+ if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
+ sflError(receiver, "counters sample too big for datagram");
+ return -1;
+ }
+
+ if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
+ sendSample(receiver);
+
+ receiver->sampleCollector.numSamples++;
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
+#else
+ putNet32(receiver, SFLCOUNTERS_SAMPLE);
+#endif
+
+ putNet32(receiver, packedSize - 8); // tag and length not included
+ putNet32(receiver, cs->sequence_number);
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, cs->ds_class);
+ putNet32(receiver, cs->ds_index);
+#else
+ putNet32(receiver, cs->source_id);
+#endif
+
+ putNet32(receiver, cs->num_elements);
+
+ {
+ SFLCounters_sample_element *elem = cs->elements;
+ for(; elem != NULL; elem = elem->nxt) {
+
+ putNet32(receiver, elem->tag);
+ putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
+
+ switch(elem->tag) {
+ case SFLCOUNTERS_GENERIC:
+ putGenericCounters(receiver, &(elem->counterBlock.generic));
+ break;
+ case SFLCOUNTERS_ETHERNET:
+ // all these counters are 32-bit
+ putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
+ break;
+ case SFLCOUNTERS_TOKENRING:
+ // all these counters are 32-bit
+ putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
+ break;
+ case SFLCOUNTERS_VG:
+ // mixed sizes
+ putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
+ putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
+ putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
+ putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
+ putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
+ putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
+ putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
+ putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
+ putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
+ putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
+ putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
+ putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
+ putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
+ putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
+ break;
+ case SFLCOUNTERS_VLAN:
+ // mixed sizes
+ putNet32(receiver, elem->counterBlock.vlan.vlan_id);
+ putNet64(receiver, elem->counterBlock.vlan.octets);
+ putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
+ putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
+ putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
+ putNet32(receiver, elem->counterBlock.vlan.discards);
+ break;
+ default:
+ sflError(receiver, "unexpected counters_tag");
+ return -1;
+ break;
+ }
+ }
+ }
+ // sanity check
+ assert(((u_char *)receiver->sampleCollector.datap
+ - (u_char *)receiver->sampleCollector.data
+ - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
+
+ // update the pktlen
+ receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
+ return packedSize;
+}
+
+/*_________________---------------------------------__________________
+ _________________ sfl_receiver_samplePacketsSent __________________
+ -----------------_________________________________------------------
+*/
+
+u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
+{
+ return receiver->sampleCollector.packetSeqNo;
+}
+
+/*_________________---------------------------__________________
+ _________________ sendSample __________________
+ -----------------___________________________------------------
+*/
+
+static void sendSample(SFLReceiver *receiver)
+{
+ /* construct and send out the sample, then reset for the next one... */
+ /* first fill in the header with the latest values */
+ /* version, agent_address and sub_agent_id were pre-set. */
+ u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
+ receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
+ receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
+ receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
+ /* send */
+ if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
+ receiver->agent,
+ receiver,
+ (u_char *)receiver->sampleCollector.data,
+ receiver->sampleCollector.pktlen);
+ else {
+#ifdef SFLOW_DO_SOCKET
+ /* send it myself */
+ if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
+ u_int32_t soclen = sizeof(struct sockaddr_in6);
+ int result = sendto(receiver->agent->receiverSocket6,
+ receiver->sampleCollector.data,
+ receiver->sampleCollector.pktlen,
+ 0,
+ (struct sockaddr *)&receiver->receiver6,
+ soclen);
+ if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
+ if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
+ }
+ else {
+ u_int32_t soclen = sizeof(struct sockaddr_in);
+ int result = sendto(receiver->agent->receiverSocket4,
+ receiver->sampleCollector.data,
+ receiver->sampleCollector.pktlen,
+ 0,
+ (struct sockaddr *)&receiver->receiver4,
+ soclen);
+ if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
+ if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
+ }
+#endif
+ }
+
+ /* reset for the next time */
+ resetSampleCollector(receiver);
+}
+
+/*_________________---------------------------__________________
+ _________________ resetSampleCollector __________________
+ -----------------___________________________------------------
+*/
+
+static void resetSampleCollector(SFLReceiver *receiver)
+{
+ receiver->sampleCollector.pktlen = 0;
+ receiver->sampleCollector.numSamples = 0;
+ /* point the datap to just after the header */
+ receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
+ (receiver->sampleCollector.data + 10) : (receiver->sampleCollector.data + 7);
+
+ receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
+}
+
+/*_________________---------------------------__________________
+ _________________ sflError __________________
+ -----------------___________________________------------------
+*/
+
+static void sflError(SFLReceiver *receiver, char *msg)
+{
+ sfl_agent_error(receiver->agent, "receiver", msg);
+ resetSampleCollector(receiver);
+}
diff --git a/lib/sflow_sampler.c b/lib/sflow_sampler.c
new file mode 100644
index 00000000..759b5a22
--- /dev/null
+++ b/lib/sflow_sampler.c
@@ -0,0 +1,183 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#include "sflow_api.h"
+
+
+/*_________________--------------------------__________________
+ _________________ sfl_sampler_init __________________
+ -----------------__________________________------------------
+*/
+
+void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+ /* copy the dsi in case it points to sampler->dsi, which we are about to clear.
+ (Thanks to Jagjit Choudray of Force 10 Networks for pointing out this bug) */
+ SFLDataSource_instance dsi = *pdsi;
+
+ /* preserve the *nxt pointer too, in case we are resetting this poller and it is
+ already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */
+ SFLSampler *nxtPtr = sampler->nxt;
+
+ /* clear everything */
+ memset(sampler, 0, sizeof(*sampler));
+
+ /* restore the linked list ptr */
+ sampler->nxt = nxtPtr;
+
+ /* now copy in the parameters */
+ sampler->agent = agent;
+ sampler->dsi = dsi;
+
+ /* set defaults */
+ sampler->sFlowFsMaximumHeaderSize = SFL_DEFAULT_HEADER_SIZE;
+ sampler->sFlowFsPacketSamplingRate = SFL_DEFAULT_SAMPLING_RATE;
+}
+
+/*_________________--------------------------__________________
+ _________________ reset __________________
+ -----------------__________________________------------------
+*/
+
+static void reset(SFLSampler *sampler)
+{
+ SFLDataSource_instance dsi = sampler->dsi;
+ sfl_sampler_init(sampler, sampler->agent, &dsi);
+}
+
+/*_________________---------------------------__________________
+ _________________ MIB access __________________
+ -----------------___________________________------------------
+*/
+u_int32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler) {
+ return sampler->sFlowFsReceiver;
+}
+void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, u_int32_t sFlowFsReceiver) {
+ sampler->sFlowFsReceiver = sFlowFsReceiver;
+ if(sFlowFsReceiver == 0) reset(sampler);
+ else {
+ /* retrieve and cache a direct pointer to my receiver */
+ sampler->myReceiver = sfl_agent_getReceiver(sampler->agent, sampler->sFlowFsReceiver);
+ }
+}
+u_int32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler) {
+ return sampler->sFlowFsPacketSamplingRate;
+}
+void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, u_int32_t sFlowFsPacketSamplingRate) {
+ sampler->sFlowFsPacketSamplingRate = sFlowFsPacketSamplingRate;
+}
+u_int32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler) {
+ return sampler->sFlowFsMaximumHeaderSize;
+}
+void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, u_int32_t sFlowFsMaximumHeaderSize) {
+ sampler->sFlowFsMaximumHeaderSize = sFlowFsMaximumHeaderSize;
+}
+
+/* call this to set a maximum samples-per-second threshold. If the sampler reaches this
+ threshold it will automatically back off the sampling rate. A value of 0 disables the
+ mechanism */
+void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, u_int32_t samplesPerSecond) {
+ sampler->backoffThreshold = samplesPerSecond;
+}
+u_int32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler) {
+ return sampler->backoffThreshold;
+}
+u_int32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler) {
+ return sampler->samplesLastTick;
+}
+
+/*_________________---------------------------------__________________
+ _________________ sequence number reset __________________
+ -----------------_________________________________------------------
+ Used by the agent to indicate a samplePool discontinuity
+ so that the sflow collector will know to ignore the next delta.
+*/
+void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler) { sampler->flowSampleSeqNo = 0; }
+
+
+/*_________________---------------------------__________________
+ _________________ sfl_sampler_tick __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_sampler_tick(SFLSampler *sampler, time_t now)
+{
+ if(sampler->backoffThreshold && sampler->samplesThisTick > sampler->backoffThreshold) {
+ /* automatic backoff. If using hardware sampling then this is where you have to
+ * call out to change the sampling rate and make sure that any other registers/variables
+ * that hold this value are updated.
+ */
+ sampler->sFlowFsPacketSamplingRate *= 2;
+ }
+ sampler->samplesLastTick = sampler->samplesThisTick;
+ sampler->samplesThisTick = 0;
+}
+
+
+
+/*_________________------------------------------__________________
+ _________________ sfl_sampler_writeFlowSample __________________
+ -----------------______________________________------------------
+*/
+
+void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs)
+{
+ if(fs == NULL) return;
+ sampler->samplesThisTick++;
+ /* increment the sequence number */
+ fs->sequence_number = ++sampler->flowSampleSeqNo;
+ /* copy the other header fields in */
+#ifdef SFL_USE_32BIT_INDEX
+ fs->ds_class = SFL_DS_CLASS(sampler->dsi);
+ fs->ds_index = SFL_DS_INDEX(sampler->dsi);
+#else
+ fs->source_id = SFL_DS_DATASOURCE(sampler->dsi);
+#endif
+ /* the sampling rate may have been set already. */
+ if(fs->sampling_rate == 0) fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;
+ /* the samplePool may be maintained upstream too. */
+ if( fs->sample_pool == 0) fs->sample_pool = sampler->samplePool;
+ /* sent to my receiver */
+ if(sampler->myReceiver) sfl_receiver_writeFlowSample(sampler->myReceiver, fs);
+}
+
+#ifdef SFLOW_SOFTWARE_SAMPLING
+
+/* ================== software sampling ========================*/
+
+/*_________________---------------------------__________________
+ _________________ nextRandomSkip __________________
+ -----------------___________________________------------------
+*/
+
+inline static u_int32_t nextRandomSkip(u_int32_t mean)
+{
+ if(mean == 0 || mean == 1) return 1;
+ return ((random() % ((2 * mean) - 1)) + 1);
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_sampler_takeSample __________________
+ -----------------___________________________------------------
+*/
+
+int sfl_sampler_takeSample(SFLSampler *sampler)
+{
+ if(sampler->skip == 0) {
+ /* first time - seed the random number generator */
+ srandom(SFL_DS_INDEX(sampler->dsi));
+ sampler->skip = nextRandomSkip(sampler->sFlowFsPacketSamplingRate);
+ }
+
+ /* increment the samplePool */
+ sampler->samplePool++;
+
+ if(--sampler->skip == 0) {
+ /* reached zero. Set the next skip and return true. */
+ sampler->skip = nextRandomSkip(sampler->sFlowFsPacketSamplingRate);
+ return 1;
+ }
+ return 0;
+}
+
+#endif /* SFLOW_SOFTWARE_SAMPLING */
diff --git a/lib/vlog-modules.def b/lib/vlog-modules.def
index 0d44e734..b791525e 100644
--- a/lib/vlog-modules.def
+++ b/lib/vlog-modules.def
@@ -61,6 +61,7 @@ VLOG_MODULE(proc_net_compat)
VLOG_MODULE(process)
VLOG_MODULE(rconn)
VLOG_MODULE(rtnetlink)
+VLOG_MODULE(sflow)
VLOG_MODULE(stp)
VLOG_MODULE(stats)
VLOG_MODULE(status)
diff --git a/ofproto/automake.mk b/ofproto/automake.mk
index 87a0fa68..3c189771 100644
--- a/ofproto/automake.mk
+++ b/ofproto/automake.mk
@@ -21,6 +21,8 @@ ofproto_libofproto_a_SOURCES = \
ofproto/netflow.h \
ofproto/ofproto.c \
ofproto/ofproto.h \
+ ofproto/ofproto-sflow.c \
+ ofproto/ofproto-sflow.h \
ofproto/pktbuf.c \
ofproto/pktbuf.h \
ofproto/pinsched.c \
diff --git a/ofproto/collectors.c b/ofproto/collectors.c
index f7cb1dbe..4589f329 100644
--- a/ofproto/collectors.c
+++ b/ofproto/collectors.c
@@ -121,3 +121,9 @@ collectors_send(const struct collectors *c, const void *payload, size_t n)
}
}
}
+
+int
+collectors_count(const struct collectors *c)
+{
+ return c->n_fds;
+}
diff --git a/ofproto/collectors.h b/ofproto/collectors.h
index a4abb631..ac70f375 100644
--- a/ofproto/collectors.h
+++ b/ofproto/collectors.h
@@ -28,4 +28,6 @@ void collectors_destroy(struct collectors *);
void collectors_send(const struct collectors *, const void *, size_t);
+int collectors_count(const struct collectors *);
+
#endif /* collectors.h */
diff --git a/ofproto/ofproto-sflow.c b/ofproto/ofproto-sflow.c
new file mode 100644
index 00000000..b37db42d
--- /dev/null
+++ b/ofproto/ofproto-sflow.c
@@ -0,0 +1,607 @@
+/*
+ * Copyright (c) 2009, 2010 InMon Corp.
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "ofproto-sflow.h"
+#include <inttypes.h>
+#include <stdlib.h>
+#include "collectors.h"
+#include "dpif.h"
+#include "compiler.h"
+#include "netdev.h"
+#include "ofpbuf.h"
+#include "ofproto.h"
+#include "poll-loop.h"
+#include "port-array.h"
+#include "sflow_api.h"
+#include "socket-util.h"
+#include "timeval.h"
+
+#define THIS_MODULE VLM_sflow
+#include "vlog.h"
+
+struct ofproto_sflow_port {
+ struct netdev *netdev; /* Underlying network device, for stats. */
+ SFLDataSource_instance dsi; /* sFlow library's notion of port number. */
+};
+
+struct ofproto_sflow {
+ struct ofproto *ofproto;
+ struct collectors *collectors;
+ SFLAgent *sflow_agent;
+ struct ofproto_sflow_options *options;
+ struct dpif *dpif;
+ time_t next_tick;
+ size_t n_flood, n_all;
+ struct port_array ports; /* Indexed by ODP port number. */
+};
+
+#define RECEIVER_INDEX 1
+
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+static bool
+nullable_string_is_equal(const char *a, const char *b)
+{
+ return a ? b && !strcmp(a, b) : !b;
+}
+
+static bool
+ofproto_sflow_options_equal(const struct ofproto_sflow_options *a,
+ const struct ofproto_sflow_options *b)
+{
+ return (svec_equal(&a->targets, &b->targets)
+ && a->sampling_rate == b->sampling_rate
+ && a->polling_interval == b->polling_interval
+ && a->header_len == b->header_len
+ && a->sub_id == b->sub_id
+ && nullable_string_is_equal(a->agent_device, b->agent_device)
+ && nullable_string_is_equal(a->control_ip, b->control_ip));
+}
+
+static struct ofproto_sflow_options *
+ofproto_sflow_options_clone(const struct ofproto_sflow_options *old)
+{
+ struct ofproto_sflow_options *new = xmemdup(old, sizeof *old);
+ svec_clone(&new->targets, &old->targets);
+ new->agent_device = old->agent_device ? xstrdup(old->agent_device) : NULL;
+ new->control_ip = old->control_ip ? xstrdup(old->control_ip) : NULL;
+ return new;
+}
+
+static void
+ofproto_sflow_options_destroy(struct ofproto_sflow_options *options)
+{
+ if (options) {
+ svec_destroy(&options->targets);
+ free(options->agent_device);
+ free(options->control_ip);
+ free(options);
+ }
+}
+
+/* sFlow library callback to allocate memory. */
+static void *
+sflow_agent_alloc_cb(void *magic UNUSED, SFLAgent *agent UNUSED, size_t bytes)
+{
+ return calloc(1, bytes);
+}
+
+/* sFlow library callback to free memory. */
+static int
+sflow_agent_free_cb(void *magic UNUSED, SFLAgent *agent UNUSED, void *obj)
+{
+ free(obj);
+ return 0;
+}
+
+/* sFlow library callback to report error. */
+static void
+sflow_agent_error_cb(void *magic UNUSED, SFLAgent *agent UNUSED, char *msg)
+{
+ VLOG_WARN("sFlow agent error: %s", msg);
+}
+
+/* sFlow library callback to send datagram. */
+static void
+sflow_agent_send_packet_cb(void *os_, SFLAgent *agent UNUSED,
+ SFLReceiver *receiver UNUSED, u_char *pkt,
+ uint32_t pktLen)
+{
+ struct ofproto_sflow *os = os_;
+ collectors_send(os->collectors, pkt, pktLen);
+}
+
+static void
+sflow_agent_get_counters(void *os_, SFLPoller *poller,
+ SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+ struct ofproto_sflow *os = os_;
+ SFLCounters_sample_element elem;
+ struct ofproto_sflow_port *osp;
+ SFLIf_counters *counters;
+ struct netdev_stats stats;
+ enum netdev_flags flags;
+ uint32_t current;
+
+ osp = port_array_get(&os->ports, poller->bridgePort);
+ if (!osp) {
+ return;
+ }
+
+ elem.tag = SFLCOUNTERS_GENERIC;
+ counters = &elem.counterBlock.generic;
+ counters->ifIndex = SFL_DS_INDEX(poller->dsi);
+ counters->ifType = 6;
+ if (!netdev_get_features(osp->netdev, &current, NULL, NULL, NULL)) {
+ /* The values of ifDirection come from MAU MIB (RFC 2668): 0 = unknown,
+ 1 = full-duplex, 2 = half-duplex, 3 = in, 4=out */
+ counters->ifSpeed = netdev_features_to_bps(current);
+ counters->ifDirection = (netdev_features_is_full_duplex(current)
+ ? 1 : 2);
+ } else {
+ counters->ifSpeed = 100000000;
+ counters->ifDirection = 0;
+ }
+ if (!netdev_get_flags(osp->netdev, &flags) && flags & NETDEV_UP) {
+ bool carrier;
+
+ counters->ifStatus = 1; /* ifAdminStatus up. */
+ if (!netdev_get_carrier(osp->netdev, &carrier) && carrier) {
+ counters->ifStatus |= 2; /* ifOperStatus us. */
+ }
+ } else {
+ counters->ifStatus = 0; /* Down. */
+ }
+
+ /* XXX
+ 1. Is the multicast counter filled in?
+ 2. Does the multicast counter include broadcasts?
+ 3. Does the rx_packets counter include multicasts/broadcasts?
+ */
+ netdev_get_stats(osp->netdev, &stats);
+ counters->ifInOctets = stats.rx_bytes;
+ counters->ifInUcastPkts = stats.rx_packets;
+ counters->ifInMulticastPkts = stats.multicast;
+ counters->ifInBroadcastPkts = -1;
+ counters->ifInDiscards = stats.rx_dropped;
+ counters->ifInErrors = stats.rx_errors;
+ counters->ifInUnknownProtos = -1;
+ counters->ifOutOctets = stats.tx_bytes;
+ counters->ifOutUcastPkts = stats.tx_packets;
+ counters->ifOutMulticastPkts = -1;
+ counters->ifOutBroadcastPkts = -1;
+ counters->ifOutDiscards = stats.tx_dropped;
+ counters->ifOutErrors = stats.tx_errors;
+ counters->ifPromiscuousMode = 0;
+
+ SFLADD_ELEMENT(cs, &elem);
+ sfl_poller_writeCountersSample(poller, cs);
+}
+
+/* Obtains an address to use for the local sFlow agent and stores it into
+ * '*agent_addr'. Returns true if successful, false on failure.
+ *
+ * The sFlow agent address should be a local IP address that is persistent and
+ * reachable over the network, if possible. The IP address associated with
+ * 'agent_device' is used if it has one, and otherwise 'control_ip', the IP
+ * address used to talk to the controller. */
+static bool
+sflow_choose_agent_address(const char *agent_device, const char *control_ip,
+ SFLAddress *agent_addr)
+{
+ struct in_addr in4;
+
+ memset(agent_addr, 0, sizeof *agent_addr);
+ agent_addr->type = SFLADDRESSTYPE_IP_V4;
+
+ if (agent_device) {
+ struct netdev *netdev;
+
+ if (!netdev_open(agent_device, NETDEV_ETH_TYPE_NONE, &netdev)) {
+ int error = netdev_get_in4(netdev, &in4, NULL);
+ netdev_close(netdev);
+ if (!error) {
+ goto success;
+ }
+ }
+ }
+
+ if (control_ip && !lookup_ip(control_ip, &in4)) {
+ goto success;
+ }
+
+ VLOG_ERR("could not determine IP address for sFlow agent");
+ return false;
+
+success:
+ agent_addr->address.ip_v4.addr = in4.s_addr;
+ return true;
+}
+
+void
+ofproto_sflow_clear(struct ofproto_sflow *os)
+{
+ struct ofproto_sflow_port *osp;
+ unsigned int odp_port;
+
+ if (os->sflow_agent) {
+ sfl_agent_release(os->sflow_agent);
+ os->sflow_agent = NULL;
+ }
+ collectors_destroy(os->collectors);
+ os->collectors = NULL;
+ ofproto_sflow_options_destroy(os->options);
+ os->options = NULL;
+
+ PORT_ARRAY_FOR_EACH (osp, &os->ports, odp_port) {
+ ofproto_sflow_del_port(os, odp_port);
+ }
+ port_array_clear(&os->ports);
+
+ /* Turn off sampling to save CPU cycles. */
+ dpif_set_sflow_probability(os->dpif, 0);
+}
+
+bool
+ofproto_sflow_is_enabled(const struct ofproto_sflow *os)
+{
+ return os->collectors != NULL;
+}
+
+struct ofproto_sflow *
+ofproto_sflow_create(struct dpif *dpif)
+{
+ struct ofproto_sflow *os;
+
+ os = xcalloc(1, sizeof *os);
+ os->dpif = dpif;
+ os->next_tick = time_now() + 1;
+ port_array_init(&os->ports);
+ return os;
+}
+
+void
+ofproto_sflow_destroy(struct ofproto_sflow *os)
+{
+ if (os) {
+ ofproto_sflow_clear(os);
+ port_array_destroy(&os->ports);
+ free(os);
+ }
+}
+
+static void
+ofproto_sflow_add_poller(struct ofproto_sflow *os,
+ struct ofproto_sflow_port *osp, uint16_t odp_port)
+{
+ SFLPoller *poller = sfl_agent_addPoller(os->sflow_agent, &osp->dsi, os,
+ sflow_agent_get_counters);
+ sfl_poller_set_sFlowCpInterval(poller, os->options->polling_interval);
+ sfl_poller_set_sFlowCpReceiver(poller, RECEIVER_INDEX);
+ sfl_poller_set_bridgePort(poller, odp_port);
+}
+
+static void
+ofproto_sflow_add_sampler(struct ofproto_sflow *os,
+ struct ofproto_sflow_port *osp,
+ u_int32_t sampling_rate, u_int32_t header_len)
+{
+ SFLSampler *sampler = sfl_agent_addSampler(os->sflow_agent, &osp->dsi);
+ sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, sampling_rate);
+ sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, header_len);
+ sfl_sampler_set_sFlowFsReceiver(sampler, RECEIVER_INDEX);
+}
+
+void
+ofproto_sflow_add_port(struct ofproto_sflow *os, uint16_t odp_port,
+ const char *netdev_name)
+{
+ struct ofproto_sflow_port *osp;
+ struct netdev *netdev;
+ uint32_t ifindex;
+ int error;
+
+ ofproto_sflow_del_port(os, odp_port);
+
+ /* Open network device. */
+ error = netdev_open(netdev_name, NETDEV_ETH_TYPE_NONE, &netdev);
+ if (error) {
+ VLOG_WARN_RL(&rl, "failed to open network device \"%s\": %s",
+ netdev_name, strerror(error));
+ return;
+ }
+
+ /* Add to table of ports. */
+ osp = xmalloc(sizeof *osp);
+ osp->netdev = netdev;
+ ifindex = netdev_get_ifindex(netdev);
+ if (ifindex <= 0) {
+ ifindex = (os->sflow_agent->subId << 16) + odp_port;
+ }
+ SFL_DS_SET(osp->dsi, 0, ifindex, 0);
+ port_array_set(&os->ports, odp_port, osp);
+
+ /* Add poller. */
+ if (os->sflow_agent) {
+ ofproto_sflow_add_poller(os, osp, odp_port);
+ }
+}
+
+void
+ofproto_sflow_del_port(struct ofproto_sflow *os, uint16_t odp_port)
+{
+ struct ofproto_sflow_port *osp = port_array_get(&os->ports, odp_port);
+ if (osp) {
+ if (os->sflow_agent) {
+ sfl_agent_removePoller(os->sflow_agent, &osp->dsi);
+ sfl_agent_removeSampler(os->sflow_agent, &osp->dsi);
+ }
+ netdev_close(osp->netdev);
+ free(osp);
+ port_array_set(&os->ports, odp_port, NULL);
+ }
+}
+
+void
+ofproto_sflow_set_options(struct ofproto_sflow *os,
+ const struct ofproto_sflow_options *options)
+{
+ struct ofproto_sflow_port *osp;
+ bool options_changed;
+ SFLReceiver *receiver;
+ unsigned int odp_port;
+ SFLAddress agentIP;
+ time_t now;
+ int error;
+
+ if (!options->targets.n || !options->sampling_rate) {
+ /* No point in doing any work if there are no targets or nothing to
+ * sample. */
+ ofproto_sflow_clear(os);
+ return;
+ }
+
+ options_changed = (!os->options
+ || !ofproto_sflow_options_equal(options, os->options));
+
+ /* Configure collectors if options have changed or if we're shortchanged in
+ * collectors (which indicates that opening one or more of the configured
+ * collectors failed, so that we should retry). */
+ if (options_changed
+ || collectors_count(os->collectors) < options->targets.n) {
+ collectors_destroy(os->collectors);
+ error = collectors_create(&options->targets,
+ SFL_DEFAULT_COLLECTOR_PORT, &os->collectors);
+ if (os->collectors == NULL) {
+ VLOG_WARN_RL(&rl, "no collectors could be initialized, "
+ "sFlow disabled");
+ ofproto_sflow_clear(os);
+ return;
+ }
+ }
+
+ /* Avoid reconfiguring if options didn't change. */
+ if (!options_changed) {
+ return;
+ }
+ ofproto_sflow_options_destroy(os->options);
+ os->options = ofproto_sflow_options_clone(options);
+
+ /* Choose agent IP address. */
+ if (!sflow_choose_agent_address(options->agent_device,
+ options->control_ip, &agentIP)) {
+ ofproto_sflow_clear(os);
+ return;
+ }
+
+ /* Create agent. */
+ VLOG_INFO("creating sFlow agent %d", options->sub_id);
+ if (os->sflow_agent) {
+ sfl_agent_release(os->sflow_agent);
+ }
+ os->sflow_agent = xcalloc(1, sizeof *os->sflow_agent);
+ now = time_now();
+ sfl_agent_init(os->sflow_agent,
+ &agentIP,
+ options->sub_id,
+ now, /* Boot time. */
+ now, /* Current time. */
+ os, /* Pointer supplied to callbacks. */
+ sflow_agent_alloc_cb,
+ sflow_agent_free_cb,
+ sflow_agent_error_cb,
+ sflow_agent_send_packet_cb);
+
+ receiver = sfl_agent_addReceiver(os->sflow_agent);
+ sfl_receiver_set_sFlowRcvrOwner(receiver, "Open vSwitch sFlow");
+ sfl_receiver_set_sFlowRcvrTimeout(receiver, 0xffffffff);
+
+ /* Set the sampling_rate down in the datapath. */
+ dpif_set_sflow_probability(os->dpif,
+ MAX(1, UINT32_MAX / options->sampling_rate));
+
+ /* Add samplers and pollers for the currently known ports. */
+ PORT_ARRAY_FOR_EACH (osp, &os->ports, odp_port) {
+ ofproto_sflow_add_sampler(os, osp,
+ options->sampling_rate, options->header_len);
+ }
+}
+
+static int
+ofproto_sflow_odp_port_to_ifindex(const struct ofproto_sflow *os,
+ uint16_t odp_port)
+{
+ struct ofproto_sflow_port *osp = port_array_get(&os->ports, odp_port);
+ return osp ? SFL_DS_INDEX(osp->dsi) : 0;
+}
+
+void
+ofproto_sflow_received(struct ofproto_sflow *os, struct odp_msg *msg)
+{
+ SFL_FLOW_SAMPLE_TYPE fs;
+ SFLFlow_sample_element hdrElem;
+ SFLSampled_header *header;
+ SFLFlow_sample_element switchElem;
+ SFLSampler *sampler;
+ const struct odp_sflow_sample_header *hdr;
+ const union odp_action *actions;
+ struct ofpbuf payload;
+ size_t n_actions, n_outputs;
+ size_t min_size;
+ flow_t flow;
+ size_t i;
+
+ /* Get odp_sflow_sample_header. */
+ min_size = sizeof *msg + sizeof *hdr;
+ if (min_size > msg->length) {
+ VLOG_WARN_RL(&rl, "sFlow packet too small (%"PRIu32" < %zu)",
+ msg->length, min_size);
+ return;
+ }
+ hdr = (const struct odp_sflow_sample_header *) (msg + 1);
+
+ /* Get actions. */
+ n_actions = hdr->n_actions;
+ if (n_actions > 65536 / sizeof *actions) {
+ VLOG_WARN_RL(&rl, "too many actions in sFlow packet (%zu > %zu)",
+ 65536 / sizeof *actions, n_actions);
+ return;
+ }
+ min_size += n_actions * sizeof *actions;
+ if (min_size > msg->length) {
+ VLOG_WARN_RL(&rl, "sFlow packet with %zu actions too small "
+ "(%"PRIu32" < %zu)",
+ n_actions, msg->length, min_size);
+ return;
+ }
+ actions = (const union odp_action *) (hdr + 1);
+
+ /* Get packet payload and extract flow. */
+ payload.data = (union odp_action *) (actions + n_actions);
+ payload.size = msg->length - min_size;
+ flow_extract(&payload, msg->port, &flow);
+
+ /* Build a flow sample */
+ memset(&fs, 0, sizeof fs);
+ fs.input = ofproto_sflow_odp_port_to_ifindex(os, msg->port);
+ fs.output = 0; /* Filled in correctly below. */
+ fs.sample_pool = hdr->sample_pool;
+
+ /* We are going to give it to the sampler that represents this input port.
+ * By implementing "ingress-only" sampling like this we ensure that we
+ * never have to offer the same sample to more than one sampler. */
+ sampler = sfl_agent_getSamplerByIfIndex(os->sflow_agent, fs.input);
+ if (!sampler) {
+ VLOG_WARN_RL(&rl, "no sampler for input ifIndex (%"PRIu32")",
+ fs.input);
+ return;
+ }
+
+ /* Sampled header. */
+ memset(&hdrElem, 0, sizeof hdrElem);
+ hdrElem.tag = SFLFLOW_HEADER;
+ header = &hdrElem.flowType.header;
+ header->header_protocol = SFLHEADER_ETHERNET_ISO8023;
+ header->frame_length = payload.size;
+ header->stripped = 4; /* Ethernet FCS stripped off. */
+ header->header_length = MIN(payload.size,
+ sampler->sFlowFsMaximumHeaderSize);
+ header->header_bytes = payload.data;
+
+ /* Add extended switch element. */
+ memset(&switchElem, 0, sizeof(switchElem));
+ switchElem.tag = SFLFLOW_EX_SWITCH;
+ switchElem.flowType.sw.src_vlan = ntohs(flow.dl_vlan);
+ switchElem.flowType.sw.src_priority = -1; /* XXX */
+ switchElem.flowType.sw.dst_vlan = -1; /* Filled in correctly below. */
+ switchElem.flowType.sw.dst_priority = switchElem.flowType.sw.src_priority;
+
+ /* Figure out the output ports. */
+ n_outputs = 0;
+ for (i = 0; i < n_actions; i++) {
+ const union odp_action *a = &actions[i];
+
+ switch (a->type) {
+ case ODPAT_OUTPUT:
+ fs.output = ofproto_sflow_odp_port_to_ifindex(os, a->output.port);
+ n_outputs++;
+ break;
+
+ case ODPAT_OUTPUT_GROUP:
+ n_outputs += (a->output_group.group == DP_GROUP_FLOOD ? os->n_flood
+ : a->output_group.group == DP_GROUP_ALL ? os->n_all
+ : 0);
+ break;
+
+ case ODPAT_SET_VLAN_VID:
+ switchElem.flowType.sw.dst_vlan = ntohs(a->vlan_vid.vlan_vid);
+ break;
+
+ case ODPAT_SET_VLAN_PCP:
+ switchElem.flowType.sw.dst_priority = a->vlan_pcp.vlan_pcp;
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ /* Set output port, as defined by http://www.sflow.org/sflow_version_5.txt
+ (search for "Input/output port information"). */
+ if (!n_outputs) {
+ /* This value indicates that the packet was dropped for an unknown
+ * reason. */
+ fs.output = 0x40000000 | 256;
+ } else if (n_outputs > 1 || !fs.output) {
+ /* Setting the high bit means "multiple output ports". */
+ fs.output = 0x80000000 | n_outputs;
+ }
+
+ /* Submit the flow sample to be encoded into the next datagram. */
+ SFLADD_ELEMENT(&fs, &hdrElem);
+ SFLADD_ELEMENT(&fs, &switchElem);
+ sfl_sampler_writeFlowSample(sampler, &fs);
+}
+
+void
+ofproto_sflow_set_group_sizes(struct ofproto_sflow *os,
+ size_t n_flood, size_t n_all)
+{
+ os->n_flood = n_flood;
+ os->n_all = n_all;
+}
+
+void
+ofproto_sflow_run(struct ofproto_sflow *os)
+{
+ if (ofproto_sflow_is_enabled(os)) {
+ time_t now = time_now();
+ if (now >= os->next_tick) {
+ sfl_agent_tick(os->sflow_agent, now);
+ os->next_tick = now + 1;
+ }
+ }
+}
+
+void
+ofproto_sflow_wait(struct ofproto_sflow *os)
+{
+ if (ofproto_sflow_is_enabled(os)) {
+ poll_timer_wait(os->next_tick * 1000 - time_msec());
+ }
+}
diff --git a/ofproto/ofproto-sflow.h b/ofproto/ofproto-sflow.h
new file mode 100644
index 00000000..ec86d115
--- /dev/null
+++ b/ofproto/ofproto-sflow.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2009 InMon Corp.
+ * Copyright (c) 2009 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OFPROTO_SFLOW_H
+#define OFPROTO_SFLOW_H 1
+
+#include <stdint.h>
+#include "svec.h"
+
+struct dpif;
+struct odp_msg;
+struct ofproto_sflow_options;
+
+struct ofproto_sflow *ofproto_sflow_create(struct dpif *);
+void ofproto_sflow_destroy(struct ofproto_sflow *);
+void ofproto_sflow_set_options(struct ofproto_sflow *,
+ const struct ofproto_sflow_options *);
+void ofproto_sflow_clear(struct ofproto_sflow *);
+bool ofproto_sflow_is_enabled(const struct ofproto_sflow *);
+
+void ofproto_sflow_add_port(struct ofproto_sflow *, uint16_t odp_port,
+ const char *netdev_name);
+void ofproto_sflow_del_port(struct ofproto_sflow *, uint16_t odp_port);
+void ofproto_sflow_set_group_sizes(struct ofproto_sflow *,
+ size_t n_flood, size_t n_all);
+
+void ofproto_sflow_run(struct ofproto_sflow *);
+void ofproto_sflow_wait(struct ofproto_sflow *);
+
+void ofproto_sflow_received(struct ofproto_sflow *, struct odp_msg *);
+
+#endif /* ofproto/ofproto-sflow.h */
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index 4995bbec..43054fa3 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -35,6 +35,7 @@
#include "netflow.h"
#include "odp-util.h"
#include "ofp-print.h"
+#include "ofproto-sflow.h"
#include "ofpbuf.h"
#include "openflow/nicira-ext.h"
#include "openflow/openflow.h"
@@ -60,10 +61,7 @@
#define THIS_MODULE VLM_ofproto
#include "vlog.h"
-enum {
- DP_GROUP_FLOOD = 0,
- DP_GROUP_ALL = 1
-};
+#include "sflow_api.h"
enum {
TABLEID_HASH = 0,
@@ -209,6 +207,7 @@ struct ofproto {
struct pinsched *miss_sched, *action_sched;
struct executer *executer;
struct netflow *netflow;
+ struct ofproto_sflow *sflow;
/* Flow table. */
struct classifier cls;
@@ -253,7 +252,8 @@ static void handle_odp_msg(struct ofproto *, struct ofpbuf *);
static void handle_openflow(struct ofconn *, struct ofproto *,
struct ofpbuf *);
-static void refresh_port_group(struct ofproto *, unsigned int group);
+static void refresh_port_groups(struct ofproto *);
+
static void update_port(struct ofproto *, const char *devname);
static int init_ports(struct ofproto *);
static void reinit_ports(struct ofproto *);
@@ -282,7 +282,7 @@ ofproto_create(const char *datapath, const struct ofhooks *ofhooks, void *aux,
dpif_close(dpif);
return error;
}
- error = dpif_recv_set_mask(dpif, ODPL_MISS | ODPL_ACTION);
+ error = dpif_recv_set_mask(dpif, ODPL_MISS | ODPL_ACTION | ODPL_SFLOW);
if (error) {
VLOG_ERR("failed to listen on datapath %s: %s",
datapath, strerror(error));
@@ -316,6 +316,7 @@ ofproto_create(const char *datapath, const struct ofhooks *ofhooks, void *aux,
p->miss_sched = p->action_sched = NULL;
p->executer = NULL;
p->netflow = NULL;
+ p->sflow = NULL;
/* Initialize flow table. */
classifier_init(&p->cls);
@@ -549,6 +550,30 @@ ofproto_set_netflow(struct ofproto *ofproto,
}
void
+ofproto_set_sflow(struct ofproto *ofproto,
+ const struct ofproto_sflow_options *oso)
+{
+ struct ofproto_sflow *os = ofproto->sflow;
+ if (oso) {
+ if (!os) {
+ struct ofport *ofport;
+ unsigned int odp_port;
+
+ os = ofproto->sflow = ofproto_sflow_create(ofproto->dpif);
+ refresh_port_groups(ofproto);
+ PORT_ARRAY_FOR_EACH (ofport, &ofproto->ports, odp_port) {
+ ofproto_sflow_add_port(os, odp_port,
+ netdev_get_name(ofport->netdev));
+ }
+ }
+ ofproto_sflow_set_options(os, oso);
+ } else {
+ ofproto_sflow_destroy(os);
+ ofproto->sflow = NULL;
+ }
+}
+
+void
ofproto_set_failure(struct ofproto *ofproto, bool fail_open)
{
if (fail_open) {
@@ -718,6 +743,7 @@ ofproto_destroy(struct ofproto *p)
pinsched_destroy(p->action_sched);
executer_destroy(p->executer);
netflow_destroy(p->netflow);
+ ofproto_sflow_destroy(p->sflow);
switch_status_unregister(p->ss_cat);
@@ -870,6 +896,9 @@ ofproto_run1(struct ofproto *p)
if (p->netflow) {
netflow_run(p->netflow);
}
+ if (p->sflow) {
+ ofproto_sflow_run(p->sflow);
+ }
return 0;
}
@@ -926,6 +955,9 @@ ofproto_wait(struct ofproto *p)
if (p->executer) {
executer_wait(p->executer);
}
+ if (p->sflow) {
+ ofproto_sflow_wait(p->sflow);
+ }
if (!tag_set_is_empty(&p->revalidate_set)) {
poll_immediate_wake();
}
@@ -1066,7 +1098,7 @@ reinit_ports(struct ofproto *p)
svec_destroy(&devnames);
}
-static void
+static size_t
refresh_port_group(struct ofproto *p, unsigned int group)
{
uint16_t *ports;
@@ -1085,13 +1117,18 @@ refresh_port_group(struct ofproto *p, unsigned int group)
}
dpif_port_group_set(p->dpif, group, ports, n_ports);
free(ports);
+
+ return n_ports;
}
static void
refresh_port_groups(struct ofproto *p)
{
- refresh_port_group(p, DP_GROUP_FLOOD);
- refresh_port_group(p, DP_GROUP_ALL);
+ size_t n_flood = refresh_port_group(p, DP_GROUP_FLOOD);
+ size_t n_all = refresh_port_group(p, DP_GROUP_ALL);
+ if (p->sflow) {
+ ofproto_sflow_set_group_sizes(p->sflow, n_flood, n_all);
+ }
}
static struct ofport *
@@ -1190,19 +1227,29 @@ send_port_status(struct ofproto *p, const struct ofport *ofport,
static void
ofport_install(struct ofproto *p, struct ofport *ofport)
{
+ uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no);
+ const char *netdev_name = (const char *) ofport->opp.name;
+
netdev_monitor_add(p->netdev_monitor, ofport->netdev);
- port_array_set(&p->ports, ofp_port_to_odp_port(ofport->opp.port_no),
- ofport);
- shash_add(&p->port_by_name, (char *) ofport->opp.name, ofport);
+ port_array_set(&p->ports, odp_port, ofport);
+ shash_add(&p->port_by_name, netdev_name, ofport);
+ if (p->sflow) {
+ ofproto_sflow_add_port(p->sflow, odp_port, netdev_name);
+ }
}
static void
ofport_remove(struct ofproto *p, struct ofport *ofport)
{
+ uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no);
+
netdev_monitor_remove(p->netdev_monitor, ofport->netdev);
- port_array_set(&p->ports, ofp_port_to_odp_port(ofport->opp.port_no), NULL);
+ port_array_set(&p->ports, odp_port, NULL);
shash_delete(&p->port_by_name,
shash_find(&p->port_by_name, (char *) ofport->opp.name));
+ if (p->sflow) {
+ ofproto_sflow_del_port(p->sflow, odp_port);
+ }
}
static void
@@ -2291,7 +2338,7 @@ update_port_config(struct ofproto *p, struct ofport *port,
#undef REVALIDATE_BITS
if (mask & OFPPC_NO_FLOOD) {
port->opp.config ^= OFPPC_NO_FLOOD;
- refresh_port_group(p, DP_GROUP_FLOOD);
+ refresh_port_groups(p);
}
if (mask & OFPPC_NO_PACKET_IN) {
port->opp.config ^= OFPPC_NO_PACKET_IN;
@@ -3108,7 +3155,7 @@ handle_openflow(struct ofconn *ofconn, struct ofproto *p,
}
static void
-handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
+handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet)
{
struct odp_msg *msg = packet->data;
uint16_t in_port = odp_port_to_ofp_port(msg->port);
@@ -3116,14 +3163,6 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
struct ofpbuf payload;
flow_t flow;
- /* Handle controller actions. */
- if (msg->type == _ODPL_ACTION_NR) {
- COVERAGE_INC(ofproto_ctlr_action);
- pinsched_send(p->action_sched, in_port, packet,
- send_packet_in_action, p);
- return;
- }
-
payload.data = msg + 1;
payload.size = msg->length - sizeof *msg;
flow_extract(&payload, msg->port, &flow);
@@ -3193,6 +3232,36 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
ofpbuf_delete(packet);
}
}
+
+static void
+handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
+{
+ struct odp_msg *msg = packet->data;
+
+ switch (msg->type) {
+ case _ODPL_ACTION_NR:
+ COVERAGE_INC(ofproto_ctlr_action);
+ pinsched_send(p->action_sched, odp_port_to_ofp_port(msg->port), packet,
+ send_packet_in_action, p);
+ break;
+
+ case _ODPL_SFLOW_NR:
+ if (p->sflow) {
+ ofproto_sflow_received(p->sflow, msg);
+ }
+ ofpbuf_delete(packet);
+ break;
+
+ case _ODPL_MISS_NR:
+ handle_odp_miss_msg(p, packet);
+ break;
+
+ default:
+ VLOG_WARN_RL(&rl, "received ODP message of unexpected type %"PRIu32,
+ msg->type);
+ break;
+ }
+}
static void
revalidate_cb(struct cls_rule *sub_, void *cbdata_)
diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h
index 50dd5d5b..6377e51e 100644
--- a/ofproto/ofproto.h
+++ b/ofproto/ofproto.h
@@ -29,6 +29,11 @@ struct ofhooks;
struct ofproto;
struct svec;
+enum {
+ DP_GROUP_FLOOD = 0,
+ DP_GROUP_ALL = 1
+};
+
struct ofexpired {
flow_t flow;
uint64_t packet_count; /* Packets from subrules. */
@@ -36,6 +41,16 @@ struct ofexpired {
long long int used; /* Last-used time (0 if never used). */
};
+struct ofproto_sflow_options {
+ struct svec targets;
+ uint32_t sampling_rate;
+ uint32_t polling_interval;
+ uint32_t header_len;
+ uint32_t sub_id;
+ char *agent_device;
+ char *control_ip;
+};
+
int ofproto_create(const char *datapath, const struct ofhooks *, void *aux,
struct ofproto **ofprotop);
void ofproto_destroy(struct ofproto *);
@@ -62,6 +77,7 @@ int ofproto_set_listeners(struct ofproto *, const struct svec *listeners);
int ofproto_set_snoops(struct ofproto *, const struct svec *snoops);
int ofproto_set_netflow(struct ofproto *,
const struct netflow_options *nf_options);
+void ofproto_set_sflow(struct ofproto *, const struct ofproto_sflow_options *);
void ofproto_set_failure(struct ofproto *, bool fail_open);
void ofproto_set_rate_limit(struct ofproto *, int rate_limit, int burst_limit);
int ofproto_set_stp(struct ofproto *, bool enable_stp);
diff --git a/utilities/automake.mk b/utilities/automake.mk
index 9ac12c92..1a9d4925 100644
--- a/utilities/automake.mk
+++ b/utilities/automake.mk
@@ -80,6 +80,7 @@ utilities_ovs_ofctl_LDADD = lib/libopenvswitch.a $(FAULT_LIBS) $(SSL_LIBS)
utilities_ovs_openflowd_SOURCES = utilities/ovs-openflowd.c
utilities_ovs_openflowd_LDADD = \
ofproto/libofproto.a \
+ lib/libsflow.a \
lib/libopenvswitch.a \
$(FAULT_LIBS) \
$(SSL_LIBS)
diff --git a/vswitchd/automake.mk b/vswitchd/automake.mk
index 8e27fc2f..d810c830 100644
--- a/vswitchd/automake.mk
+++ b/vswitchd/automake.mk
@@ -21,6 +21,7 @@ vswitchd_ovs_vswitchd_SOURCES = \
vswitchd/xenserver.h
vswitchd_ovs_vswitchd_LDADD = \
ofproto/libofproto.a \
+ lib/libsflow.a \
lib/libopenvswitch.a \
$(FAULT_LIBS) \
$(SSL_LIBS)
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index dbcf3125..3b7ec51f 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -61,6 +61,7 @@
#include "vconn-ssl.h"
#include "xenserver.h"
#include "xtoxll.h"
+#include "sflow_api.h"
#define THIS_MODULE VLM_bridge
#include "vlog.h"
@@ -210,6 +211,7 @@ static uint64_t bridge_pick_datapath_id(struct bridge *,
const uint8_t bridge_ea[ETH_ADDR_LEN],
struct iface *hw_addr_iface);
static struct iface *bridge_get_local_iface(struct bridge *);
+static const char *bridge_get_controller(const struct bridge *br);
static uint64_t dpid_from_hash(const void *, size_t nbytes);
static void bridge_unixctl_fdb_show(struct unixctl_conn *, const char *args);
@@ -527,6 +529,7 @@ bridge_reconfigure(void)
struct svec old_br, new_br;
struct bridge *br, *next;
size_t i;
+ int sflow_bridge_number;
COVERAGE_INC(bridge_reconfigure);
@@ -646,6 +649,7 @@ bridge_reconfigure(void)
svec_destroy(&want_ifaces);
svec_destroy(&add_ifaces);
}
+ sflow_bridge_number = 0;
LIST_FOR_EACH (br, struct bridge, node, &all_bridges) {
uint8_t ea[8];
uint64_t dpid;
@@ -716,6 +720,42 @@ bridge_reconfigure(void)
}
svec_destroy(&nf_options.collectors);
+ if (cfg_has("sflow.%s.host", br->name)) {
+ struct ofproto_sflow_options oso;
+
+ svec_init(&oso.targets);
+ cfg_get_all_keys(&oso.targets, "sflow.%s.host", br->name);
+
+ oso.sampling_rate = SFL_DEFAULT_SAMPLING_RATE;
+ if (cfg_has("sflow.%s.sampling", br->name)) {
+ oso.sampling_rate = cfg_get_int(0, "sflow.%s.sampling",
+ br->name);
+ }
+
+ oso.polling_interval = SFL_DEFAULT_POLLING_INTERVAL;
+ if (cfg_has("sflow.%s.polling", br->name)) {
+ oso.polling_interval = cfg_get_int(0, "sflow.%s.polling",
+ br->name);
+ }
+
+ oso.header_len = SFL_DEFAULT_HEADER_SIZE;
+ if (cfg_has("sflow.%s.header", br->name)) {
+ oso.header_len = cfg_get_int(0, "sflow.%s.header", br->name);
+ }
+
+ oso.sub_id = sflow_bridge_number++;
+ oso.agent_device = (char *) cfg_get_string(0, "sflow.%s.agent",
+ br->name);
+ oso.control_ip = (char *) cfg_get_string(0,
+ "bridge.%s.controller.ip",
+ br->name);
+ ofproto_set_sflow(br->ofproto, &oso);
+
+ svec_destroy(&oso.targets);
+ } else {
+ ofproto_set_sflow(br->ofproto, NULL);
+ }
+
/* Update the controller and related settings. It would be more
* straightforward to call this from bridge_reconfigure_one(), but we
* can't do it there for two reasons. First, and most importantly, at
diff --git a/vswitchd/ovs-vswitchd.8.in b/vswitchd/ovs-vswitchd.8.in
index 431c9488..fafc3fe8 100644
--- a/vswitchd/ovs-vswitchd.8.in
+++ b/vswitchd/ovs-vswitchd.8.in
@@ -48,6 +48,9 @@ Port mirroring, with optional VLAN tagging.
NetFlow v5 flow logging.
.
.IP \(bu
+sFlow(R) monitoring.
+.
+.IP \(bu
Connectivity to an external OpenFlow controller, such as NOX.
.
.PP
diff --git a/vswitchd/ovs-vswitchd.conf.5.in b/vswitchd/ovs-vswitchd.conf.5.in
index 01a1e4c6..5c4a092f 100644
--- a/vswitchd/ovs-vswitchd.conf.5.in
+++ b/vswitchd/ovs-vswitchd.conf.5.in
@@ -459,6 +459,38 @@ netflow.mybr.host=nflow.example.com:9995
.fi
.RE
+.SS "sFlow Monitoring"
+sFlow(R) is a protocol for monitoring switches. A bridge may be
+configured to send sFlow records to sFlow collectors by defining the
+key \fBsflow.\fIbridge\fB.host\fR for each collector in the form
+\fIip\fR[\fB:\fIport\fR]. Records from \fIbridge\fR will be sent to
+each \fIip\fR on UDP \fIport\fR. The \fIip\fR must be specified
+numerically, not as a DNS name. If \Iport\fR is omitted, port 6343 is
+used.
+.PP
+By default, 1 out of every 400 packets is sent to the configured sFlow
+collector. To override this, set \fBsflow.\fIbridge\fB.sampling\fR to
+the number of switched packets out of which one, on average, will be
+sent to the sFlow collector, e.g. a value of 1 sends every packet to
+the collector, a value of 2 sends 50% of the packets to the collector,
+and so on.
+.PP
+\fBovs\-vswitchd\fR also occasionally sends switch port statistics to
+sFlow collectors, by default every 30 seconds. To override this, set
+\fBsflow.\fIbridge\fB.polling\fR to a duration in seconds.
+.PP
+By default, \fBovs\-vswitchd\fR sends the first 128 bytes of sampled
+packets to sFlow collectors. To override this, set
+\fBsflow.\fIbridge\fB.header\fR to a size in bytes.
+.PP
+The sFlow module must be able to report an ``agent address'' to sFlow
+collectors, which should be an IP address for the Open vSwitch that is
+persistent and reachable over the network, if possible. If a
+local IP is configured as \fBbridge.\fIbridge\fB.controller.ip\fR,
+then that IP address is used by default. To override this default,
+set \fBsflow.\fIbridge\fB.agent\fR to the name of a network device, in
+which case the IP address set on that device is used. If no IP
+address can be determined either way, sFlow is disabled.
.SS "Remote Management"
A \fBovs\-vswitchd\fR instance may be remotely managed by a controller that
supports the OpenFlow Management Protocol, such as NOX. This
@@ -514,7 +546,7 @@ switch will perform all configured bridging and switching locally.
.TP
\fBdiscover\fR
Use controller discovery to find the local OpenFlow controller.
-Refer to \fB\ovs\-openflowd\fR(8) for information on how to configure a DHCP
+Refer to \fBovs\-openflowd\fR(8) for information on how to configure a DHCP
server to support controller discovery. The following additional
options control the discovery process:
.