diff options
author | Ben Pfaff <blp@nicira.com> | 2010-01-25 10:52:28 -0800 |
---|---|---|
committer | Ben Pfaff <blp@nicira.com> | 2010-01-25 10:52:28 -0800 |
commit | 49c36903d6d65bed96cba31f05534510a21a68d7 (patch) | |
tree | ffd8b53b6da72e0bc9aa7265eb296cd43669e57d | |
parent | b3080599f6b280c63b9b6f4ca2d3c6006bcd9590 (diff) | |
parent | 56fd8edf80b6098289f9ddd94a6a4be3be648472 (diff) |
Merge "sflow" into "master".
No conflicts, but lib/dpif.c needed a few changes since struct dpif's
member "class" was renamed to "dpif_class" in master since sflow was
branched off.
35 files changed, 3683 insertions, 50 deletions
@@ -22,3 +22,7 @@ Public License, version 2. Files under the xenserver directory are licensed on a file-by-file basis. Some files are under an uncertain license that may not be DFSG-compliant or GPL-compatible. Refer to each file for details. + +Files lib/sflow*.[ch] are licensed under the terms of the InMon sFlow +licence that is available at: + http://www.inmon.com/technology/sflowlicense.txt @@ -6,8 +6,8 @@ What is Open vSwitch? Open vSwitch is a multilayer software switch licensed under the open source Apache 2 license. Our goal is to implement a production quality switch platform that supports standard management interfaces -(e.g. NetFlow, RSPAN, ERSPAN, IOS-like CLI), and opens the forwarding -functions to programmatic extension and control. +(e.g. NetFlow, sFlow(R), RSPAN, ERSPAN, IOS-like CLI), and opens the +forwarding functions to programmatic extension and control. Open vSwitch is well suited to function as a virtual switch in VM environments. In addition to exposing standard control and visibility @@ -20,7 +20,8 @@ The bulk of the code is written in platform-independent C and is easily ported to other environments. The current release of Open vSwitch supports the following features: - * Visibility into inter-VM communication via NetFlow, SPAN, and RSPAN + * Visibility into inter-VM communication via NetFlow, sFlow, SPAN, + and RSPAN * Standard 802.1Q VLAN model with trunking * Per VM policing * NIC bonding with source-MAC load balancing diff --git a/acinclude.m4 b/acinclude.m4 index e37a316d..e074c7d4 100644 --- a/acinclude.m4 +++ b/acinclude.m4 @@ -235,4 +235,14 @@ dnl Example: OVS_ENABLE_OPTION([-Wdeclaration-after-statement]) AC_DEFUN([OVS_ENABLE_OPTION], [OVS_CHECK_CC_OPTION([$1], [WARNING_FLAGS="$WARNING_FLAGS $1"]) AC_SUBST([WARNING_FLAGS])]) + +dnl OVS_CONDITIONAL_CC_OPTION([OPTION], [CONDITIONAL]) +dnl Check whether the given C compiler OPTION is accepted. +dnl If so, enable the given Automake CONDITIONAL. 
+ +dnl Example: OVS_CONDITIONAL_CC_OPTION([-Wno-unused], [HAVE_WNO_UNUSED]) +AC_DEFUN([OVS_CONDITIONAL_CC_OPTION], + [OVS_CHECK_CC_OPTION( + [$1], [ovs_have_cc_option=yes], [ovs_have_cc_option=no]) + AM_CONDITIONAL([$2], [test $ovs_have_cc_option = yes])]) dnl ---------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index 1e56c462..5b0b3c65 100644 --- a/configure.ac +++ b/configure.ac @@ -75,6 +75,7 @@ OVS_ENABLE_OPTION([-Wold-style-definition]) OVS_ENABLE_OPTION([-Wmissing-prototypes]) OVS_ENABLE_OPTION([-Wmissing-field-initializers]) OVS_ENABLE_OPTION([-Wno-override-init]) +OVS_CONDITIONAL_CC_OPTION([-Wno-unused], [HAVE_WNO_UNUSED]) AC_ARG_VAR(KARCH, [Kernel Architecture String]) AC_SUBST(KARCH) diff --git a/datapath/actions.c b/datapath/actions.c index cadab05f..8b32de47 100644 --- a/datapath/actions.c +++ b/datapath/actions.c @@ -1,6 +1,6 @@ /* * Distributed under the terms of the GNU GPL version 2. - * Copyright (c) 2007, 2008, 2009 Nicira Networks. + * Copyright (c) 2007, 2008, 2009, 2010 Nicira Networks. * * Significant portions of this file may be copied from parts of the Linux * kernel, by Linus Torvalds and others. @@ -366,6 +366,28 @@ output_control(struct datapath *dp, struct sk_buff *skb, u32 arg, gfp_t gfp) return dp_output_control(dp, skb, _ODPL_ACTION_NR, arg); } +/* Send a copy of this packet up to the sFlow agent, along with extra + * information about what happened to it. 
*/ +static void sflow_sample(struct datapath *dp, struct sk_buff *skb, + const union odp_action *a, int n_actions, + gfp_t gfp, struct net_bridge_port *nbp) +{ + struct odp_sflow_sample_header *hdr; + unsigned int actlen = n_actions * sizeof(union odp_action); + unsigned int hdrlen = sizeof(struct odp_sflow_sample_header); + struct sk_buff *nskb; + + nskb = skb_copy_expand(skb, actlen + hdrlen, 0, gfp); + if (!nskb) + return; + + memcpy(__skb_push(nskb, actlen), a, actlen); + hdr = (struct odp_sflow_sample_header*)__skb_push(nskb, hdrlen); + hdr->n_actions = n_actions; + hdr->sample_pool = atomic_read(&nbp->sflow_pool); + dp_output_control(dp, nskb, _ODPL_SFLOW_NR, 0); +} + /* Execute a list of actions against 'skb'. */ int execute_actions(struct datapath *dp, struct sk_buff *skb, struct odp_flow_key *key, @@ -378,6 +400,17 @@ int execute_actions(struct datapath *dp, struct sk_buff *skb, * is slightly obscure just to avoid that. */ int prev_port = -1; int err; + + if (dp->sflow_probability) { + struct net_bridge_port *p = skb->dev->br_port; + if (p) { + atomic_inc(&p->sflow_pool); + if (dp->sflow_probability == UINT_MAX || + net_random() < dp->sflow_probability) + sflow_sample(dp, skb, a, n_actions, gfp, p); + } + } + for (; n_actions > 0; a++, n_actions--) { WARN_ON_ONCE(skb_shared(skb)); if (prev_port != -1) { diff --git a/datapath/datapath.c b/datapath/datapath.c index 12798958..ba363fb7 100644 --- a/datapath/datapath.c +++ b/datapath/datapath.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2007, 2008, 2009 Nicira Networks. + * Copyright (c) 2007, 2008, 2009, 2010 Nicira Networks. * Distributed under the terms of the GNU GPL version 2. 
* * Significant portions of this file may be copied from parts of the Linux @@ -349,6 +349,7 @@ static int new_nbp(struct datapath *dp, struct net_device *dev, int port_no) p->port_no = port_no; p->dp = dp; p->dev = dev; + atomic_set(&p->sflow_pool, 0); if (!is_dp_dev(dev)) rcu_assign_pointer(dev->br_port, p); else { @@ -713,8 +714,7 @@ dp_output_control(struct datapath *dp, struct sk_buff *skb, int queue_no, int err; WARN_ON_ONCE(skb_shared(skb)); - BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR); - + BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR && queue_no != _ODPL_SFLOW_NR); queue = &dp->queues[queue_no]; err = -ENOBUFS; if (skb_queue_len(queue) >= DP_MAX_QUEUE_LEN) @@ -1391,6 +1391,7 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd, int dp_idx = iminor(f->f_dentry->d_inode); struct datapath *dp; int drop_frags, listeners, port_no; + unsigned int sflow_probability; int err; /* Handle commands with special locking requirements up front. */ @@ -1454,6 +1455,16 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd, set_listen_mask(f, listeners); break; + case ODP_GET_SFLOW_PROBABILITY: + err = put_user(dp->sflow_probability, (unsigned int __user *)argp); + break; + + case ODP_SET_SFLOW_PROBABILITY: + err = get_user(sflow_probability, (unsigned int __user *)argp); + if (!err) + dp->sflow_probability = sflow_probability; + break; + case ODP_PORT_QUERY: err = query_port(dp, (struct odp_port __user *)argp); break; diff --git a/datapath/datapath.h b/datapath/datapath.h index 643c91ac..3b5a67b1 100644 --- a/datapath/datapath.h +++ b/datapath/datapath.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009 Nicira Networks. + * Copyright (c) 2009, 2010 Nicira Networks. * Distributed under the terms of the GNU GPL version 2. 
* * Significant portions of this file may be copied from parts of the Linux @@ -79,9 +79,22 @@ struct dp_bucket { struct sw_flow *flows[]; }; -#define DP_N_QUEUES 2 +#define DP_N_QUEUES 3 #define DP_MAX_QUEUE_LEN 100 +/** + * struct dp_stats_percpu - per-cpu packet processing statistics for a given + * datapath. + * @n_frags: Number of IP fragments processed by datapath. + * @n_hit: Number of received packets for which a matching flow was found in + * the flow table. + * @n_miss: Number of received packets that had no matching flow in the flow + * table. The sum of @n_hit and @n_miss is the number of packets that have + * been received by the datapath. + * @n_lost: Number of received packets that had no matching flow in the flow + * table that could not be sent to userspace (normally due to an overflow in + * one of the datapath's queues). + */ struct dp_stats_percpu { u64 n_frags; u64 n_hit; @@ -95,10 +108,29 @@ struct dp_port_group { u16 ports[]; }; +/** + * struct datapath - datapath for flow-based packet switching + * @mutex: Mutual exclusion for ioctls. + * @dp_idx: Datapath number (index into the dps[] array in datapath.c). + * @ifobj: Represents /sys/class/net/<devname>/brif. + * @drop_frags: Drop all IP fragments if nonzero. + * @queues: %DP_N_QUEUES sets of queued packets for userspace to handle. + * @waitqueue: Waitqueue, for waiting for new packets in @queues. + * @n_flows: Number of flows currently in flow table. + * @table: Current flow table (RCU protected). + * @groups: Port groups, used by ODPAT_OUTPUT_GROUP action (RCU protected). + * @n_ports: Number of ports currently in @ports. + * @ports: Map from port number to &struct net_bridge_port. %ODPP_LOCAL port + * always exists, other ports may be %NULL. + * @port_list: List of all ports in @ports in arbitrary order. + * @stats_percpu: Per-CPU datapath statistics. + * @sflow_probability: Number of packets out of UINT_MAX to sample to the + * %ODPL_SFLOW queue, e.g. 
(@sflow_probability/UINT_MAX) is the probability of + * sampling a given packet. + */ struct datapath { struct mutex mutex; int dp_idx; - struct kobject ifobj; int drop_frags; @@ -117,19 +149,37 @@ struct datapath { /* Switch ports. */ unsigned int n_ports; struct net_bridge_port *ports[DP_MAX_PORTS]; - struct list_head port_list; /* All ports, including local_port. */ + struct list_head port_list; /* Stats. */ struct dp_stats_percpu *stats_percpu; + + /* sFlow Sampling */ + unsigned int sflow_probability; }; +/** + * struct net_bridge_port - one port within a datapath + * @port_no: Index into @dp's @ports array. + * @dp: Datapath to which this port belongs. + * @dev: The network device attached to this port. The @br_port member in @dev + * points back to this &struct net_bridge_port. + * @kobj: Represents /sys/class/net/<devname>/brport. + * @linkname: The name of the link from /sys/class/net/<datapath>/brif to this + * &struct net_bridge_port. (We keep this around so that we can delete it + * if @dev gets renamed.) Set to the null string when no link exists. + * @node: Element in @dp's @port_list. + * @sflow_pool: Number of packets that were candidates for sFlow sampling, + * regardless of whether they were actually chosen and sent down to userspace. + */ struct net_bridge_port { u16 port_no; struct datapath *dp; struct net_device *dev; struct kobject kobj; char linkname[IFNAMSIZ]; - struct list_head node; /* Element in datapath.ports. */ + struct list_head node; + atomic_t sflow_pool; }; extern struct notifier_block dp_device_notifier; diff --git a/include/openvswitch/datapath-protocol.h b/include/openvswitch/datapath-protocol.h index ab7eb9e3..b079f529 100644 --- a/include/openvswitch/datapath-protocol.h +++ b/include/openvswitch/datapath-protocol.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009 Nicira Networks. + * Copyright (c) 2009, 2010 Nicira Networks. * * This file is offered under your choice of two licenses: Apache 2.0 or GNU * GPL 2.0 or later. 
The permission statements for each of these licenses is @@ -77,6 +77,9 @@ #define ODP_EXECUTE _IOR('O', 18, struct odp_execute) +#define ODP_SET_SFLOW_PROBABILITY _IOR('O', 19, int) +#define ODP_GET_SFLOW_PROBABILITY _IOW('O', 20, int) + struct odp_stats { /* Flows. */ __u32 n_flows; /* Number of flows in flow table. */ @@ -98,6 +101,7 @@ struct odp_stats { /* Queues. */ __u16 max_miss_queue; /* Max length of ODPL_MISS queue. */ __u16 max_action_queue; /* Max length of ODPL_ACTION queue. */ + __u16 max_sflow_queue; /* Max length of ODPL_SFLOW queue. */ }; /* Logical ports. */ @@ -109,16 +113,51 @@ struct odp_stats { #define ODPL_MISS (1 << _ODPL_MISS_NR) #define _ODPL_ACTION_NR 1 /* Packet output to ODPP_CONTROLLER. */ #define ODPL_ACTION (1 << _ODPL_ACTION_NR) -#define ODPL_ALL (ODPL_MISS | ODPL_ACTION) - -/* Format of messages read from datapath fd. */ +#define _ODPL_SFLOW_NR 2 /* sFlow samples. */ +#define ODPL_SFLOW (1 << _ODPL_SFLOW_NR) +#define ODPL_ALL (ODPL_MISS | ODPL_ACTION | ODPL_SFLOW) + +/** + * struct odp_msg - format of messages read from datapath fd. + * @type: One of the %_ODPL_* constants. + * @length: Total length of message, including this header. + * @port: Port that received the packet embedded in this message. + * @reserved: Not currently used. Should be set to 0. + * @arg: Argument value whose meaning depends on @type. + * + * For @type == %_ODPL_MISS_NR, the header is followed by packet data. The + * @arg member is unused and set to 0. + * + * For @type == %_ODPL_ACTION_NR, the header is followed by packet data. The + * @arg member is copied from the &struct odp_action_controller that caused + * the &struct odp_msg to be composed. + * + * For @type == %_ODPL_SFLOW_NR, the header is followed by &struct + * odp_sflow_sample_header, then by an array of &union odp_action (the number + * of which is specified in &struct odp_sflow_sample_header), then by packet + * data. + */ struct odp_msg { - __u32 type; /* _ODPL_MISS_NR or _ODPL_ACTION_NR. 
*/ - __u32 length; /* Message length, including header. */ - __u16 port; /* Port on which frame was received. */ + __u32 type; + __u32 length; + __u16 port; __u16 reserved; - __u32 arg; /* Argument value specified in action. */ - /* Followed by packet data. */ + __u32 arg; +}; + +/** + * struct odp_sflow_sample_header - header added to sFlow sampled packet. + * @sample_pool: Number of packets that were candidates for sFlow sampling, + * regardless of whether they were actually chosen and sent down to userspace. + * @n_actions: Number of "union odp_action"s immediately following this header. + * + * This header follows &struct odp_msg when that structure's @type is + * %_ODPL_SFLOW_NR, and it is itself followed by an array of &union odp_action + * (the number of which is specified in @n_actions) and then by packet data. + */ +struct odp_sflow_sample_header { + __u32 sample_pool; + __u32 n_actions; }; #define ODP_PORT_INTERNAL (1 << 0) /* This port is simulated. */ diff --git a/lib/automake.mk b/lib/automake.mk index 5265eb77..67100fc1 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -126,6 +126,19 @@ nodist_lib_libopenvswitch_a_SOURCES = \ lib/dirs.c CLEANFILES += $(nodist_lib_libopenvswitch_a_SOURCES) +noinst_LIBRARIES += lib/libsflow.a +lib_libsflow_a_SOURCES = \ + lib/sflow_api.h \ + lib/sflow.h \ + lib/sflow_agent.c \ + lib/sflow_sampler.c \ + lib/sflow_poller.c \ + lib/sflow_receiver.c +lib_libsflow_a_CFLAGS = $(AM_CFLAGS) +if HAVE_WNO_UNUSED +lib_libsflow_a_CFLAGS += -Wno-unused +endif + if HAVE_NETLINK lib_libopenvswitch_a_SOURCES += \ lib/netlink-protocol.h \ diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c index 2bf329f4..e7d43811 100644 --- a/lib/dpif-linux.c +++ b/lib/dpif-linux.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009 Nicira Networks. + * Copyright (c) 2008, 2009, 2010 Nicira Networks. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -196,6 +196,7 @@ dpif_linux_delete(struct dpif *dpif_) static int dpif_linux_get_stats(const struct dpif *dpif_, struct odp_stats *stats) { + memset(stats, 0, sizeof *stats); return do_ioctl(dpif_, ODP_DP_STATS, stats); } @@ -396,6 +397,19 @@ dpif_linux_recv_set_mask(struct dpif *dpif_, int listen_mask) } static int +dpif_linux_get_sflow_probability(const struct dpif *dpif_, + uint32_t *probability) +{ + return do_ioctl(dpif_, ODP_GET_SFLOW_PROBABILITY, probability); +} + +static int +dpif_linux_set_sflow_probability(struct dpif *dpif_, uint32_t probability) +{ + return do_ioctl(dpif_, ODP_SET_SFLOW_PROBABILITY, &probability); +} + +static int dpif_linux_recv(struct dpif *dpif_, struct ofpbuf **bufp) { struct dpif_linux *dpif = dpif_linux_cast(dpif_); @@ -475,6 +489,8 @@ const struct dpif_class dpif_linux_class = { dpif_linux_execute, dpif_linux_recv_get_mask, dpif_linux_recv_set_mask, + dpif_linux_get_sflow_probability, + dpif_linux_set_sflow_probability, dpif_linux_recv, dpif_linux_recv_wait, }; @@ -555,13 +571,14 @@ make_openvswitch_device(int minor, char **fnp) struct stat s; char fn[128]; + *fnp = NULL; + major = get_openvswitch_major(); if (major < 0) { return -major; } dev = makedev(major, minor); - *fnp = NULL; sprintf(fn, "%s/dp%d", dirname, minor); if (!stat(fn, &s)) { if (!S_ISCHR(s.st_mode)) { diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index c4b5a994..720e8cb0 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -1333,6 +1333,8 @@ const struct dpif_class dpif_netdev_class = { dpif_netdev_execute, dpif_netdev_recv_get_mask, dpif_netdev_recv_set_mask, + NULL, /* get_sflow_probability */ + NULL, /* set_sflow_probability */ dpif_netdev_recv, dpif_netdev_recv_wait, }; diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h index 1d41eafd..39c66e11 100644 --- a/lib/dpif-provider.h +++ b/lib/dpif-provider.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009 Nicira Networks. + * Copyright (c) 2009, 2010 Nicira Networks. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -282,6 +282,25 @@ struct dpif_class { * corresponding type when it calls the recv member function. */ int (*recv_set_mask)(struct dpif *dpif, int listen_mask); + /* Retrieves 'dpif''s sFlow sampling probability into '*probability'. + * Return value is 0 or a positive errno value. EOPNOTSUPP indicates that + * the datapath does not support sFlow, as does a null pointer. + * + * '*probability' is expressed as the number of packets out of UINT_MAX to + * sample, e.g. probability/UINT_MAX is the probability of sampling a given + * packet. */ + int (*get_sflow_probability)(const struct dpif *dpif, + uint32_t *probability); + + /* Sets 'dpif''s sFlow sampling probability to 'probability'. Return value + * is 0 or a positive errno value. EOPNOTSUPP indicates that the datapath + * does not support sFlow, as does a null pointer. + * + * 'probability' is expressed as the number of packets out of UINT_MAX to + * sample, e.g. probability/UINT_MAX is the probability of sampling a given + * packet. */ + int (*set_sflow_probability)(struct dpif *dpif, uint32_t probability); + /* Attempts to receive a message from 'dpif'. If successful, stores the * message into '*packetp'. The message, if one is received, must begin * with 'struct odp_msg' as a header. Only messages of the types selected @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009 Nicira Networks. + * Copyright (c) 2008, 2009, 2010 Nicira Networks. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -845,6 +845,41 @@ dpif_recv_set_mask(struct dpif *dpif, int listen_mask) return error; } +/* Retrieve the sFlow sampling probability. '*probability' is expressed as the + * number of packets out of UINT_MAX to sample, e.g. probability/UINT_MAX is + * the probability of sampling a given packet. 
+ * + * Returns 0 if successful, otherwise a positive errno value. EOPNOTSUPP + * indicates that 'dpif' does not support sFlow sampling. */ +int +dpif_get_sflow_probability(const struct dpif *dpif, uint32_t *probability) +{ + int error = (dpif->dpif_class->get_sflow_probability + ? dpif->dpif_class->get_sflow_probability(dpif, probability) + : EOPNOTSUPP); + if (error) { + *probability = 0; + } + log_operation(dpif, "get_sflow_probability", error); + return error; +} + +/* Set the sFlow sampling probability. 'probability' is expressed as the + * number of packets out of UINT_MAX to sample, e.g. probability/UINT_MAX is + * the probability of sampling a given packet. + * + * Returns 0 if successful, otherwise a positive errno value. EOPNOTSUPP + * indicates that 'dpif' does not support sFlow sampling. */ +int +dpif_set_sflow_probability(struct dpif *dpif, uint32_t probability) +{ + int error = (dpif->dpif_class->set_sflow_probability + ? dpif->dpif_class->set_sflow_probability(dpif, probability) + : EOPNOTSUPP); + log_operation(dpif, "set_sflow_probability", error); + return error; +} + /* Attempts to receive a message from 'dpif'. If successful, stores the * message into '*packetp'. The message, if one is received, will begin with * 'struct odp_msg' as a header. Only messages of the types selected with @@ -868,6 +903,7 @@ dpif_recv(struct dpif *dpif, struct ofpbuf **packetp) "%zu on port %"PRIu16": %s", dpif_name(dpif), (msg->type == _ODPL_MISS_NR ? "miss" : msg->type == _ODPL_ACTION_NR ? "action" + : msg->type == _ODPL_SFLOW_NR ? 
"sFlow" : "<unknown>"), payload_len, msg->port, s); free(s); @@ -894,7 +930,7 @@ dpif_recv_purge(struct dpif *dpif) return error; } - for (i = 0; i < stats.max_miss_queue + stats.max_action_queue; i++) { + for (i = 0; i < stats.max_miss_queue + stats.max_action_queue + stats.max_sflow_queue; i++) { struct ofpbuf *buf; error = dpif_recv(dpif, &buf); if (error) { @@ -84,6 +84,8 @@ int dpif_execute(struct dpif *, uint16_t in_port, int dpif_recv_get_mask(const struct dpif *, int *listen_mask); int dpif_recv_set_mask(struct dpif *, int listen_mask); +int dpif_get_sflow_probability(const struct dpif *, uint32_t *probability); +int dpif_set_sflow_probability(struct dpif *, uint32_t probability); int dpif_recv(struct dpif *, struct ofpbuf **); int dpif_recv_purge(struct dpif *); void dpif_recv_wait(struct dpif *); diff --git a/lib/netdev.c b/lib/netdev.c index f5089c1d..804050fc 100644 --- a/lib/netdev.c +++ b/lib/netdev.c @@ -31,6 +31,7 @@ #include "list.h" #include "netdev-provider.h" #include "ofpbuf.h" +#include "openflow/openflow.h" #include "packets.h" #include "poll-loop.h" #include "shash.h" @@ -525,6 +526,35 @@ netdev_get_features(struct netdev *netdev, return error; } +/* Returns the maximum speed of a network connection that has the "enum + * ofp_port_features" bits in 'features', in bits per second. If no bits that + * indicate a speed are set in 'features', assumes 100Mbps. */ +uint64_t +netdev_features_to_bps(uint32_t features) +{ + enum { + F_10000MB = OFPPF_10GB_FD, + F_1000MB = OFPPF_1GB_HD | OFPPF_1GB_FD, + F_100MB = OFPPF_100MB_HD | OFPPF_100MB_FD, + F_10MB = OFPPF_10MB_HD | OFPPF_10MB_FD + }; + + return ( features & F_10000MB ? UINT64_C(10000000000) + : features & F_1000MB ? UINT64_C(1000000000) + : features & F_100MB ? UINT64_C(100000000) + : features & F_10MB ? UINT64_C(10000000) + : UINT64_C(100000000)); +} + +/* Returns true if any of the "enum ofp_port_features" bits that indicate a + * full-duplex link are set in 'features', otherwise false. 
*/ +bool +netdev_features_is_full_duplex(uint32_t features) +{ + return (features & (OFPPF_10MB_FD | OFPPF_100MB_FD | OFPPF_1GB_FD + | OFPPF_10GB_FD)) != 0; +} + /* Set the features advertised by 'netdev' to 'advertise'. Returns 0 if * successful, otherwise a positive errno value. */ int diff --git a/lib/netdev.h b/lib/netdev.h index b8c7dfb4..8060ddb9 100644 --- a/lib/netdev.h +++ b/lib/netdev.h @@ -112,6 +112,8 @@ int netdev_get_carrier(const struct netdev *, bool *carrier); int netdev_get_features(struct netdev *, uint32_t *current, uint32_t *advertised, uint32_t *supported, uint32_t *peer); +uint64_t netdev_features_to_bps(uint32_t features); +bool netdev_features_is_full_duplex(uint32_t features); int netdev_set_advertisements(struct netdev *, uint32_t advertise); int netdev_get_in4(const struct netdev *, struct in_addr *address, diff --git a/lib/sflow.h b/lib/sflow.h new file mode 100644 index 00000000..397ae2da --- /dev/null +++ b/lib/sflow.h @@ -0,0 +1,548 @@ +/* Copyright (c) 2002-2009 InMon Corp. 
Licensed under the terms of the InMon sFlow licence: */ +/* http://www.inmon.com/technology/sflowlicense.txt */ + +#ifndef SFLOW_H +#define SFLOW_H 1 + +enum SFLAddress_type { + SFLADDRESSTYPE_IP_V4 = 1, + SFLADDRESSTYPE_IP_V6 = 2 +}; + +typedef struct { + u_int32_t addr; +} SFLIPv4; + +typedef struct { + u_char addr[16]; +} SFLIPv6; + +typedef union _SFLAddress_value { + SFLIPv4 ip_v4; + SFLIPv6 ip_v6; +} SFLAddress_value; + +typedef struct _SFLAddress { + u_int32_t type; /* enum SFLAddress_type */ + SFLAddress_value address; +} SFLAddress; + +/* Packet header data */ + +#define SFL_DEFAULT_HEADER_SIZE 128 +#define SFL_DEFAULT_COLLECTOR_PORT 6343 +#define SFL_DEFAULT_SAMPLING_RATE 400 +#define SFL_DEFAULT_POLLING_INTERVAL 30 + +/* The header protocol describes the format of the sampled header */ +enum SFLHeader_protocol { + SFLHEADER_ETHERNET_ISO8023 = 1, + SFLHEADER_ISO88024_TOKENBUS = 2, + SFLHEADER_ISO88025_TOKENRING = 3, + SFLHEADER_FDDI = 4, + SFLHEADER_FRAME_RELAY = 5, + SFLHEADER_X25 = 6, + SFLHEADER_PPP = 7, + SFLHEADER_SMDS = 8, + SFLHEADER_AAL5 = 9, + SFLHEADER_AAL5_IP = 10, /* e.g. 
Cisco AAL5 mux */ + SFLHEADER_IPv4 = 11, + SFLHEADER_IPv6 = 12, + SFLHEADER_MPLS = 13 +}; + +/* raw sampled header */ + +typedef struct _SFLSampled_header { + u_int32_t header_protocol; /* (enum SFLHeader_protocol) */ + u_int32_t frame_length; /* Original length of packet before sampling */ + u_int32_t stripped; /* header/trailer bytes stripped by sender */ + u_int32_t header_length; /* length of sampled header bytes to follow */ + u_int8_t *header_bytes; /* Header bytes */ +} SFLSampled_header; + +/* decoded ethernet header */ + +typedef struct _SFLSampled_ethernet { + u_int32_t eth_len; /* The length of the MAC packet excluding + lower layer encapsulations */ + u_int8_t src_mac[8]; /* 6 bytes + 2 pad */ + u_int8_t dst_mac[8]; + u_int32_t eth_type; +} SFLSampled_ethernet; + +/* decoded IP version 4 header */ + +typedef struct _SFLSampled_ipv4 { + u_int32_t length; /* The length of the IP packet + excluding lower layer encapsulations */ + u_int32_t protocol; /* IP Protocol type (for example, TCP = 6, UDP = 17) */ + SFLIPv4 src_ip; /* Source IP Address */ + SFLIPv4 dst_ip; /* Destination IP Address */ + u_int32_t src_port; /* TCP/UDP source port number or equivalent */ + u_int32_t dst_port; /* TCP/UDP destination port number or equivalent */ + u_int32_t tcp_flags; /* TCP flags */ + u_int32_t tos; /* IP type of service */ +} SFLSampled_ipv4; + +/* decoded IP version 6 data */ + +typedef struct _SFLSampled_ipv6 { + u_int32_t length; /* The length of the IP packet + excluding lower layer encapsulations */ + u_int32_t protocol; /* IP Protocol type (for example, TCP = 6, UDP = 17) */ + SFLIPv6 src_ip; /* Source IP Address */ + SFLIPv6 dst_ip; /* Destination IP Address */ + u_int32_t src_port; /* TCP/UDP source port number or equivalent */ + u_int32_t dst_port; /* TCP/UDP destination port number or equivalent */ + u_int32_t tcp_flags; /* TCP flags */ + u_int32_t priority; /* IP priority */ +} SFLSampled_ipv6; + +/* Extended data types */ + +/* Extended switch data */ + 
+typedef struct _SFLExtended_switch { + u_int32_t src_vlan; /* The 802.1Q VLAN id of incoming frame */ + u_int32_t src_priority; /* The 802.1p priority */ + u_int32_t dst_vlan; /* The 802.1Q VLAN id of outgoing frame */ + u_int32_t dst_priority; /* The 802.1p priority */ +} SFLExtended_switch; + +/* Extended router data */ + +typedef struct _SFLExtended_router { + SFLAddress nexthop; /* IP address of next hop router */ + u_int32_t src_mask; /* Source address prefix mask bits */ + u_int32_t dst_mask; /* Destination address prefix mask bits */ +} SFLExtended_router; + +/* Extended gateway data */ +enum SFLExtended_as_path_segment_type { + SFLEXTENDED_AS_SET = 1, /* Unordered set of ASs */ + SFLEXTENDED_AS_SEQUENCE = 2 /* Ordered sequence of ASs */ +}; + +typedef struct _SFLExtended_as_path_segment { + u_int32_t type; /* enum SFLExtended_as_path_segment_type */ + u_int32_t length; /* number of AS numbers in set/sequence */ + union { + u_int32_t *set; + u_int32_t *seq; + } as; +} SFLExtended_as_path_segment; + +typedef struct _SFLExtended_gateway { + SFLAddress nexthop; /* Address of the border router that should + be used for the destination network */ + u_int32_t as; /* AS number for this gateway */ + u_int32_t src_as; /* AS number of source (origin) */ + u_int32_t src_peer_as; /* AS number of source peer */ + u_int32_t dst_as_path_segments; /* number of segments in path */ + SFLExtended_as_path_segment *dst_as_path; /* list of seqs or sets */ + u_int32_t communities_length; /* number of communities */ + u_int32_t *communities; /* set of communities */ + u_int32_t localpref; /* LocalPref associated with this route */ +} SFLExtended_gateway; + +typedef struct _SFLString { + u_int32_t len; + char *str; +} SFLString; + +/* Extended user data */ + +typedef struct _SFLExtended_user { + u_int32_t src_charset; /* MIBEnum value of character set used to encode a string - See RFC 2978 + Where possible UTF-8 encoding (MIBEnum=106) should be used.
A value + of zero indicates an unknown encoding. */ + SFLString src_user; + u_int32_t dst_charset; + SFLString dst_user; +} SFLExtended_user; + +/* Extended URL data */ + +enum SFLExtended_url_direction { + SFLEXTENDED_URL_SRC = 1, /* URL is associated with source address */ + SFLEXTENDED_URL_DST = 2 /* URL is associated with destination address */ +}; + +typedef struct _SFLExtended_url { + u_int32_t direction; /* enum SFLExtended_url_direction */ + SFLString url; /* URL associated with the packet flow. + Must be URL encoded */ + SFLString host; /* The host field from the HTTP header */ +} SFLExtended_url; + +/* Extended MPLS data */ + +typedef struct _SFLLabelStack { + u_int32_t depth; + u_int32_t *stack; /* first entry is top of stack - see RFC 3032 for encoding */ +} SFLLabelStack; + +typedef struct _SFLExtended_mpls { + SFLAddress nextHop; /* Address of the next hop */ + SFLLabelStack in_stack; + SFLLabelStack out_stack; +} SFLExtended_mpls; + +/* Extended NAT data + Packet header records report addresses as seen at the sFlowDataSource. + The extended_nat structure reports on translated source and/or destination + addresses for this packet. If an address was not translated it should + be equal to that reported for the header.
*/ + +typedef struct _SFLExtended_nat { + SFLAddress src; /* Source address */ + SFLAddress dst; /* Destination address */ +} SFLExtended_nat; + +/* additional Extended MPLS structs */ + +typedef struct _SFLExtended_mpls_tunnel { + SFLString tunnel_lsp_name; /* Tunnel name */ + u_int32_t tunnel_id; /* Tunnel ID */ + u_int32_t tunnel_cos; /* Tunnel COS value */ +} SFLExtended_mpls_tunnel; + +typedef struct _SFLExtended_mpls_vc { + SFLString vc_instance_name; /* VC instance name */ + u_int32_t vll_vc_id; /* VLL/VC instance ID */ + u_int32_t vc_label_cos; /* VC Label COS value */ +} SFLExtended_mpls_vc; + +/* Extended MPLS FEC + - Definitions from MPLS-FTN-STD-MIB mplsFTNTable */ + +typedef struct _SFLExtended_mpls_FTN { + SFLString mplsFTNDescr; + u_int32_t mplsFTNMask; +} SFLExtended_mpls_FTN; + +/* Extended MPLS LVP FEC + - Definition from MPLS-LDP-STD-MIB mplsFecTable + Note: mplsFecAddrType, mplsFecAddr information available + from packet header */ + +typedef struct _SFLExtended_mpls_LDP_FEC { + u_int32_t mplsFecAddrPrefixLength; +} SFLExtended_mpls_LDP_FEC; + +/* Extended VLAN tunnel information + Record outer VLAN encapsulations that have + been stripped. extended_vlantunnel information + should only be reported if all the following conditions are satisfied: + 1. The packet has nested vlan tags, AND + 2. The reporting device is VLAN aware, AND + 3. One or more VLAN tags have been stripped, either + because they represent proprietary encapsulations, or + because switch hardware automatically strips the outer VLAN + encapsulation. + Reporting extended_vlantunnel information is not a substitute for + reporting extended_switch information. extended_switch data must + always be reported to describe the ingress/egress VLAN information + for the packet. The extended_vlantunnel information only applies to + nested VLAN tags, and then only when one or more tags has been + stripped.
*/ + +typedef SFLLabelStack SFLVlanStack; +typedef struct _SFLExtended_vlan_tunnel { + SFLVlanStack stack; /* List of stripped 802.1Q TPID/TCI layers. Each + TPID,TCI pair is represented as a single 32 bit + integer. Layers listed from outermost to + innermost. */ +} SFLExtended_vlan_tunnel; + +enum SFLFlow_type_tag { + /* enterprise = 0, format = ... */ + SFLFLOW_HEADER = 1, /* Packet headers are sampled */ + SFLFLOW_ETHERNET = 2, /* MAC layer information */ + SFLFLOW_IPV4 = 3, /* IP version 4 data */ + SFLFLOW_IPV6 = 4, /* IP version 6 data */ + SFLFLOW_EX_SWITCH = 1001, /* Extended switch information */ + SFLFLOW_EX_ROUTER = 1002, /* Extended router information */ + SFLFLOW_EX_GATEWAY = 1003, /* Extended gateway router information */ + SFLFLOW_EX_USER = 1004, /* Extended TACAS/RADIUS user information */ + SFLFLOW_EX_URL = 1005, /* Extended URL information */ + SFLFLOW_EX_MPLS = 1006, /* Extended MPLS information */ + SFLFLOW_EX_NAT = 1007, /* Extended NAT information */ + SFLFLOW_EX_MPLS_TUNNEL = 1008, /* additional MPLS information */ + SFLFLOW_EX_MPLS_VC = 1009, + SFLFLOW_EX_MPLS_FTN = 1010, + SFLFLOW_EX_MPLS_LDP_FEC = 1011, + SFLFLOW_EX_VLAN_TUNNEL = 1012, /* VLAN stack */ +}; + +typedef union _SFLFlow_type { + SFLSampled_header header; + SFLSampled_ethernet ethernet; + SFLSampled_ipv4 ipv4; + SFLSampled_ipv6 ipv6; + SFLExtended_switch sw; + SFLExtended_router router; + SFLExtended_gateway gateway; + SFLExtended_user user; + SFLExtended_url url; + SFLExtended_mpls mpls; + SFLExtended_nat nat; + SFLExtended_mpls_tunnel mpls_tunnel; + SFLExtended_mpls_vc mpls_vc; + SFLExtended_mpls_FTN mpls_ftn; + SFLExtended_mpls_LDP_FEC mpls_ldp_fec; + SFLExtended_vlan_tunnel vlan_tunnel; +} SFLFlow_type; + +typedef struct _SFLFlow_sample_element { + struct _SFLFlow_sample_element *nxt; + u_int32_t tag; /* SFLFlow_type_tag */ + u_int32_t length; + SFLFlow_type flowType; +} SFLFlow_sample_element; + +enum SFL_sample_tag { + SFLFLOW_SAMPLE = 1, /* enterprise = 0 : format = 1 */ 
+ SFLCOUNTERS_SAMPLE = 2, /* enterprise = 0 : format = 2 */ + SFLFLOW_SAMPLE_EXPANDED = 3, /* enterprise = 0 : format = 3 */ + SFLCOUNTERS_SAMPLE_EXPANDED = 4 /* enterprise = 0 : format = 4 */ +}; + +/* Format of a single flow sample */ + +typedef struct _SFLFlow_sample { + /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 1 */ + /* u_int32_t length; */ + u_int32_t sequence_number; /* Incremented with each flow sample + generated */ + u_int32_t source_id; /* fsSourceId */ + u_int32_t sampling_rate; /* fsPacketSamplingRate */ + u_int32_t sample_pool; /* Total number of packets that could have been + sampled (i.e. packets skipped by sampling + process + total number of samples) */ + u_int32_t drops; /* Number of times a packet was dropped due to + lack of resources */ + u_int32_t input; /* SNMP ifIndex of input interface. + 0 if interface is not known. */ + u_int32_t output; /* SNMP ifIndex of output interface, + 0 if interface is not known. + Set most significant bit to indicate + multiple destination interfaces + (i.e. in case of broadcast or multicast) + and set lower order bits to indicate + number of destination interfaces. + Examples: + 0x00000002 indicates ifIndex = 2 + 0x00000000 ifIndex unknown. + 0x80000007 indicates a packet sent + to 7 interfaces. + 0x80000000 indicates a packet sent to + an unknown number of + interfaces greater than 1.*/ + u_int32_t num_elements; + SFLFlow_sample_element *elements; +} SFLFlow_sample; + +/* same thing, but the expanded version (for full 32-bit ifIndex numbers) */ + +typedef struct _SFLFlow_sample_expanded { + /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 1 */ + /* u_int32_t length; */ + u_int32_t sequence_number; /* Incremented with each flow sample + generated */ + u_int32_t ds_class; /* EXPANDED */ + u_int32_t ds_index; /* EXPANDED */ + u_int32_t sampling_rate; /* fsPacketSamplingRate */ + u_int32_t sample_pool; /* Total number of packets that could have been + sampled (i.e. 
packets skipped by sampling + process + total number of samples) */ + u_int32_t drops; /* Number of times a packet was dropped due to + lack of resources */ + u_int32_t inputFormat; /* EXPANDED */ + u_int32_t input; /* SNMP ifIndex of input interface. + 0 if interface is not known. */ + u_int32_t outputFormat; /* EXPANDED */ + u_int32_t output; /* SNMP ifIndex of output interface, + 0 if interface is not known. */ + u_int32_t num_elements; + SFLFlow_sample_element *elements; +} SFLFlow_sample_expanded; + +/* Counter types */ + +/* Generic interface counters - see RFC 1573, 2233 */ + +typedef struct _SFLIf_counters { + u_int32_t ifIndex; + u_int32_t ifType; + u_int64_t ifSpeed; + u_int32_t ifDirection; /* Derived from MAU MIB (RFC 2668) + 0 = unknown, 1 = full-duplex, + 2 = half-duplex, 3 = in, 4 = out */ + u_int32_t ifStatus; /* bit field with the following bits assigned: + bit 0 = ifAdminStatus (0 = down, 1 = up) + bit 1 = ifOperStatus (0 = down, 1 = up) */ + u_int64_t ifInOctets; + u_int32_t ifInUcastPkts; + u_int32_t ifInMulticastPkts; + u_int32_t ifInBroadcastPkts; + u_int32_t ifInDiscards; + u_int32_t ifInErrors; + u_int32_t ifInUnknownProtos; + u_int64_t ifOutOctets; + u_int32_t ifOutUcastPkts; + u_int32_t ifOutMulticastPkts; + u_int32_t ifOutBroadcastPkts; + u_int32_t ifOutDiscards; + u_int32_t ifOutErrors; + u_int32_t ifPromiscuousMode; +} SFLIf_counters; + +/* Ethernet interface counters - see RFC 2358 */ +typedef struct _SFLEthernet_counters { + u_int32_t dot3StatsAlignmentErrors; + u_int32_t dot3StatsFCSErrors; + u_int32_t dot3StatsSingleCollisionFrames; + u_int32_t dot3StatsMultipleCollisionFrames; + u_int32_t dot3StatsSQETestErrors; + u_int32_t dot3StatsDeferredTransmissions; + u_int32_t dot3StatsLateCollisions; + u_int32_t dot3StatsExcessiveCollisions; + u_int32_t dot3StatsInternalMacTransmitErrors; + u_int32_t dot3StatsCarrierSenseErrors; + u_int32_t dot3StatsFrameTooLongs; + u_int32_t dot3StatsInternalMacReceiveErrors; + u_int32_t 
dot3StatsSymbolErrors; +} SFLEthernet_counters; + +/* Token ring counters - see RFC 1748 */ + +typedef struct _SFLTokenring_counters { + u_int32_t dot5StatsLineErrors; + u_int32_t dot5StatsBurstErrors; + u_int32_t dot5StatsACErrors; + u_int32_t dot5StatsAbortTransErrors; + u_int32_t dot5StatsInternalErrors; + u_int32_t dot5StatsLostFrameErrors; + u_int32_t dot5StatsReceiveCongestions; + u_int32_t dot5StatsFrameCopiedErrors; + u_int32_t dot5StatsTokenErrors; + u_int32_t dot5StatsSoftErrors; + u_int32_t dot5StatsHardErrors; + u_int32_t dot5StatsSignalLoss; + u_int32_t dot5StatsTransmitBeacons; + u_int32_t dot5StatsRecoverys; + u_int32_t dot5StatsLobeWires; + u_int32_t dot5StatsRemoves; + u_int32_t dot5StatsSingles; + u_int32_t dot5StatsFreqErrors; +} SFLTokenring_counters; + +/* 100 BaseVG interface counters - see RFC 2020 */ + +typedef struct _SFLVg_counters { + u_int32_t dot12InHighPriorityFrames; + u_int64_t dot12InHighPriorityOctets; + u_int32_t dot12InNormPriorityFrames; + u_int64_t dot12InNormPriorityOctets; + u_int32_t dot12InIPMErrors; + u_int32_t dot12InOversizeFrameErrors; + u_int32_t dot12InDataErrors; + u_int32_t dot12InNullAddressedFrames; + u_int32_t dot12OutHighPriorityFrames; + u_int64_t dot12OutHighPriorityOctets; + u_int32_t dot12TransitionIntoTrainings; + u_int64_t dot12HCInHighPriorityOctets; + u_int64_t dot12HCInNormPriorityOctets; + u_int64_t dot12HCOutHighPriorityOctets; +} SFLVg_counters; + +typedef struct _SFLVlan_counters { + u_int32_t vlan_id; + u_int64_t octets; + u_int32_t ucastPkts; + u_int32_t multicastPkts; + u_int32_t broadcastPkts; + u_int32_t discards; +} SFLVlan_counters; + +/* Counters data */ + +enum SFLCounters_type_tag { + /* enterprise = 0, format = ... 
*/ + SFLCOUNTERS_GENERIC = 1, + SFLCOUNTERS_ETHERNET = 2, + SFLCOUNTERS_TOKENRING = 3, + SFLCOUNTERS_VG = 4, + SFLCOUNTERS_VLAN = 5 +}; + +typedef union _SFLCounters_type { + SFLIf_counters generic; + SFLEthernet_counters ethernet; + SFLTokenring_counters tokenring; + SFLVg_counters vg; + SFLVlan_counters vlan; +} SFLCounters_type; + +typedef struct _SFLCounters_sample_element { + struct _SFLCounters_sample_element *nxt; /* linked list */ + u_int32_t tag; /* SFLCounters_type_tag */ + u_int32_t length; + SFLCounters_type counterBlock; +} SFLCounters_sample_element; + +typedef struct _SFLCounters_sample { + /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 2 */ + /* u_int32_t length; */ + u_int32_t sequence_number; /* Incremented with each counters sample + generated by this source_id */ + u_int32_t source_id; /* fsSourceId */ + u_int32_t num_elements; + SFLCounters_sample_element *elements; +} SFLCounters_sample; + +/* same thing, but the expanded version, so ds_index can be a full 32 bits */ +typedef struct _SFLCounters_sample_expanded { + /* u_int32_t tag; */ /* SFL_sample_tag -- enterprise = 0 : format = 2 */ + /* u_int32_t length; */ + u_int32_t sequence_number; /* Incremented with each counters sample + generated by this source_id */ + u_int32_t ds_class; /* EXPANDED */ + u_int32_t ds_index; /* EXPANDED */ + u_int32_t num_elements; + SFLCounters_sample_element *elements; +} SFLCounters_sample_expanded; + +#define SFLADD_ELEMENT(_sm, _el) do { (_el)->nxt = (_sm)->elements; (_sm)->elements = (_el); } while(0) + +/* Format of a sample datagram */ + +enum SFLDatagram_version { + SFLDATAGRAM_VERSION2 = 2, + SFLDATAGRAM_VERSION4 = 4, + SFLDATAGRAM_VERSION5 = 5 +}; + +typedef struct _SFLSample_datagram_hdr { + u_int32_t datagram_version; /* (enum SFLDatagram_version) = VERSION5 = 5 */ + SFLAddress agent_address; /* IP address of sampling agent */ + u_int32_t sub_agent_id; /* Used to distinguishing between datagram + streams from separate agent sub 
entities + within an device. */ + u_int32_t sequence_number; /* Incremented with each sample datagram + generated */ + u_int32_t uptime; /* Current time (in milliseconds since device + last booted). Should be set as close to + datagram transmission time as possible.*/ + u_int32_t num_records; /* Number of tag-len-val flow/counter records to follow */ +} SFLSample_datagram_hdr; + +#define SFL_MAX_DATAGRAM_SIZE 1500 +#define SFL_MIN_DATAGRAM_SIZE 200 +#define SFL_DEFAULT_DATAGRAM_SIZE 1400 + +#define SFL_DATA_PAD 400 + +#endif /* SFLOW_H */ diff --git a/lib/sflow_agent.c b/lib/sflow_agent.c new file mode 100644 index 00000000..4b25c25a --- /dev/null +++ b/lib/sflow_agent.c @@ -0,0 +1,492 @@ +/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */ +/* http://www.inmon.com/technology/sflowlicense.txt */ + +#include "sflow_api.h" + +static void * sflAlloc(SFLAgent *agent, size_t bytes); +static void sflFree(SFLAgent *agent, void *obj); +static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler); +static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler); + +/*________________--------------------------__________________ + ________________ sfl_agent_init __________________ + ----------------__________________________------------------ +*/ + +void sfl_agent_init(SFLAgent *agent, + SFLAddress *myIP, /* IP address of this agent in net byte order */ + u_int32_t subId, /* agent_sub_id */ + time_t bootTime, /* agent boot time */ + time_t now, /* time now */ + void *magic, /* ptr to pass back in logging and alloc fns */ + allocFn_t allocFn, + freeFn_t freeFn, + errorFn_t errorFn, + sendFn_t sendFn) +{ + /* first clear everything */ + memset(agent, 0, sizeof(*agent)); + /* now copy in the parameters */ + agent->myIP = *myIP; /* structure copy */ + agent->subId = subId; + agent->bootTime = bootTime; + agent->now = now; + agent->magic = magic; + agent->allocFn = allocFn; + agent->freeFn = freeFn; + 
agent->errorFn = errorFn; + agent->sendFn = sendFn; + +#ifdef SFLOW_DO_SOCKET + if(sendFn == NULL) { + /* open the socket - really need one for v4 and another for v6? */ + if((agent->receiverSocket4 = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) + sfl_agent_sysError(agent, "agent", "IPv4 socket open failed"); + if((agent->receiverSocket6 = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) == -1) + sfl_agent_sysError(agent, "agent", "IPv6 socket open failed"); + } +#endif +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_release __________________ + -----------------___________________________------------------ +*/ + +void sfl_agent_release(SFLAgent *agent) +{ + /* release and free the samplers, pollers and receivers */ + SFLSampler *sm = agent->samplers; + SFLPoller *pl = agent->pollers; + SFLReceiver *rcv = agent->receivers; + + for(; sm != NULL; ) { + SFLSampler *nextSm = sm->nxt; + sflFree(agent, sm); + sm = nextSm; + } + agent->samplers = NULL; + + for(; pl != NULL; ) { + SFLPoller *nextPl = pl->nxt; + sflFree(agent, pl); + pl = nextPl; + } + agent->pollers = NULL; + + for(; rcv != NULL; ) { + SFLReceiver *nextRcv = rcv->nxt; + sflFree(agent, rcv); + rcv = nextRcv; + } + agent->receivers = NULL; + +#ifdef SFLOW_DO_SOCKET + /* close the sockets */ + if(agent->receiverSocket4 > 0) close(agent->receiverSocket4); + if(agent->receiverSocket6 > 0) close(agent->receiverSocket6); +#endif +} + + +/*_________________---------------------------__________________ + _________________ sfl_agent_set_* __________________ + -----------------___________________________------------------ +*/ + +void sfl_agent_set_agentAddress(SFLAgent *agent, SFLAddress *addr) +{ + if(addr && memcmp(addr, &agent->myIP, sizeof(agent->myIP)) != 0) { + /* change of address */ + agent->myIP = *addr; /* structure copy */ + /* reset sequence numbers here? 
*/
+  }
+}
+
+void sfl_agent_set_agentSubId(SFLAgent *agent, u_int32_t subId)
+{
+  if(subId != agent->subId) {
+    /* change of subId */
+    agent->subId = subId;
+    /* reset sequence numbers here? */
+  }
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_tick __________________
+  -----------------___________________________------------------
+*/
+
+void sfl_agent_tick(SFLAgent *agent, time_t now)
+{
+  SFLReceiver *rcv = agent->receivers;
+  SFLSampler *sm = agent->samplers;
+  SFLPoller *pl = agent->pollers;
+  agent->now = now;
+  /* receivers use ticks to flush send data */
+  for(; rcv != NULL; rcv = rcv->nxt) sfl_receiver_tick(rcv, now);
+  /* samplers use ticks to decide when they are sampling too fast */
+  for(; sm != NULL; sm = sm->nxt) sfl_sampler_tick(sm, now);
+  /* pollers use ticks to decide when to ask for counters */
+  for(; pl != NULL; pl = pl->nxt) sfl_poller_tick(pl, now);
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_addReceiver __________________
+  -----------------___________________________------------------
+
+  Allocate a receiver with the agent's allocator, initialise it and
+  append it to the end of agent->receivers. Returns the new receiver,
+  or NULL if the allocation failed (the list is left untouched).
+*/
+
+SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent)
+{
+  SFLReceiver *rcv = (SFLReceiver *)sflAlloc(agent, sizeof(SFLReceiver));
+  /* the allocator can fail - bail out before rcv is dereferenced below */
+  if(rcv == NULL) return NULL;
+  sfl_receiver_init(rcv, agent);
+  /* add to end of list - to preserve the receiver index numbers for existing receivers */
+  {
+    SFLReceiver *r, *prev = NULL;
+    /* empty loop body: just walk to the tail, leaving prev on the last node */
+    for(r = agent->receivers; r != NULL; prev = r, r = r->nxt);
+    if(prev) prev->nxt = rcv;
+    else agent->receivers = rcv;
+    rcv->nxt = NULL;
+  }
+  return rcv;
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_dsi_compare __________________
+  -----------------___________________________------------------
+
+  Note that if there is a mixture of ds_classes for this agent, then
+  the simple numeric comparison may not be correct - the sort order (for
+  the purposes of the SNMP MIB) should really be determined by the OID
+
that these numeric ds_class numbers are a shorthand for. For example,
+  ds_class == 0 means ifIndex, which is the oid "1.3.6.1.2.1.2.2.1"
+
+  Fields are compared in the order ds_class, ds_index, ds_instance.
+  Returns 0 if the two keys are equal, a positive value if *pdsi2 is
+  greater than *pdsi1 and a negative value if it is smaller.
+*/
+
+static inline int sfl_dsi_compare(SFLDataSource_instance *pdsi1, SFLDataSource_instance *pdsi2) {
+  /* could have used just memcmp(), but not sure if that would
+     give the right answer on little-endian platforms. Safer to be explicit... */
+  /* Compare rather than subtract: the fields are unsigned 32-bit, so a
+     difference of 2^31 or more would change sign when stored in an int. */
+  if(pdsi2->ds_class != pdsi1->ds_class)
+    return (pdsi2->ds_class > pdsi1->ds_class) ? 1 : -1;
+  if(pdsi2->ds_index != pdsi1->ds_index)
+    return (pdsi2->ds_index > pdsi1->ds_index) ? 1 : -1;
+  if(pdsi2->ds_instance != pdsi1->ds_instance)
+    return (pdsi2->ds_instance > pdsi1->ds_instance) ? 1 : -1;
+  return 0;
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_addSampler __________________
+  -----------------___________________________------------------
+
+  Return the sampler whose data source matches *pdsi, creating it and
+  linking it into the sorted agent->samplers list if it does not exist
+  yet. Returns NULL if a new sampler was needed but could not be
+  allocated (the list and the jumpTable are left untouched).
+*/
+
+SFLSampler *sfl_agent_addSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+  /* Keep the list sorted. */
+  SFLSampler *prev = NULL, *sm = agent->samplers;
+  for(; sm != NULL; prev = sm, sm = sm->nxt) {
+    /* cmp < 0 here means the existing entry sm->dsi is smaller than *pdsi */
+    int64_t cmp = sfl_dsi_compare(pdsi, &sm->dsi);
+    if(cmp == 0) return sm; /* found - return existing one */
+    if(cmp < 0) break; /* insert here */
+  }
+  /* either we found the insert point, or reached the end of the list...*/
+  /* NOTE(review): inserting in front of the first smaller entry keeps the
+     list in descending key order - confirm this is what the getNext
+     callers expect. */
+
+  {
+    SFLSampler *newsm = (SFLSampler *)sflAlloc(agent, sizeof(SFLSampler));
+    /* the allocator can fail - bail out before newsm is dereferenced below */
+    if(newsm == NULL) return NULL;
+    sfl_sampler_init(newsm, agent, pdsi);
+    if(prev) prev->nxt = newsm;
+    else agent->samplers = newsm;
+    newsm->nxt = sm;
+
+    /* see if we should go in the ifIndex jumpTable */
+    if(SFL_DS_CLASS(newsm->dsi) == 0) {
+      SFLSampler *test = sfl_agent_getSamplerByIfIndex(agent, SFL_DS_INDEX(newsm->dsi));
+      if(test && (SFL_DS_INSTANCE(newsm->dsi) < SFL_DS_INSTANCE(test->dsi))) {
+        /* replace with this new one because it has a lower ds_instance number */
+        sfl_agent_jumpTableRemove(agent, test);
+        test = NULL;
+      }
+      if(test == NULL) sfl_agent_jumpTableAdd(agent, newsm);
+    }
+    return newsm;
+  }
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_addPoller
__________________
+  -----------------___________________________------------------
+
+  Return the poller whose data source matches *pdsi, creating it and
+  linking it into the sorted agent->pollers list if it does not exist
+  yet. magic and getCountersFn are handed to sfl_poller_init() for a
+  newly created poller only. Returns NULL if a new poller was needed
+  but could not be allocated (the list is left untouched).
+*/
+
+SFLPoller *sfl_agent_addPoller(SFLAgent *agent,
+                               SFLDataSource_instance *pdsi,
+                               void *magic, /* ptr to pass back in getCountersFn() */
+                               getCountersFn_t getCountersFn)
+{
+  /* keep the list sorted */
+  SFLPoller *prev = NULL, *pl = agent->pollers;
+  for(; pl != NULL; prev = pl, pl = pl->nxt) {
+    int64_t cmp = sfl_dsi_compare(pdsi, &pl->dsi);
+    if(cmp == 0) return pl; /* found - return existing one */
+    if(cmp < 0) break; /* insert here */
+  }
+  /* either we found the insert point, or reached the end of the list... */
+  {
+    SFLPoller *newpl = (SFLPoller *)sflAlloc(agent, sizeof(SFLPoller));
+    /* the allocator can fail - bail out before newpl is dereferenced below */
+    if(newpl == NULL) return NULL;
+    sfl_poller_init(newpl, agent, pdsi, magic, getCountersFn);
+    if(prev) prev->nxt = newpl;
+    else agent->pollers = newpl;
+    newpl->nxt = pl;
+    return newpl;
+  }
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_removeSampler __________________
+  -----------------___________________________------------------
+*/
+
+int sfl_agent_removeSampler(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+  /* find it, unlink it and free it */
+  SFLSampler *prev = NULL, *sm = agent->samplers;
+  for(; sm != NULL; prev = sm, sm = sm->nxt) {
+    if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) {
+      if(prev == NULL) agent->samplers = sm->nxt;
+      else prev->nxt = sm->nxt;
+      sfl_agent_jumpTableRemove(agent, sm);
+      sflFree(agent, sm);
+      return 1;
+    }
+  }
+  /* not found */
+  return 0;
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_removePoller __________________
+  -----------------___________________________------------------
+*/
+
+int sfl_agent_removePoller(SFLAgent *agent, SFLDataSource_instance *pdsi)
+{
+  /* find it, unlink it and free it */
+  SFLPoller *prev = NULL, *pl = agent->pollers;
+  for(; pl != NULL; prev = pl, pl = pl->nxt) {
+    if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) {
+      if(prev == NULL) agent->pollers =
pl->nxt; + else prev->nxt = pl->nxt; + sflFree(agent, pl); + return 1; + } + } + /* not found */ + return 0; +} + +/*_________________--------------------------------__________________ + _________________ sfl_agent_jumpTableAdd __________________ + -----------------________________________________------------------ +*/ + +static void sfl_agent_jumpTableAdd(SFLAgent *agent, SFLSampler *sampler) +{ + u_int32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ; + sampler->hash_nxt = agent->jumpTable[hashIndex]; + agent->jumpTable[hashIndex] = sampler; +} + +/*_________________--------------------------------__________________ + _________________ sfl_agent_jumpTableRemove __________________ + -----------------________________________________------------------ +*/ + +static void sfl_agent_jumpTableRemove(SFLAgent *agent, SFLSampler *sampler) +{ + u_int32_t hashIndex = SFL_DS_INDEX(sampler->dsi) % SFL_HASHTABLE_SIZ; + SFLSampler *search = agent->jumpTable[hashIndex], *prev = NULL; + for( ; search != NULL; prev = search, search = search->hash_nxt) if(search == sampler) break; + if(search) { + // found - unlink + if(prev) prev->hash_nxt = search->hash_nxt; + else agent->jumpTable[hashIndex] = search->hash_nxt; + search->hash_nxt = NULL; + } +} + +/*_________________--------------------------------__________________ + _________________ sfl_agent_getSamplerByIfIndex __________________ + -----------------________________________________------------------ + fast lookup (pointers cached in hash table). If there are multiple + sampler instances for a given ifIndex, then this fn will return + the one with the lowest instance number. Since the samplers + list is sorted, this means the other instances will be accesible + by following the sampler->nxt pointer (until the ds_class + or ds_index changes). This is helpful if you need to offer + the same flowSample to multiple samplers. 
+*/ + +SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, u_int32_t ifIndex) +{ + SFLSampler *search = agent->jumpTable[ifIndex % SFL_HASHTABLE_SIZ]; + for( ; search != NULL; search = search->hash_nxt) if(SFL_DS_INDEX(search->dsi) == ifIndex) break; + return search; +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_getSampler __________________ + -----------------___________________________------------------ +*/ + +SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi) +{ + /* find it and return it */ + SFLSampler *sm = agent->samplers; + for(; sm != NULL; sm = sm->nxt) + if(sfl_dsi_compare(pdsi, &sm->dsi) == 0) return sm; + /* not found */ + return NULL; +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_getPoller __________________ + -----------------___________________________------------------ +*/ + +SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi) +{ + /* find it and return it */ + SFLPoller *pl = agent->pollers; + for(; pl != NULL; pl = pl->nxt) + if(sfl_dsi_compare(pdsi, &pl->dsi) == 0) return pl; + /* not found */ + return NULL; +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_getReceiver __________________ + -----------------___________________________------------------ +*/ + +SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, u_int32_t receiverIndex) +{ + u_int32_t rcvIdx = 0; + SFLReceiver *rcv = agent->receivers; + for(; rcv != NULL; rcv = rcv->nxt) + if(receiverIndex == ++rcvIdx) return rcv; + + /* not found - ran off the end of the table */ + return NULL; +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_getNextSampler __________________ + -----------------___________________________------------------ +*/ + +SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance 
*pdsi) +{ + /* return the one lexograpically just after it - assume they are sorted + correctly according to the lexographical ordering of the object ids */ + SFLSampler *sm = sfl_agent_getSampler(agent, pdsi); + return sm ? sm->nxt : NULL; +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_getNextPoller __________________ + -----------------___________________________------------------ +*/ + +SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi) +{ + /* return the one lexograpically just after it - assume they are sorted + correctly according to the lexographical ordering of the object ids */ + SFLPoller *pl = sfl_agent_getPoller(agent, pdsi); + return pl ? pl->nxt : NULL; +} + +/*_________________---------------------------__________________ + _________________ sfl_agent_getNextReceiver __________________ + -----------------___________________________------------------ +*/ + +SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, u_int32_t receiverIndex) +{ + return sfl_agent_getReceiver(agent, receiverIndex + 1); +} + + +/*_________________---------------------------__________________ + _________________ sfl_agent_resetReceiver __________________ + -----------------___________________________------------------ +*/ + +void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver) +{ + /* tell samplers and pollers to stop sending to this receiver */ + /* first get his receiverIndex */ + u_int32_t rcvIdx = 0; + SFLReceiver *rcv = agent->receivers; + for(; rcv != NULL; rcv = rcv->nxt) { + rcvIdx++; /* thanks to Diego Valverde for pointing out this bugfix */ + if(rcv == receiver) { + /* now tell anyone that is using it to stop */ + SFLSampler *sm = agent->samplers; + SFLPoller *pl = agent->pollers; + + for(; sm != NULL; sm = sm->nxt) + if(sfl_sampler_get_sFlowFsReceiver(sm) == rcvIdx) sfl_sampler_set_sFlowFsReceiver(sm, 0); + + for(; pl != NULL; pl = pl->nxt) + 
if(sfl_poller_get_sFlowCpReceiver(pl) == rcvIdx) sfl_poller_set_sFlowCpReceiver(pl, 0);
+
+      break;
+    }
+  }
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_error __________________
+  -----------------___________________________------------------
+
+  Format "sfl_agent_error: <modName>: <msg>" and hand it to the agent's
+  errorFn callback, or print it to stderr when no callback is set.
+  Messages that do not fit in MAX_ERRMSG_LEN bytes are truncated.
+*/
+#define MAX_ERRMSG_LEN 1000
+
+void sfl_agent_error(SFLAgent *agent, char *modName, char *msg)
+{
+  char errm[MAX_ERRMSG_LEN];
+  /* bounded write: modName and msg are caller-supplied strings of unknown
+     length, so an unbounded sprintf() could overrun errm */
+  snprintf(errm, sizeof(errm), "sfl_agent_error: %s: %s\n", modName, msg);
+  if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
+  else {
+    fprintf(stderr, "%s\n", errm);
+    fflush(stderr);
+  }
+}
+
+/*_________________---------------------------__________________
+  _________________ sfl_agent_sysError __________________
+  -----------------___________________________------------------
+
+  Same as sfl_agent_error, but the message also carries the current
+  errno value and its strerror() text.
+*/
+
+void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg)
+{
+  char errm[MAX_ERRMSG_LEN];
+  /* bounded write, see sfl_agent_error: the strerror() text adds further
+     length that the caller does not control */
+  snprintf(errm, sizeof(errm), "sfl_agent_sysError: %s: %s (errno = %d - %s)\n", modName, msg, errno, strerror(errno));
+  if(agent->errorFn) (*agent->errorFn)(agent->magic, agent, errm);
+  else {
+    fprintf(stderr, "%s\n", errm);
+    fflush(stderr);
+  }
+}
+
+
+/*_________________---------------------------__________________
+  _________________ alloc and free __________________
+  -----------------___________________________------------------
+*/
+
+static void * sflAlloc(SFLAgent *agent, size_t bytes)
+{
+  if(agent->allocFn) return (*agent->allocFn)(agent->magic, agent, bytes);
+  else return SFL_ALLOC(bytes);
+}
+
+static void sflFree(SFLAgent *agent, void *obj)
+{
+  if(agent->freeFn) (*agent->freeFn)(agent->magic, agent, obj);
+  else SFL_FREE(obj);
+}
diff --git a/lib/sflow_api.h b/lib/sflow_api.h
new file mode 100644
index 00000000..be8d9977
--- /dev/null
+++ b/lib/sflow_api.h
@@ -0,0 +1,340 @@
+/* Copyright (c) 2002-2009 InMon Corp.
Licensed under the terms of the InMon sFlow licence: */ +/* http://www.inmon.com/technology/sflowlicense.txt */ + +#ifndef SFLOW_API_H +#define SFLOW_API_H 1 + +/* define SFLOW_DO_SOCKET to 1 if you want the agent + to send the packets itself, otherwise set the sendFn + callback in sfl_agent_init.*/ +/* #define SFLOW_DO_SOCKET */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> +#include <sys/types.h> +#include <arpa/inet.h> /* for htonl */ + +#ifdef SFLOW_DO_SOCKET +#include <sys/socket.h> +#include <netinet/in_systm.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#endif + +#include "sflow.h" + +/* define SFLOW_SOFTWARE_SAMPLING to 1 if you need to use the + sfl_sampler_takeSample routine and give it every packet */ +/* #define SFLOW_SOFTWARE_SAMPLING */ + +/* + uncomment this preprocessor flag (or compile with -DSFL_USE_32BIT_INDEX) + if your ds_index numbers can ever be >= 2^30-1 (i.e. >= 0x3FFFFFFF) +*/ +/* #define SFL_USE_32BIT_INDEX */ + + +/* Used to combine ds_class, ds_index and instance into + a single 64-bit number like this: + __________________________________ + | cls| index | instance | + ---------------------------------- + + but now is opened up to a 12-byte struct to ensure + that ds_index has a full 32-bit field, and to make + accessing the components simpler. The macros have + the same behavior as before, so this change should + be transparent. The only difference is that these + objects are now passed around by reference instead + of by value, and the comparison is done using a fn. 
+*/ + +typedef struct _SFLDataSource_instance { + u_int32_t ds_class; + u_int32_t ds_index; + u_int32_t ds_instance; +} SFLDataSource_instance; + +#ifdef SFL_USE_32BIT_INDEX +#define SFL_FLOW_SAMPLE_TYPE SFLFlow_sample_expanded +#define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample_expanded +#else +#define SFL_FLOW_SAMPLE_TYPE SFLFlow_sample +#define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample +/* if index numbers are not going to use all 32 bits, then we can use + the more compact encoding, with the dataSource class and index merged */ +#define SFL_DS_DATASOURCE(dsi) (((dsi).ds_class << 24) + (dsi).ds_index) +#endif + +#define SFL_DS_INSTANCE(dsi) (dsi).ds_instance +#define SFL_DS_CLASS(dsi) (dsi).ds_class +#define SFL_DS_INDEX(dsi) (dsi).ds_index +#define SFL_DS_SET(dsi,clss,indx,inst) \ + do { \ + (dsi).ds_class = (clss); \ + (dsi).ds_index = (indx); \ + (dsi).ds_instance = (inst); \ + } while(0) + +typedef struct _SFLSampleCollector { + u_int32_t data[(SFL_MAX_DATAGRAM_SIZE + SFL_DATA_PAD) / sizeof(u_int32_t)]; + u_int32_t *datap; /* packet fill pointer */ + u_int32_t pktlen; /* accumulated size */ + u_int32_t packetSeqNo; + u_int32_t numSamples; +} SFLSampleCollector; + +struct _SFLAgent; /* forward decl */ + +typedef struct _SFLReceiver { + struct _SFLReceiver *nxt; + /* MIB fields */ + char *sFlowRcvrOwner; + time_t sFlowRcvrTimeout; + u_int32_t sFlowRcvrMaximumDatagramSize; + SFLAddress sFlowRcvrAddress; + u_int32_t sFlowRcvrPort; + u_int32_t sFlowRcvrDatagramVersion; + /* public fields */ + struct _SFLAgent *agent; /* pointer to my agent */ + /* private fields */ + SFLSampleCollector sampleCollector; +#ifdef SFLOW_DO_SOCKET + struct sockaddr_in receiver4; + struct sockaddr_in6 receiver6; +#endif +} SFLReceiver; + +typedef struct _SFLSampler { + /* for linked list */ + struct _SFLSampler *nxt; + /* for hash lookup table */ + struct _SFLSampler *hash_nxt; + /* MIB fields */ + SFLDataSource_instance dsi; + u_int32_t sFlowFsReceiver; + u_int32_t 
sFlowFsPacketSamplingRate; + u_int32_t sFlowFsMaximumHeaderSize; + /* public fields */ + struct _SFLAgent *agent; /* pointer to my agent */ + /* private fields */ + SFLReceiver *myReceiver; + u_int32_t skip; + u_int32_t samplePool; + u_int32_t flowSampleSeqNo; + /* rate checking */ + u_int32_t samplesThisTick; + u_int32_t samplesLastTick; + u_int32_t backoffThreshold; +} SFLSampler; + +/* declare */ +struct _SFLPoller; + +typedef void (*getCountersFn_t)(void *magic, /* callback to get counters */ + struct _SFLPoller *sampler, /* called with self */ + SFL_COUNTERS_SAMPLE_TYPE *cs); /* struct to fill in */ + +typedef struct _SFLPoller { + /* for linked list */ + struct _SFLPoller *nxt; + /* MIB fields */ + SFLDataSource_instance dsi; + u_int32_t sFlowCpReceiver; + time_t sFlowCpInterval; + /* public fields */ + struct _SFLAgent *agent; /* pointer to my agent */ + void *magic; /* ptr to pass back in getCountersFn() */ + getCountersFn_t getCountersFn; + u_int32_t bridgePort; /* port number local to bridge */ + /* private fields */ + SFLReceiver *myReceiver; + time_t countersCountdown; + u_int32_t countersSampleSeqNo; +} SFLPoller; + +typedef void *(*allocFn_t)(void *magic, /* callback to allocate space on heap */ + struct _SFLAgent *agent, /* called with self */ + size_t bytes); /* bytes requested */ + +typedef int (*freeFn_t)(void *magic, /* callback to free space on heap */ + struct _SFLAgent *agent, /* called with self */ + void *obj); /* obj to free */ + +typedef void (*errorFn_t)(void *magic, /* callback to log error message */ + struct _SFLAgent *agent, /* called with self */ + char *msg); /* error message */ + +typedef void (*sendFn_t)(void *magic, /* optional override fn to send packet */ + struct _SFLAgent *agent, + SFLReceiver *receiver, + u_char *pkt, + u_int32_t pktLen); + + +/* prime numbers are good for hash tables */ +#define SFL_HASHTABLE_SIZ 199 + +typedef struct _SFLAgent { + SFLSampler *jumpTable[SFL_HASHTABLE_SIZ]; /* fast lookup table for samplers 
(by ifIndex) */ + SFLSampler *samplers; /* the list of samplers */ + SFLPoller *pollers; /* the list of samplers */ + SFLReceiver *receivers; /* the array of receivers */ + time_t bootTime; /* time when we booted or started */ + time_t now; /* time now */ + SFLAddress myIP; /* IP address of this node */ + u_int32_t subId; /* sub_agent_id */ + void *magic; /* ptr to pass back in logging and alloc fns */ + allocFn_t allocFn; + freeFn_t freeFn; + errorFn_t errorFn; + sendFn_t sendFn; +#ifdef SFLOW_DO_SOCKET + int receiverSocket4; + int receiverSocket6; +#endif +} SFLAgent; + +/* call this at the start with a newly created agent */ +void sfl_agent_init(SFLAgent *agent, + SFLAddress *myIP, /* IP address of this agent */ + u_int32_t subId, /* agent_sub_id */ + time_t bootTime, /* agent boot time */ + time_t now, /* time now */ + void *magic, /* ptr to pass back in logging and alloc fns */ + allocFn_t allocFn, + freeFn_t freeFn, + errorFn_t errorFn, + sendFn_t sendFn); + +/* call this to create samplers */ +SFLSampler *sfl_agent_addSampler(SFLAgent *agent, SFLDataSource_instance *pdsi); + +/* call this to create pollers */ +SFLPoller *sfl_agent_addPoller(SFLAgent *agent, + SFLDataSource_instance *pdsi, + void *magic, /* ptr to pass back in getCountersFn() */ + getCountersFn_t getCountersFn); + +/* call this to create receivers */ +SFLReceiver *sfl_agent_addReceiver(SFLAgent *agent); + +/* call this to remove samplers */ +int sfl_agent_removeSampler(SFLAgent *agent, SFLDataSource_instance *pdsi); + +/* call this to remove pollers */ +int sfl_agent_removePoller(SFLAgent *agent, SFLDataSource_instance *pdsi); + +/* note: receivers should not be removed. Typically the receivers + list will be created at init time and never changed */ + +/* call these fns to retrieve sampler, poller or receiver (e.g. 
for SNMP GET or GETNEXT operation) */ +SFLSampler *sfl_agent_getSampler(SFLAgent *agent, SFLDataSource_instance *pdsi); +SFLSampler *sfl_agent_getNextSampler(SFLAgent *agent, SFLDataSource_instance *pdsi); +SFLPoller *sfl_agent_getPoller(SFLAgent *agent, SFLDataSource_instance *pdsi); +SFLPoller *sfl_agent_getNextPoller(SFLAgent *agent, SFLDataSource_instance *pdsi); +SFLReceiver *sfl_agent_getReceiver(SFLAgent *agent, u_int32_t receiverIndex); +SFLReceiver *sfl_agent_getNextReceiver(SFLAgent *agent, u_int32_t receiverIndex); + +/* jump table access - for performance */ +SFLSampler *sfl_agent_getSamplerByIfIndex(SFLAgent *agent, u_int32_t ifIndex); + +/* call these functions to GET and SET MIB values */ + +/* receiver */ +char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver); +void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner); +time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver); +void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout); +u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver); +void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize); +SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver); +void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress); +u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver); +void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort); +/* sampler */ +u_int32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler); +void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, u_int32_t sFlowFsReceiver); +u_int32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler); +void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, u_int32_t sFlowFsPacketSamplingRate); +u_int32_t sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler); +void 
sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, u_int32_t sFlowFsMaximumHeaderSize); +u_int32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler); +/* poller */ +u_int32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller); +void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, u_int32_t sFlowCpReceiver); +u_int32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller); +void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, u_int32_t sFlowCpInterval); + +/* fns to set the sflow agent address or sub-id */ +void sfl_agent_set_agentAddress(SFLAgent *agent, SFLAddress *addr); +void sfl_agent_set_agentSubId(SFLAgent *agent, u_int32_t subId); + +/* The poller may need a separate number to reference the local bridge port + to get counters if it is not the same as the global ifIndex */ +void sfl_poller_set_bridgePort(SFLPoller *poller, u_int32_t port_no); +u_int32_t sfl_poller_get_bridgePort(SFLPoller *poller); + +/* call this to indicate a discontinuity with a counter like samplePool so that the + sflow collector will ignore the next delta */ +void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler); + +/* call this to indicate a discontinuity with one or more of the counters so that the + sflow collector will ignore the next delta */ +void sfl_poller_resetCountersSeqNo(SFLPoller *poller); + +#ifdef SFLOW_SOFTWARE_SAMLING +/* software sampling: call this with every packet - returns non-zero if the packet + should be sampled (in which case you then call sfl_sampler_writeFlowSample()) */ +int sfl_sampler_takeSample(SFLSampler *sampler); +#endif + +/* call this to set a maximum samples-per-second threshold. If the sampler reaches this + threshold it will automatically back off the sampling rate. A value of 0 disables the + mechanism */ +void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, u_int32_t samplesPerSecond); +u_int32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler); + +/* call this once per second (N.B. not on interrupt stack i.e. 
not hard real-time) */ +void sfl_agent_tick(SFLAgent *agent, time_t now); + +/* call this with each flow sample */ +void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs); + +/* call this to push counters samples (usually done in the getCountersFn callback) */ +void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs); + +/* call this to deallocate resources */ +void sfl_agent_release(SFLAgent *agent); + + +/* internal fns */ + +void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent); +void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi); +void sfl_poller_init(SFLPoller *poller, SFLAgent *agent, SFLDataSource_instance *pdsi, void *magic, getCountersFn_t getCountersFn); + + +void sfl_receiver_tick(SFLReceiver *receiver, time_t now); +void sfl_poller_tick(SFLPoller *poller, time_t now); +void sfl_sampler_tick(SFLSampler *sampler, time_t now); + +int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs); +int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs); + +void sfl_agent_resetReceiver(SFLAgent *agent, SFLReceiver *receiver); + +void sfl_agent_error(SFLAgent *agent, char *modName, char *msg); +void sfl_agent_sysError(SFLAgent *agent, char *modName, char *msg); + +u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver); + +#define SFL_ALLOC malloc +#define SFL_FREE free + +#endif /* SFLOW_API_H */ + + diff --git a/lib/sflow_poller.c b/lib/sflow_poller.c new file mode 100644 index 00000000..ffd09d3c --- /dev/null +++ b/lib/sflow_poller.c @@ -0,0 +1,142 @@ +/* Copyright (c) 2002-2009 InMon Corp. 
Licensed under the terms of the InMon sFlow licence: */ +/* http://www.inmon.com/technology/sflowlicense.txt */ + +#include "sflow_api.h" + +/*_________________--------------------------__________________ + _________________ sfl_poller_init __________________ + -----------------__________________________------------------ +*/ + +void sfl_poller_init(SFLPoller *poller, + SFLAgent *agent, + SFLDataSource_instance *pdsi, + void *magic, /* ptr to pass back in getCountersFn() */ + getCountersFn_t getCountersFn) +{ + /* copy the dsi in case it points to poller->dsi, which we are about to clear */ + SFLDataSource_instance dsi = *pdsi; + + /* preserve the *nxt pointer too, in case we are resetting this poller and it is + already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */ + SFLPoller *nxtPtr = poller->nxt; + + /* clear everything */ + memset(poller, 0, sizeof(*poller)); + + /* restore the linked list ptr */ + poller->nxt = nxtPtr; + + /* now copy in the parameters */ + poller->agent = agent; + poller->dsi = dsi; /* structure copy */ + poller->magic = magic; + poller->getCountersFn = getCountersFn; +} + +/*_________________--------------------------__________________ + _________________ reset __________________ + -----------------__________________________------------------ +*/ + +static void reset(SFLPoller *poller) +{ + SFLDataSource_instance dsi = poller->dsi; + sfl_poller_init(poller, poller->agent, &dsi, poller->magic, poller->getCountersFn); +} + +/*_________________---------------------------__________________ + _________________ MIB access __________________ + -----------------___________________________------------------ +*/ +u_int32_t sfl_poller_get_sFlowCpReceiver(SFLPoller *poller) { + return poller->sFlowCpReceiver; +} + +void sfl_poller_set_sFlowCpReceiver(SFLPoller *poller, u_int32_t sFlowCpReceiver) { + poller->sFlowCpReceiver = sFlowCpReceiver; + if(sFlowCpReceiver == 0) reset(poller); + else { + /* retrieve and cache a 
direct pointer to my receiver */ + poller->myReceiver = sfl_agent_getReceiver(poller->agent, poller->sFlowCpReceiver); + } +} + +u_int32_t sfl_poller_get_sFlowCpInterval(SFLPoller *poller) { + return poller->sFlowCpInterval; +} + +void sfl_poller_set_sFlowCpInterval(SFLPoller *poller, u_int32_t sFlowCpInterval) { + poller->sFlowCpInterval = sFlowCpInterval; + /* Set the countersCountdown to be a randomly selected value between 1 and + sFlowCpInterval. That way the counter polling would be desynchronised + (on a 200-port switch, polling all the counters in one second could be harmful). */ + poller->countersCountdown = 1 + (random() % sFlowCpInterval); +} + +/*_________________---------------------------------__________________ + _________________ bridge port __________________ + -----------------_________________________________------------------ + May need a separate number to reference the local bridge port + to get counters if it is not the same as the global ifIndex. +*/ + +void sfl_poller_set_bridgePort(SFLPoller *poller, u_int32_t port_no) { + poller->bridgePort = port_no; +} + +u_int32_t sfl_poller_get_bridgePort(SFLPoller *poller) { + return poller->bridgePort; +} + +/*_________________---------------------------------__________________ + _________________ sequence number reset __________________ + -----------------_________________________________------------------ + Used to indicate a counter discontinuity + so that the sflow collector will know to ignore the next delta. 
+*/ +void sfl_poller_resetCountersSeqNo(SFLPoller *poller) { poller->countersSampleSeqNo = 0; } + +/*_________________---------------------------__________________ + _________________ sfl_poller_tick __________________ + -----------------___________________________------------------ +*/ + +void sfl_poller_tick(SFLPoller *poller, time_t now) +{ + if(poller->countersCountdown == 0) return; /* counters retrieval was not enabled */ + if(poller->sFlowCpReceiver == 0) return; + + if(--poller->countersCountdown == 0) { + if(poller->getCountersFn != NULL) { + /* call out for counters */ + SFL_COUNTERS_SAMPLE_TYPE cs; + memset(&cs, 0, sizeof(cs)); + poller->getCountersFn(poller->magic, poller, &cs); + /* this countersFn is expected to fill in some counter block elements + and then call sfl_poller_writeCountersSample(poller, &cs); */ + } + /* reset the countdown */ + poller->countersCountdown = poller->sFlowCpInterval; + } +} + +/*_________________---------------------------------__________________ + _________________ sfl_poller_writeCountersSample __________________ + -----------------_________________________________------------------ +*/ + +void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE *cs) +{ + /* fill in the rest of the header fields, and send to the receiver */ + cs->sequence_number = ++poller->countersSampleSeqNo; +#ifdef SFL_USE_32BIT_INDEX + cs->ds_class = SFL_DS_CLASS(poller->dsi); + cs->ds_index = SFL_DS_INDEX(poller->dsi); +#else + cs->source_id = SFL_DS_DATASOURCE(poller->dsi); +#endif + /* sent to my receiver */ + if(poller->myReceiver) sfl_receiver_writeCountersSample(poller->myReceiver, cs); +} + diff --git a/lib/sflow_receiver.c b/lib/sflow_receiver.c new file mode 100644 index 00000000..7fccab30 --- /dev/null +++ b/lib/sflow_receiver.c @@ -0,0 +1,832 @@ +/* Copyright (c) 2002-2009 InMon Corp. 
Licensed under the terms of the InMon sFlow licence: */ +/* http://www.inmon.com/technology/sflowlicense.txt */ + +#include <assert.h> +#include "sflow_api.h" + +static void resetSampleCollector(SFLReceiver *receiver); +static void sendSample(SFLReceiver *receiver); +static void sflError(SFLReceiver *receiver, char *errm); +inline static void putNet32(SFLReceiver *receiver, u_int32_t val); +inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr); +#ifdef SFLOW_DO_SOCKET +static void initSocket(SFLReceiver *receiver); +#endif + +/*_________________--------------------------__________________ + _________________ sfl_receiver_init __________________ + -----------------__________________________------------------ +*/ + +void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent) +{ + /* first clear everything */ + memset(receiver, 0, sizeof(*receiver)); + + /* now copy in the parameters */ + receiver->agent = agent; + + /* set defaults */ + receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE; + receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT; + +#ifdef SFLOW_DO_SOCKET + /* initialize the socket address */ + initSocket(receiver); +#endif + + /* preset some of the header fields */ + receiver->sampleCollector.datap = receiver->sampleCollector.data; + putNet32(receiver, SFLDATAGRAM_VERSION5); + putAddress(receiver, &agent->myIP); + putNet32(receiver, agent->subId); + + /* prepare to receive the first sample */ + resetSampleCollector(receiver); +} + +/*_________________---------------------------__________________ + _________________ reset __________________ + -----------------___________________________------------------ + + called on timeout, or when owner string is cleared +*/ + +static void reset(SFLReceiver *receiver) { + // ask agent to tell samplers and pollers to stop sending samples + sfl_agent_resetReceiver(receiver->agent, receiver); + // reinitialize + sfl_receiver_init(receiver, receiver->agent); +} + +#ifdef 
SFLOW_DO_SOCKET +/*_________________---------------------------__________________ + _________________ initSocket __________________ + -----------------___________________________------------------ +*/ + +static void initSocket(SFLReceiver *receiver) { + if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) { + struct sockaddr_in6 *sa6 = &receiver->receiver6; + sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort); + sa6->sin6_family = AF_INET6; + sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6; + } + else { + struct sockaddr_in *sa4 = &receiver->receiver4; + sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort); + sa4->sin_family = AF_INET; + sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4; + } +} +#endif + +/*_________________----------------------------------------_____________ + _________________ MIB Vars _____________ + -----------------________________________________________------------- +*/ + +char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) { + return receiver->sFlowRcvrOwner; +} +void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) { + receiver->sFlowRcvrOwner = sFlowRcvrOwner; + if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') { + // reset condition! 
owner string was cleared + reset(receiver); + } +} +time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) { + return receiver->sFlowRcvrTimeout; +} +void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) { + receiver->sFlowRcvrTimeout =sFlowRcvrTimeout; +} +u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) { + return receiver->sFlowRcvrMaximumDatagramSize; +} +void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) { + u_int32_t mdz = sFlowRcvrMaximumDatagramSize; + if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE; + receiver->sFlowRcvrMaximumDatagramSize = mdz; +} +SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) { + return &receiver->sFlowRcvrAddress; +} +void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) { + if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy +#ifdef SFLOW_DO_SOCKET + initSocket(receiver); +#endif +} +u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) { + return receiver->sFlowRcvrPort; +} +void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) { + receiver->sFlowRcvrPort = sFlowRcvrPort; + // update the socket structure +#ifdef SFLOW_DO_SOCKET + initSocket(receiver); +#endif +} + +/*_________________---------------------------__________________ + _________________ sfl_receiver_tick __________________ + -----------------___________________________------------------ +*/ + +void sfl_receiver_tick(SFLReceiver *receiver, time_t now) +{ + // if there are any samples to send, flush them now + if(receiver->sampleCollector.numSamples > 0) sendSample(receiver); + // check the timeout + if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) { + // count down one tick and reset if we reach 0 + if(--receiver->sFlowRcvrTimeout == 0) reset(receiver); + } 
+} + +/*_________________-----------------------------__________________ + _________________ receiver write utilities __________________ + -----------------_____________________________------------------ +*/ + +inline static void put32(SFLReceiver *receiver, u_int32_t val) +{ + *receiver->sampleCollector.datap++ = val; +} + +inline static void putNet32(SFLReceiver *receiver, u_int32_t val) +{ + *receiver->sampleCollector.datap++ = htonl(val); +} + +inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads) +{ + u_int32_t *from = (u_int32_t *)obj; + while(quads--) putNet32(receiver, *from++); +} + +inline static void putNet64(SFLReceiver *receiver, u_int64_t val64) +{ + u_int32_t *firstQuadPtr = receiver->sampleCollector.datap; + // first copy the bytes in + memcpy((u_char *)firstQuadPtr, &val64, 8); + if(htonl(1) != 1) { + // swap the bytes, and reverse the quads too + u_int32_t tmp = *receiver->sampleCollector.datap++; + *firstQuadPtr = htonl(*receiver->sampleCollector.datap); + *receiver->sampleCollector.datap++ = htonl(tmp); + } + else receiver->sampleCollector.datap += 2; +} + +inline static void put128(SFLReceiver *receiver, u_char *val) +{ + memcpy(receiver->sampleCollector.datap, val, 16); + receiver->sampleCollector.datap += 4; +} + +inline static void putString(SFLReceiver *receiver, SFLString *s) +{ + putNet32(receiver, s->len); + memcpy(receiver->sampleCollector.datap, s->str, s->len); + receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */ +} + +inline static u_int32_t stringEncodingLength(SFLString *s) { + // answer in bytes, so remember to mulitply by 4 after rounding up to nearest 4-byte boundary + return 4 + (((s->len + 3) / 4) * 4); +} + +inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr) +{ + // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error? 
+ if(addr->type == 0) { + putNet32(receiver, SFLADDRESSTYPE_IP_V4); + put32(receiver, 0); + } + else { + putNet32(receiver, addr->type); + if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr); + else put128(receiver, addr->address.ip_v6.addr); + } +} + +inline static u_int32_t addressEncodingLength(SFLAddress *addr) { + return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8; // type + address (unspecified == IPV4) +} + +inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac) +{ + memcpy(receiver->sampleCollector.datap, mac, 6); + receiver->sampleCollector.datap += 2; +} + +inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw) +{ + putNet32(receiver, sw->src_vlan); + putNet32(receiver, sw->src_priority); + putNet32(receiver, sw->dst_vlan); + putNet32(receiver, sw->dst_priority); +} + +inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router) +{ + putAddress(receiver, &router->nexthop); + putNet32(receiver, router->src_mask); + putNet32(receiver, router->dst_mask); +} + +inline static u_int32_t routerEncodingLength(SFLExtended_router *router) { + return addressEncodingLength(&router->nexthop) + 8; +} + +inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw) +{ + putAddress(receiver, &gw->nexthop); + putNet32(receiver, gw->as); + putNet32(receiver, gw->src_as); + putNet32(receiver, gw->src_peer_as); + putNet32(receiver, gw->dst_as_path_segments); + { + u_int32_t seg = 0; + for(; seg < gw->dst_as_path_segments; seg++) { + putNet32(receiver, gw->dst_as_path[seg].type); + putNet32(receiver, gw->dst_as_path[seg].length); + putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length); + } + } + putNet32(receiver, gw->communities_length); + putNet32_run(receiver, gw->communities, gw->communities_length); + putNet32(receiver, gw->localpref); +} + +inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) { + u_int32_t elemSiz = 
addressEncodingLength(&gw->nexthop); + u_int32_t seg = 0; + elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments + for(; seg < gw->dst_as_path_segments; seg++) { + elemSiz += 8; // type, length + elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes + } + elemSiz += 4; // communities_length + elemSiz += 4 * gw->communities_length; // communities + elemSiz += 4; // localpref + return elemSiz; +} + +inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user) +{ + putNet32(receiver, user->src_charset); + putString(receiver, &user->src_user); + putNet32(receiver, user->dst_charset); + putString(receiver, &user->dst_user); +} + +inline static u_int32_t userEncodingLength(SFLExtended_user *user) { + return 4 + + stringEncodingLength(&user->src_user) + + 4 + + stringEncodingLength(&user->dst_user); +} + +inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url) +{ + putNet32(receiver, url->direction); + putString(receiver, &url->url); + putString(receiver, &url->host); +} + +inline static u_int32_t urlEncodingLength(SFLExtended_url *url) { + return 4 + + stringEncodingLength(&url->url) + + stringEncodingLength(&url->host); +} + +inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack) +{ + putNet32(receiver, labelStack->depth); + putNet32_run(receiver, labelStack->stack, labelStack->depth); +} + +inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) { + return 4 + (4 * labelStack->depth); +} + +inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls) +{ + putAddress(receiver, &mpls->nextHop); + putLabelStack(receiver, &mpls->in_stack); + putLabelStack(receiver, &mpls->out_stack); +} + +inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) { + return addressEncodingLength(&mpls->nextHop) + + labelStackEncodingLength(&mpls->in_stack) + + labelStackEncodingLength(&mpls->out_stack); +} + +inline static void putNat(SFLReceiver *receiver, SFLExtended_nat 
*nat) +{ + putAddress(receiver, &nat->src); + putAddress(receiver, &nat->dst); +} + +inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) { + return addressEncodingLength(&nat->src) + + addressEncodingLength(&nat->dst); +} + +inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel) +{ + putString(receiver, &tunnel->tunnel_lsp_name); + putNet32(receiver, tunnel->tunnel_id); + putNet32(receiver, tunnel->tunnel_cos); +} + +inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) { + return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8; +} + +inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc) +{ + putString(receiver, &vc->vc_instance_name); + putNet32(receiver, vc->vll_vc_id); + putNet32(receiver, vc->vc_label_cos); +} + +inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) { + return stringEncodingLength( &vc->vc_instance_name) + 8; +} + +inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn) +{ + putString(receiver, &ftn->mplsFTNDescr); + putNet32(receiver, ftn->mplsFTNMask); +} + +inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) { + return stringEncodingLength( &ftn->mplsFTNDescr) + 4; +} + +inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec) +{ + putNet32(receiver, ldpfec->mplsFecAddrPrefixLength); +} + +inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) { + return 4; +} + +inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel) +{ + putLabelStack(receiver, &vlanTunnel->stack); +} + +inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) { + return labelStackEncodingLength(&vlanTunnel->stack); +} + + +inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters) +{ + putNet32(receiver, counters->ifIndex); + putNet32(receiver, 
counters->ifType); + putNet64(receiver, counters->ifSpeed); + putNet32(receiver, counters->ifDirection); + putNet32(receiver, counters->ifStatus); + putNet64(receiver, counters->ifInOctets); + putNet32(receiver, counters->ifInUcastPkts); + putNet32(receiver, counters->ifInMulticastPkts); + putNet32(receiver, counters->ifInBroadcastPkts); + putNet32(receiver, counters->ifInDiscards); + putNet32(receiver, counters->ifInErrors); + putNet32(receiver, counters->ifInUnknownProtos); + putNet64(receiver, counters->ifOutOctets); + putNet32(receiver, counters->ifOutUcastPkts); + putNet32(receiver, counters->ifOutMulticastPkts); + putNet32(receiver, counters->ifOutBroadcastPkts); + putNet32(receiver, counters->ifOutDiscards); + putNet32(receiver, counters->ifOutErrors); + putNet32(receiver, counters->ifPromiscuousMode); +} + + +/*_________________-----------------------------__________________ + _________________ computeFlowSampleSize __________________ + -----------------_____________________________------------------ +*/ + +static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs) +{ + SFLFlow_sample_element *elem = fs->elements; +#ifdef SFL_USE_32BIT_INDEX + u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate, + sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */ +#else + u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate, + sample_pool, drops, input, output, number of elements */ +#endif + + fs->num_elements = 0; /* we're going to count them again even if this was set by the client */ + for(; elem != NULL; elem = elem->nxt) { + u_int elemSiz = 0; + fs->num_elements++; + siz += 8; /* tag, length */ + switch(elem->tag) { + case SFLFLOW_HEADER: + elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */ + elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */ + break; + case SFLFLOW_ETHERNET: 
elemSiz = sizeof(SFLSampled_ethernet); break; + case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break; + case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break; + case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break; + case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break; + case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break; + case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break; + case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break; + case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break; + case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break; + case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break; + case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break; + case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break; + case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break; + case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break; + default: + sflError(receiver, "unexpected packet_data_tag"); + return -1; + break; + } + // cache the element size, and accumulate it into the overall FlowSample size + elem->length = elemSiz; + siz += elemSiz; + } + + return siz; +} + +/*_________________-------------------------------__________________ + _________________ sfl_receiver_writeFlowSample __________________ + -----------------_______________________________------------------ +*/ + +int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs) +{ + int packedSize; + if(fs == NULL) return -1; + if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1; + + // check in case this one sample alone is too big for the datagram + // in fact - if it is 
even half as big then we should ditch it. Very + // important to avoid overruning the packet buffer. + if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) { + sflError(receiver, "flow sample too big for datagram"); + return -1; + } + + // if the sample pkt is full enough so that this sample might put + // it over the limit, then we should send it now before going on. + if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize) + sendSample(receiver); + + receiver->sampleCollector.numSamples++; + +#ifdef SFL_USE_32BIT_INDEX + putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED); +#else + putNet32(receiver, SFLFLOW_SAMPLE); +#endif + + putNet32(receiver, packedSize - 8); // don't include tag and len + putNet32(receiver, fs->sequence_number); + +#ifdef SFL_USE_32BIT_INDEX + putNet32(receiver, fs->ds_class); + putNet32(receiver, fs->ds_index); +#else + putNet32(receiver, fs->source_id); +#endif + + putNet32(receiver, fs->sampling_rate); + putNet32(receiver, fs->sample_pool); + putNet32(receiver, fs->drops); + +#ifdef SFL_USE_32BIT_INDEX + putNet32(receiver, fs->inputFormat); + putNet32(receiver, fs->input); + putNet32(receiver, fs->outputFormat); + putNet32(receiver, fs->output); +#else + putNet32(receiver, fs->input); + putNet32(receiver, fs->output); +#endif + + putNet32(receiver, fs->num_elements); + + { + SFLFlow_sample_element *elem = fs->elements; + for(; elem != NULL; elem = elem->nxt) { + + putNet32(receiver, elem->tag); + putNet32(receiver, elem->length); // length cached in computeFlowSampleSize() + + switch(elem->tag) { + case SFLFLOW_HEADER: + putNet32(receiver, elem->flowType.header.header_protocol); + putNet32(receiver, elem->flowType.header.frame_length); + putNet32(receiver, elem->flowType.header.stripped); + putNet32(receiver, elem->flowType.header.header_length); + /* the header */ + memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length); + /* round up 
to multiple of 4 to preserve alignment */ + receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4); + break; + case SFLFLOW_ETHERNET: + putNet32(receiver, elem->flowType.ethernet.eth_len); + putMACAddress(receiver, elem->flowType.ethernet.src_mac); + putMACAddress(receiver, elem->flowType.ethernet.dst_mac); + putNet32(receiver, elem->flowType.ethernet.eth_type); + break; + case SFLFLOW_IPV4: + putNet32(receiver, elem->flowType.ipv4.length); + putNet32(receiver, elem->flowType.ipv4.protocol); + put32(receiver, elem->flowType.ipv4.src_ip.addr); + put32(receiver, elem->flowType.ipv4.dst_ip.addr); + putNet32(receiver, elem->flowType.ipv4.src_port); + putNet32(receiver, elem->flowType.ipv4.dst_port); + putNet32(receiver, elem->flowType.ipv4.tcp_flags); + putNet32(receiver, elem->flowType.ipv4.tos); + break; + case SFLFLOW_IPV6: + putNet32(receiver, elem->flowType.ipv6.length); + putNet32(receiver, elem->flowType.ipv6.protocol); + put128(receiver, elem->flowType.ipv6.src_ip.addr); + put128(receiver, elem->flowType.ipv6.dst_ip.addr); + putNet32(receiver, elem->flowType.ipv6.src_port); + putNet32(receiver, elem->flowType.ipv6.dst_port); + putNet32(receiver, elem->flowType.ipv6.tcp_flags); + putNet32(receiver, elem->flowType.ipv6.priority); + break; + case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break; + case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break; + case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break; + case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break; + case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break; + case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break; + case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break; + case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break; + case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break; + case 
SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break; + case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break; + case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break; + default: + sflError(receiver, "unexpected packet_data_tag"); + return -1; + break; + } + } + } + + // sanity check + assert(((u_char *)receiver->sampleCollector.datap + - (u_char *)receiver->sampleCollector.data + - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize); + + // update the pktlen + receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data; + return packedSize; +} + +/*_________________-----------------------------__________________ + _________________ computeCountersSampleSize __________________ + -----------------_____________________________------------------ +*/ + +static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs) +{ + SFLCounters_sample_element *elem = cs->elements; +#ifdef SFL_USE_32BIT_INDEX + u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */ +#else + u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */ +#endif + + cs->num_elements = 0; /* we're going to count them again even if this was set by the client */ + for(; elem != NULL; elem = elem->nxt) { + u_int elemSiz = 0; + cs->num_elements++; + siz += 8; /* tag, length */ + switch(elem->tag) { + case SFLCOUNTERS_GENERIC: elemSiz = sizeof(elem->counterBlock.generic); break; + case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break; + case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break; + case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break; + case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break; + default: + sflError(receiver, "unexpected counters_tag"); + return -1; + break; 
+ } + // cache the element size, and accumulate it into the overall FlowSample size + elem->length = elemSiz; + siz += elemSiz; + } + return siz; +} + +/*_________________----------------------------------__________________ + _________________ sfl_receiver_writeCountersSample __________________ + -----------------__________________________________------------------ +*/ + +int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs) +{ + int packedSize; + if(cs == NULL) return -1; + // if the sample pkt is full enough so that this sample might put + // it over the limit, then we should send it now. + if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1; + + // check in case this one sample alone is too big for the datagram + // in fact - if it is even half as big then we should ditch it. Very + // important to avoid overruning the packet buffer. + if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) { + sflError(receiver, "counters sample too big for datagram"); + return -1; + } + + if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize) + sendSample(receiver); + + receiver->sampleCollector.numSamples++; + +#ifdef SFL_USE_32BIT_INDEX + putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED); +#else + putNet32(receiver, SFLCOUNTERS_SAMPLE); +#endif + + putNet32(receiver, packedSize - 8); // tag and length not included + putNet32(receiver, cs->sequence_number); + +#ifdef SFL_USE_32BIT_INDEX + putNet32(receiver, cs->ds_class); + putNet32(receiver, cs->ds_index); +#else + putNet32(receiver, cs->source_id); +#endif + + putNet32(receiver, cs->num_elements); + + { + SFLCounters_sample_element *elem = cs->elements; + for(; elem != NULL; elem = elem->nxt) { + + putNet32(receiver, elem->tag); + putNet32(receiver, elem->length); // length cached in computeCountersSampleSize() + + switch(elem->tag) { + case SFLCOUNTERS_GENERIC: + putGenericCounters(receiver, 
&(elem->counterBlock.generic)); + break; + case SFLCOUNTERS_ETHERNET: + // all these counters are 32-bit + putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4); + break; + case SFLCOUNTERS_TOKENRING: + // all these counters are 32-bit + putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4); + break; + case SFLCOUNTERS_VG: + // mixed sizes + putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames); + putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets); + putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames); + putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets); + putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors); + putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors); + putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors); + putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames); + putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames); + putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets); + putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings); + putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets); + putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets); + putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets); + break; + case SFLCOUNTERS_VLAN: + // mixed sizes + putNet32(receiver, elem->counterBlock.vlan.vlan_id); + putNet64(receiver, elem->counterBlock.vlan.octets); + putNet32(receiver, elem->counterBlock.vlan.ucastPkts); + putNet32(receiver, elem->counterBlock.vlan.multicastPkts); + putNet32(receiver, elem->counterBlock.vlan.broadcastPkts); + putNet32(receiver, elem->counterBlock.vlan.discards); + break; + default: + sflError(receiver, "unexpected counters_tag"); + return -1; + break; + } + } + } + // sanity check + assert(((u_char *)receiver->sampleCollector.datap + - (u_char 
*)receiver->sampleCollector.data + - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize); + + // update the pktlen + receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data; + return packedSize; +} + +/*_________________---------------------------------__________________ + _________________ sfl_receiver_samplePacketsSent __________________ + -----------------_________________________________------------------ +*/ + +u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver) +{ + return receiver->sampleCollector.packetSeqNo; +} + +/*_________________---------------------------__________________ + _________________ sendSample __________________ + -----------------___________________________------------------ +*/ + +static void sendSample(SFLReceiver *receiver) +{ + /* construct and send out the sample, then reset for the next one... */ + /* first fill in the header with the latest values */ + /* version, agent_address and sub_agent_id were pre-set. */ + u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 
7 : 4; + receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */ + receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */ + receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */ + /* send */ + if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic, + receiver->agent, + receiver, + (u_char *)receiver->sampleCollector.data, + receiver->sampleCollector.pktlen); + else { +#ifdef SFLOW_DO_SOCKET + /* send it myself */ + if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) { + u_int32_t soclen = sizeof(struct sockaddr_in6); + int result = sendto(receiver->agent->receiverSocket6, + receiver->sampleCollector.data, + receiver->sampleCollector.pktlen, + 0, + (struct sockaddr *)&receiver->receiver6, + soclen); + if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error"); + if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0"); + } + else { + u_int32_t soclen = sizeof(struct sockaddr_in); + int result = sendto(receiver->agent->receiverSocket4, + receiver->sampleCollector.data, + receiver->sampleCollector.pktlen, + 0, + (struct sockaddr *)&receiver->receiver4, + soclen); + if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error"); + if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0"); + } +#endif + } + + /* reset for the next time */ + resetSampleCollector(receiver); +} + +/*_________________---------------------------__________________ + _________________ resetSampleCollector __________________ + -----------------___________________________------------------ +*/ + +static void resetSampleCollector(SFLReceiver *receiver) +{ + receiver->sampleCollector.pktlen = 0; + receiver->sampleCollector.numSamples = 0; + 
/* point the datap to just after the header */ + receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? + (receiver->sampleCollector.data + 10) : (receiver->sampleCollector.data + 7); + + receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data; +} + +/*_________________---------------------------__________________ + _________________ sflError __________________ + -----------------___________________________------------------ +*/ + +static void sflError(SFLReceiver *receiver, char *msg) +{ + sfl_agent_error(receiver->agent, "receiver", msg); + resetSampleCollector(receiver); +} diff --git a/lib/sflow_sampler.c b/lib/sflow_sampler.c new file mode 100644 index 00000000..759b5a22 --- /dev/null +++ b/lib/sflow_sampler.c @@ -0,0 +1,183 @@ +/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */ +/* http://www.inmon.com/technology/sflowlicense.txt */ + +#include "sflow_api.h" + + +/*_________________--------------------------__________________ + _________________ sfl_sampler_init __________________ + -----------------__________________________------------------ +*/ + +void sfl_sampler_init(SFLSampler *sampler, SFLAgent *agent, SFLDataSource_instance *pdsi) +{ + /* copy the dsi in case it points to sampler->dsi, which we are about to clear. 
+ (Thanks to Jagjit Choudray of Force 10 Networks for pointing out this bug) */ + SFLDataSource_instance dsi = *pdsi; + + /* preserve the *nxt pointer too, in case we are resetting this poller and it is + already part of the agent's linked list (thanks to Matt Woodly for pointing this out) */ + SFLSampler *nxtPtr = sampler->nxt; + + /* clear everything */ + memset(sampler, 0, sizeof(*sampler)); + + /* restore the linked list ptr */ + sampler->nxt = nxtPtr; + + /* now copy in the parameters */ + sampler->agent = agent; + sampler->dsi = dsi; + + /* set defaults */ + sampler->sFlowFsMaximumHeaderSize = SFL_DEFAULT_HEADER_SIZE; + sampler->sFlowFsPacketSamplingRate = SFL_DEFAULT_SAMPLING_RATE; +} + +/*_________________--------------------------__________________ + _________________ reset __________________ + -----------------__________________________------------------ +*/ + +static void reset(SFLSampler *sampler) +{ + SFLDataSource_instance dsi = sampler->dsi; + sfl_sampler_init(sampler, sampler->agent, &dsi); +} + +/*_________________---------------------------__________________ + _________________ MIB access __________________ + -----------------___________________________------------------ +*/ +u_int32_t sfl_sampler_get_sFlowFsReceiver(SFLSampler *sampler) { + return sampler->sFlowFsReceiver; +} +void sfl_sampler_set_sFlowFsReceiver(SFLSampler *sampler, u_int32_t sFlowFsReceiver) { + sampler->sFlowFsReceiver = sFlowFsReceiver; + if(sFlowFsReceiver == 0) reset(sampler); + else { + /* retrieve and cache a direct pointer to my receiver */ + sampler->myReceiver = sfl_agent_getReceiver(sampler->agent, sampler->sFlowFsReceiver); + } +} +u_int32_t sfl_sampler_get_sFlowFsPacketSamplingRate(SFLSampler *sampler) { + return sampler->sFlowFsPacketSamplingRate; +} +void sfl_sampler_set_sFlowFsPacketSamplingRate(SFLSampler *sampler, u_int32_t sFlowFsPacketSamplingRate) { + sampler->sFlowFsPacketSamplingRate = sFlowFsPacketSamplingRate; +} +u_int32_t 
sfl_sampler_get_sFlowFsMaximumHeaderSize(SFLSampler *sampler) { + return sampler->sFlowFsMaximumHeaderSize; +} +void sfl_sampler_set_sFlowFsMaximumHeaderSize(SFLSampler *sampler, u_int32_t sFlowFsMaximumHeaderSize) { + sampler->sFlowFsMaximumHeaderSize = sFlowFsMaximumHeaderSize; +} + +/* call this to set a maximum samples-per-second threshold. If the sampler reaches this + threshold it will automatically back off the sampling rate. A value of 0 disables the + mechanism */ +void sfl_sampler_set_backoffThreshold(SFLSampler *sampler, u_int32_t samplesPerSecond) { + sampler->backoffThreshold = samplesPerSecond; +} +u_int32_t sfl_sampler_get_backoffThreshold(SFLSampler *sampler) { + return sampler->backoffThreshold; +} +u_int32_t sfl_sampler_get_samplesLastTick(SFLSampler *sampler) { + return sampler->samplesLastTick; +} + +/*_________________---------------------------------__________________ + _________________ sequence number reset __________________ + -----------------_________________________________------------------ + Used by the agent to indicate a samplePool discontinuity + so that the sflow collector will know to ignore the next delta. +*/ +void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler) { sampler->flowSampleSeqNo = 0; } + + +/*_________________---------------------------__________________ + _________________ sfl_sampler_tick __________________ + -----------------___________________________------------------ +*/ + +void sfl_sampler_tick(SFLSampler *sampler, time_t now) +{ + if(sampler->backoffThreshold && sampler->samplesThisTick > sampler->backoffThreshold) { + /* automatic backoff. If using hardware sampling then this is where you have to + * call out to change the sampling rate and make sure that any other registers/variables + * that hold this value are updated. 
+ */ + sampler->sFlowFsPacketSamplingRate *= 2; + } + sampler->samplesLastTick = sampler->samplesThisTick; + sampler->samplesThisTick = 0; +} + + + +/*_________________------------------------------__________________ + _________________ sfl_sampler_writeFlowSample __________________ + -----------------______________________________------------------ +*/ + +void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs) +{ + if(fs == NULL) return; + sampler->samplesThisTick++; + /* increment the sequence number */ + fs->sequence_number = ++sampler->flowSampleSeqNo; + /* copy the other header fields in */ +#ifdef SFL_USE_32BIT_INDEX + fs->ds_class = SFL_DS_CLASS(sampler->dsi); + fs->ds_index = SFL_DS_INDEX(sampler->dsi); +#else + fs->source_id = SFL_DS_DATASOURCE(sampler->dsi); +#endif + /* the sampling rate may have been set already. */ + if(fs->sampling_rate == 0) fs->sampling_rate = sampler->sFlowFsPacketSamplingRate; + /* the samplePool may be maintained upstream too. */ + if( fs->sample_pool == 0) fs->sample_pool = sampler->samplePool; + /* sent to my receiver */ + if(sampler->myReceiver) sfl_receiver_writeFlowSample(sampler->myReceiver, fs); +} + +#ifdef SFLOW_SOFTWARE_SAMPLING + +/* ================== software sampling ========================*/ + +/*_________________---------------------------__________________ + _________________ nextRandomSkip __________________ + -----------------___________________________------------------ +*/ + +inline static u_int32_t nextRandomSkip(u_int32_t mean) +{ + if(mean == 0 || mean == 1) return 1; + return ((random() % ((2 * mean) - 1)) + 1); +} + +/*_________________---------------------------__________________ + _________________ sfl_sampler_takeSample __________________ + -----------------___________________________------------------ +*/ + +int sfl_sampler_takeSample(SFLSampler *sampler) +{ + if(sampler->skip == 0) { + /* first time - seed the random number generator */ + srandom(SFL_DS_INDEX(sampler->dsi)); 
+ sampler->skip = nextRandomSkip(sampler->sFlowFsPacketSamplingRate); + } + + /* increment the samplePool */ + sampler->samplePool++; + + if(--sampler->skip == 0) { + /* reached zero. Set the next skip and return true. */ + sampler->skip = nextRandomSkip(sampler->sFlowFsPacketSamplingRate); + return 1; + } + return 0; +} + +#endif /* SFLOW_SOFTWARE_SAMPLING */ diff --git a/lib/vlog-modules.def b/lib/vlog-modules.def index 0d44e734..b791525e 100644 --- a/lib/vlog-modules.def +++ b/lib/vlog-modules.def @@ -61,6 +61,7 @@ VLOG_MODULE(proc_net_compat) VLOG_MODULE(process) VLOG_MODULE(rconn) VLOG_MODULE(rtnetlink) +VLOG_MODULE(sflow) VLOG_MODULE(stp) VLOG_MODULE(stats) VLOG_MODULE(status) diff --git a/ofproto/automake.mk b/ofproto/automake.mk index 87a0fa68..3c189771 100644 --- a/ofproto/automake.mk +++ b/ofproto/automake.mk @@ -21,6 +21,8 @@ ofproto_libofproto_a_SOURCES = \ ofproto/netflow.h \ ofproto/ofproto.c \ ofproto/ofproto.h \ + ofproto/ofproto-sflow.c \ + ofproto/ofproto-sflow.h \ ofproto/pktbuf.c \ ofproto/pktbuf.h \ ofproto/pinsched.c \ diff --git a/ofproto/collectors.c b/ofproto/collectors.c index f7cb1dbe..4589f329 100644 --- a/ofproto/collectors.c +++ b/ofproto/collectors.c @@ -121,3 +121,9 @@ collectors_send(const struct collectors *c, const void *payload, size_t n) } } } + +int +collectors_count(const struct collectors *c) +{ + return c->n_fds; +} diff --git a/ofproto/collectors.h b/ofproto/collectors.h index a4abb631..ac70f375 100644 --- a/ofproto/collectors.h +++ b/ofproto/collectors.h @@ -28,4 +28,6 @@ void collectors_destroy(struct collectors *); void collectors_send(const struct collectors *, const void *, size_t); +int collectors_count(const struct collectors *); + #endif /* collectors.h */ diff --git a/ofproto/ofproto-sflow.c b/ofproto/ofproto-sflow.c new file mode 100644 index 00000000..b37db42d --- /dev/null +++ b/ofproto/ofproto-sflow.c @@ -0,0 +1,607 @@ +/* + * Copyright (c) 2009, 2010 InMon Corp. + * Copyright (c) 2009 Nicira Networks. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include "ofproto-sflow.h" +#include <inttypes.h> +#include <stdlib.h> +#include "collectors.h" +#include "dpif.h" +#include "compiler.h" +#include "netdev.h" +#include "ofpbuf.h" +#include "ofproto.h" +#include "poll-loop.h" +#include "port-array.h" +#include "sflow_api.h" +#include "socket-util.h" +#include "timeval.h" + +#define THIS_MODULE VLM_sflow +#include "vlog.h" + +struct ofproto_sflow_port { + struct netdev *netdev; /* Underlying network device, for stats. */ + SFLDataSource_instance dsi; /* sFlow library's notion of port number. */ +}; + +struct ofproto_sflow { + struct ofproto *ofproto; + struct collectors *collectors; + SFLAgent *sflow_agent; + struct ofproto_sflow_options *options; + struct dpif *dpif; + time_t next_tick; + size_t n_flood, n_all; + struct port_array ports; /* Indexed by ODP port number. */ +}; + +#define RECEIVER_INDEX 1 + +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); + +static bool +nullable_string_is_equal(const char *a, const char *b) +{ + return a ? 
b && !strcmp(a, b) : !b; +} + +static bool +ofproto_sflow_options_equal(const struct ofproto_sflow_options *a, + const struct ofproto_sflow_options *b) +{ + return (svec_equal(&a->targets, &b->targets) + && a->sampling_rate == b->sampling_rate + && a->polling_interval == b->polling_interval + && a->header_len == b->header_len + && a->sub_id == b->sub_id + && nullable_string_is_equal(a->agent_device, b->agent_device) + && nullable_string_is_equal(a->control_ip, b->control_ip)); +} + +static struct ofproto_sflow_options * +ofproto_sflow_options_clone(const struct ofproto_sflow_options *old) +{ + struct ofproto_sflow_options *new = xmemdup(old, sizeof *old); + svec_clone(&new->targets, &old->targets); + new->agent_device = old->agent_device ? xstrdup(old->agent_device) : NULL; + new->control_ip = old->control_ip ? xstrdup(old->control_ip) : NULL; + return new; +} + +static void +ofproto_sflow_options_destroy(struct ofproto_sflow_options *options) +{ + if (options) { + svec_destroy(&options->targets); + free(options->agent_device); + free(options->control_ip); + free(options); + } +} + +/* sFlow library callback to allocate memory. */ +static void * +sflow_agent_alloc_cb(void *magic UNUSED, SFLAgent *agent UNUSED, size_t bytes) +{ + return calloc(1, bytes); +} + +/* sFlow library callback to free memory. */ +static int +sflow_agent_free_cb(void *magic UNUSED, SFLAgent *agent UNUSED, void *obj) +{ + free(obj); + return 0; +} + +/* sFlow library callback to report error. */ +static void +sflow_agent_error_cb(void *magic UNUSED, SFLAgent *agent UNUSED, char *msg) +{ + VLOG_WARN("sFlow agent error: %s", msg); +} + +/* sFlow library callback to send datagram. 
*/ +static void +sflow_agent_send_packet_cb(void *os_, SFLAgent *agent UNUSED, + SFLReceiver *receiver UNUSED, u_char *pkt, + uint32_t pktLen) +{ + struct ofproto_sflow *os = os_; + collectors_send(os->collectors, pkt, pktLen); +} + +static void +sflow_agent_get_counters(void *os_, SFLPoller *poller, + SFL_COUNTERS_SAMPLE_TYPE *cs) +{ + struct ofproto_sflow *os = os_; + SFLCounters_sample_element elem; + struct ofproto_sflow_port *osp; + SFLIf_counters *counters; + struct netdev_stats stats; + enum netdev_flags flags; + uint32_t current; + + osp = port_array_get(&os->ports, poller->bridgePort); + if (!osp) { + return; + } + + elem.tag = SFLCOUNTERS_GENERIC; + counters = &elem.counterBlock.generic; + counters->ifIndex = SFL_DS_INDEX(poller->dsi); + counters->ifType = 6; + if (!netdev_get_features(osp->netdev, &current, NULL, NULL, NULL)) { + /* The values of ifDirection come from MAU MIB (RFC 2668): 0 = unknown, + 1 = full-duplex, 2 = half-duplex, 3 = in, 4=out */ + counters->ifSpeed = netdev_features_to_bps(current); + counters->ifDirection = (netdev_features_is_full_duplex(current) + ? 1 : 2); + } else { + counters->ifSpeed = 100000000; + counters->ifDirection = 0; + } + if (!netdev_get_flags(osp->netdev, &flags) && flags & NETDEV_UP) { + bool carrier; + + counters->ifStatus = 1; /* ifAdminStatus up. */ + if (!netdev_get_carrier(osp->netdev, &carrier) && carrier) { + counters->ifStatus |= 2; /* ifOperStatus up. */ + } + } else { + counters->ifStatus = 0; /* Down. */ + } + + /* XXX + 1. Is the multicast counter filled in? + 2. Does the multicast counter include broadcasts? + 3. Does the rx_packets counter include multicasts/broadcasts? 
+ */ + netdev_get_stats(osp->netdev, &stats); + counters->ifInOctets = stats.rx_bytes; + counters->ifInUcastPkts = stats.rx_packets; + counters->ifInMulticastPkts = stats.multicast; + counters->ifInBroadcastPkts = -1; + counters->ifInDiscards = stats.rx_dropped; + counters->ifInErrors = stats.rx_errors; + counters->ifInUnknownProtos = -1; + counters->ifOutOctets = stats.tx_bytes; + counters->ifOutUcastPkts = stats.tx_packets; + counters->ifOutMulticastPkts = -1; + counters->ifOutBroadcastPkts = -1; + counters->ifOutDiscards = stats.tx_dropped; + counters->ifOutErrors = stats.tx_errors; + counters->ifPromiscuousMode = 0; + + SFLADD_ELEMENT(cs, &elem); + sfl_poller_writeCountersSample(poller, cs); +} + +/* Obtains an address to use for the local sFlow agent and stores it into + * '*agent_addr'. Returns true if successful, false on failure. + * + * The sFlow agent address should be a local IP address that is persistent and + * reachable over the network, if possible. The IP address associated with + * 'agent_device' is used if it has one, and otherwise 'control_ip', the IP + * address used to talk to the controller. 
*/ +static bool +sflow_choose_agent_address(const char *agent_device, const char *control_ip, + SFLAddress *agent_addr) +{ + struct in_addr in4; + + memset(agent_addr, 0, sizeof *agent_addr); + agent_addr->type = SFLADDRESSTYPE_IP_V4; + + if (agent_device) { + struct netdev *netdev; + + if (!netdev_open(agent_device, NETDEV_ETH_TYPE_NONE, &netdev)) { + int error = netdev_get_in4(netdev, &in4, NULL); + netdev_close(netdev); + if (!error) { + goto success; + } + } + } + + if (control_ip && !lookup_ip(control_ip, &in4)) { + goto success; + } + + VLOG_ERR("could not determine IP address for sFlow agent"); + return false; + +success: + agent_addr->address.ip_v4.addr = in4.s_addr; + return true; +} + +void +ofproto_sflow_clear(struct ofproto_sflow *os) +{ + struct ofproto_sflow_port *osp; + unsigned int odp_port; + + if (os->sflow_agent) { + sfl_agent_release(os->sflow_agent); + os->sflow_agent = NULL; + } + collectors_destroy(os->collectors); + os->collectors = NULL; + ofproto_sflow_options_destroy(os->options); + os->options = NULL; + + PORT_ARRAY_FOR_EACH (osp, &os->ports, odp_port) { + ofproto_sflow_del_port(os, odp_port); + } + port_array_clear(&os->ports); + + /* Turn off sampling to save CPU cycles. 
*/ + dpif_set_sflow_probability(os->dpif, 0); +} + +bool +ofproto_sflow_is_enabled(const struct ofproto_sflow *os) +{ + return os->collectors != NULL; +} + +struct ofproto_sflow * +ofproto_sflow_create(struct dpif *dpif) +{ + struct ofproto_sflow *os; + + os = xcalloc(1, sizeof *os); + os->dpif = dpif; + os->next_tick = time_now() + 1; + port_array_init(&os->ports); + return os; +} + +void +ofproto_sflow_destroy(struct ofproto_sflow *os) +{ + if (os) { + ofproto_sflow_clear(os); + port_array_destroy(&os->ports); + free(os); + } +} + +static void +ofproto_sflow_add_poller(struct ofproto_sflow *os, + struct ofproto_sflow_port *osp, uint16_t odp_port) +{ + SFLPoller *poller = sfl_agent_addPoller(os->sflow_agent, &osp->dsi, os, + sflow_agent_get_counters); + sfl_poller_set_sFlowCpInterval(poller, os->options->polling_interval); + sfl_poller_set_sFlowCpReceiver(poller, RECEIVER_INDEX); + sfl_poller_set_bridgePort(poller, odp_port); +} + +static void +ofproto_sflow_add_sampler(struct ofproto_sflow *os, + struct ofproto_sflow_port *osp, + u_int32_t sampling_rate, u_int32_t header_len) +{ + SFLSampler *sampler = sfl_agent_addSampler(os->sflow_agent, &osp->dsi); + sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, sampling_rate); + sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, header_len); + sfl_sampler_set_sFlowFsReceiver(sampler, RECEIVER_INDEX); +} + +void +ofproto_sflow_add_port(struct ofproto_sflow *os, uint16_t odp_port, + const char *netdev_name) +{ + struct ofproto_sflow_port *osp; + struct netdev *netdev; + uint32_t ifindex; + int error; + + ofproto_sflow_del_port(os, odp_port); + + /* Open network device. */ + error = netdev_open(netdev_name, NETDEV_ETH_TYPE_NONE, &netdev); + if (error) { + VLOG_WARN_RL(&rl, "failed to open network device \"%s\": %s", + netdev_name, strerror(error)); + return; + } + + /* Add to table of ports. 
*/ + osp = xmalloc(sizeof *osp); + osp->netdev = netdev; + ifindex = netdev_get_ifindex(netdev); + if (ifindex <= 0) { + ifindex = (os->sflow_agent->subId << 16) + odp_port; + } + SFL_DS_SET(osp->dsi, 0, ifindex, 0); + port_array_set(&os->ports, odp_port, osp); + + /* Add poller. */ + if (os->sflow_agent) { + ofproto_sflow_add_poller(os, osp, odp_port); + } +} + +void +ofproto_sflow_del_port(struct ofproto_sflow *os, uint16_t odp_port) +{ + struct ofproto_sflow_port *osp = port_array_get(&os->ports, odp_port); + if (osp) { + if (os->sflow_agent) { + sfl_agent_removePoller(os->sflow_agent, &osp->dsi); + sfl_agent_removeSampler(os->sflow_agent, &osp->dsi); + } + netdev_close(osp->netdev); + free(osp); + port_array_set(&os->ports, odp_port, NULL); + } +} + +void +ofproto_sflow_set_options(struct ofproto_sflow *os, + const struct ofproto_sflow_options *options) +{ + struct ofproto_sflow_port *osp; + bool options_changed; + SFLReceiver *receiver; + unsigned int odp_port; + SFLAddress agentIP; + time_t now; + int error; + + if (!options->targets.n || !options->sampling_rate) { + /* No point in doing any work if there are no targets or nothing to + * sample. */ + ofproto_sflow_clear(os); + return; + } + + options_changed = (!os->options + || !ofproto_sflow_options_equal(options, os->options)); + + /* Configure collectors if options have changed or if we're shortchanged in + * collectors (which indicates that opening one or more of the configured + * collectors failed, so that we should retry). */ + if (options_changed + || collectors_count(os->collectors) < options->targets.n) { + collectors_destroy(os->collectors); + error = collectors_create(&options->targets, + SFL_DEFAULT_COLLECTOR_PORT, &os->collectors); + if (os->collectors == NULL) { + VLOG_WARN_RL(&rl, "no collectors could be initialized, " + "sFlow disabled"); + ofproto_sflow_clear(os); + return; + } + } + + /* Avoid reconfiguring if options didn't change. 
*/ + if (!options_changed) { + return; + } + ofproto_sflow_options_destroy(os->options); + os->options = ofproto_sflow_options_clone(options); + + /* Choose agent IP address. */ + if (!sflow_choose_agent_address(options->agent_device, + options->control_ip, &agentIP)) { + ofproto_sflow_clear(os); + return; + } + + /* Create agent. */ + VLOG_INFO("creating sFlow agent %d", options->sub_id); + if (os->sflow_agent) { + sfl_agent_release(os->sflow_agent); + } + os->sflow_agent = xcalloc(1, sizeof *os->sflow_agent); + now = time_now(); + sfl_agent_init(os->sflow_agent, + &agentIP, + options->sub_id, + now, /* Boot time. */ + now, /* Current time. */ + os, /* Pointer supplied to callbacks. */ + sflow_agent_alloc_cb, + sflow_agent_free_cb, + sflow_agent_error_cb, + sflow_agent_send_packet_cb); + + receiver = sfl_agent_addReceiver(os->sflow_agent); + sfl_receiver_set_sFlowRcvrOwner(receiver, "Open vSwitch sFlow"); + sfl_receiver_set_sFlowRcvrTimeout(receiver, 0xffffffff); + + /* Set the sampling_rate down in the datapath. */ + dpif_set_sflow_probability(os->dpif, + MAX(1, UINT32_MAX / options->sampling_rate)); + + /* Add samplers and pollers for the currently known ports. */ + PORT_ARRAY_FOR_EACH (osp, &os->ports, odp_port) { + ofproto_sflow_add_sampler(os, osp, + options->sampling_rate, options->header_len); + } +} + +static int +ofproto_sflow_odp_port_to_ifindex(const struct ofproto_sflow *os, + uint16_t odp_port) +{ + struct ofproto_sflow_port *osp = port_array_get(&os->ports, odp_port); + return osp ? 
SFL_DS_INDEX(osp->dsi) : 0; +} + +void +ofproto_sflow_received(struct ofproto_sflow *os, struct odp_msg *msg) +{ + SFL_FLOW_SAMPLE_TYPE fs; + SFLFlow_sample_element hdrElem; + SFLSampled_header *header; + SFLFlow_sample_element switchElem; + SFLSampler *sampler; + const struct odp_sflow_sample_header *hdr; + const union odp_action *actions; + struct ofpbuf payload; + size_t n_actions, n_outputs; + size_t min_size; + flow_t flow; + size_t i; + + /* Get odp_sflow_sample_header. */ + min_size = sizeof *msg + sizeof *hdr; + if (min_size > msg->length) { + VLOG_WARN_RL(&rl, "sFlow packet too small (%"PRIu32" < %zu)", + msg->length, min_size); + return; + } + hdr = (const struct odp_sflow_sample_header *) (msg + 1); + + /* Get actions. */ + n_actions = hdr->n_actions; + if (n_actions > 65536 / sizeof *actions) { + VLOG_WARN_RL(&rl, "too many actions in sFlow packet (%zu > %zu)", + 65536 / sizeof *actions, n_actions); + return; + } + min_size += n_actions * sizeof *actions; + if (min_size > msg->length) { + VLOG_WARN_RL(&rl, "sFlow packet with %zu actions too small " + "(%"PRIu32" < %zu)", + n_actions, msg->length, min_size); + return; + } + actions = (const union odp_action *) (hdr + 1); + + /* Get packet payload and extract flow. */ + payload.data = (union odp_action *) (actions + n_actions); + payload.size = msg->length - min_size; + flow_extract(&payload, msg->port, &flow); + + /* Build a flow sample */ + memset(&fs, 0, sizeof fs); + fs.input = ofproto_sflow_odp_port_to_ifindex(os, msg->port); + fs.output = 0; /* Filled in correctly below. */ + fs.sample_pool = hdr->sample_pool; + + /* We are going to give it to the sampler that represents this input port. + * By implementing "ingress-only" sampling like this we ensure that we + * never have to offer the same sample to more than one sampler. 
*/ + sampler = sfl_agent_getSamplerByIfIndex(os->sflow_agent, fs.input); + if (!sampler) { + VLOG_WARN_RL(&rl, "no sampler for input ifIndex (%"PRIu32")", + fs.input); + return; + } + + /* Sampled header. */ + memset(&hdrElem, 0, sizeof hdrElem); + hdrElem.tag = SFLFLOW_HEADER; + header = &hdrElem.flowType.header; + header->header_protocol = SFLHEADER_ETHERNET_ISO8023; + header->frame_length = payload.size; + header->stripped = 4; /* Ethernet FCS stripped off. */ + header->header_length = MIN(payload.size, + sampler->sFlowFsMaximumHeaderSize); + header->header_bytes = payload.data; + + /* Add extended switch element. */ + memset(&switchElem, 0, sizeof(switchElem)); + switchElem.tag = SFLFLOW_EX_SWITCH; + switchElem.flowType.sw.src_vlan = ntohs(flow.dl_vlan); + switchElem.flowType.sw.src_priority = -1; /* XXX */ + switchElem.flowType.sw.dst_vlan = -1; /* Filled in correctly below. */ + switchElem.flowType.sw.dst_priority = switchElem.flowType.sw.src_priority; + + /* Figure out the output ports. */ + n_outputs = 0; + for (i = 0; i < n_actions; i++) { + const union odp_action *a = &actions[i]; + + switch (a->type) { + case ODPAT_OUTPUT: + fs.output = ofproto_sflow_odp_port_to_ifindex(os, a->output.port); + n_outputs++; + break; + + case ODPAT_OUTPUT_GROUP: + n_outputs += (a->output_group.group == DP_GROUP_FLOOD ? os->n_flood + : a->output_group.group == DP_GROUP_ALL ? os->n_all + : 0); + break; + + case ODPAT_SET_VLAN_VID: + switchElem.flowType.sw.dst_vlan = ntohs(a->vlan_vid.vlan_vid); + break; + + case ODPAT_SET_VLAN_PCP: + switchElem.flowType.sw.dst_priority = a->vlan_pcp.vlan_pcp; + break; + + default: + break; + } + } + + /* Set output port, as defined by http://www.sflow.org/sflow_version_5.txt + (search for "Input/output port information"). */ + if (!n_outputs) { + /* This value indicates that the packet was dropped for an unknown + * reason. 
*/ + fs.output = 0x40000000 | 256; + } else if (n_outputs > 1 || !fs.output) { + /* Setting the high bit means "multiple output ports". */ + fs.output = 0x80000000 | n_outputs; + } + + /* Submit the flow sample to be encoded into the next datagram. */ + SFLADD_ELEMENT(&fs, &hdrElem); + SFLADD_ELEMENT(&fs, &switchElem); + sfl_sampler_writeFlowSample(sampler, &fs); +} + +void +ofproto_sflow_set_group_sizes(struct ofproto_sflow *os, + size_t n_flood, size_t n_all) +{ + os->n_flood = n_flood; + os->n_all = n_all; +} + +void +ofproto_sflow_run(struct ofproto_sflow *os) +{ + if (ofproto_sflow_is_enabled(os)) { + time_t now = time_now(); + if (now >= os->next_tick) { + sfl_agent_tick(os->sflow_agent, now); + os->next_tick = now + 1; + } + } +} + +void +ofproto_sflow_wait(struct ofproto_sflow *os) +{ + if (ofproto_sflow_is_enabled(os)) { + poll_timer_wait(os->next_tick * 1000 - time_msec()); + } +} diff --git a/ofproto/ofproto-sflow.h b/ofproto/ofproto-sflow.h new file mode 100644 index 00000000..ec86d115 --- /dev/null +++ b/ofproto/ofproto-sflow.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2009 InMon Corp. + * Copyright (c) 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef OFPROTO_SFLOW_H +#define OFPROTO_SFLOW_H 1 + +#include <stdint.h> +#include "svec.h" + +struct dpif; +struct odp_msg; +struct ofproto_sflow_options; + +struct ofproto_sflow *ofproto_sflow_create(struct dpif *); +void ofproto_sflow_destroy(struct ofproto_sflow *); +void ofproto_sflow_set_options(struct ofproto_sflow *, + const struct ofproto_sflow_options *); +void ofproto_sflow_clear(struct ofproto_sflow *); +bool ofproto_sflow_is_enabled(const struct ofproto_sflow *); + +void ofproto_sflow_add_port(struct ofproto_sflow *, uint16_t odp_port, + const char *netdev_name); +void ofproto_sflow_del_port(struct ofproto_sflow *, uint16_t odp_port); +void ofproto_sflow_set_group_sizes(struct ofproto_sflow *, + size_t n_flood, size_t n_all); + +void ofproto_sflow_run(struct ofproto_sflow *); +void ofproto_sflow_wait(struct ofproto_sflow *); + +void ofproto_sflow_received(struct ofproto_sflow *, struct odp_msg *); + +#endif /* ofproto/ofproto-sflow.h */ diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c index 4995bbec..43054fa3 100644 --- a/ofproto/ofproto.c +++ b/ofproto/ofproto.c @@ -35,6 +35,7 @@ #include "netflow.h" #include "odp-util.h" #include "ofp-print.h" +#include "ofproto-sflow.h" #include "ofpbuf.h" #include "openflow/nicira-ext.h" #include "openflow/openflow.h" @@ -60,10 +61,7 @@ #define THIS_MODULE VLM_ofproto #include "vlog.h" -enum { - DP_GROUP_FLOOD = 0, - DP_GROUP_ALL = 1 -}; +#include "sflow_api.h" enum { TABLEID_HASH = 0, @@ -209,6 +207,7 @@ struct ofproto { struct pinsched *miss_sched, *action_sched; struct executer *executer; struct netflow *netflow; + struct ofproto_sflow *sflow; /* Flow table. 
*/ struct classifier cls; @@ -253,7 +252,8 @@ static void handle_odp_msg(struct ofproto *, struct ofpbuf *); static void handle_openflow(struct ofconn *, struct ofproto *, struct ofpbuf *); -static void refresh_port_group(struct ofproto *, unsigned int group); +static void refresh_port_groups(struct ofproto *); + static void update_port(struct ofproto *, const char *devname); static int init_ports(struct ofproto *); static void reinit_ports(struct ofproto *); @@ -282,7 +282,7 @@ ofproto_create(const char *datapath, const struct ofhooks *ofhooks, void *aux, dpif_close(dpif); return error; } - error = dpif_recv_set_mask(dpif, ODPL_MISS | ODPL_ACTION); + error = dpif_recv_set_mask(dpif, ODPL_MISS | ODPL_ACTION | ODPL_SFLOW); if (error) { VLOG_ERR("failed to listen on datapath %s: %s", datapath, strerror(error)); @@ -316,6 +316,7 @@ ofproto_create(const char *datapath, const struct ofhooks *ofhooks, void *aux, p->miss_sched = p->action_sched = NULL; p->executer = NULL; p->netflow = NULL; + p->sflow = NULL; /* Initialize flow table. 
*/ classifier_init(&p->cls); @@ -549,6 +550,30 @@ ofproto_set_netflow(struct ofproto *ofproto, } void +ofproto_set_sflow(struct ofproto *ofproto, + const struct ofproto_sflow_options *oso) +{ + struct ofproto_sflow *os = ofproto->sflow; + if (oso) { + if (!os) { + struct ofport *ofport; + unsigned int odp_port; + + os = ofproto->sflow = ofproto_sflow_create(ofproto->dpif); + refresh_port_groups(ofproto); + PORT_ARRAY_FOR_EACH (ofport, &ofproto->ports, odp_port) { + ofproto_sflow_add_port(os, odp_port, + netdev_get_name(ofport->netdev)); + } + } + ofproto_sflow_set_options(os, oso); + } else { + ofproto_sflow_destroy(os); + ofproto->sflow = NULL; + } +} + +void ofproto_set_failure(struct ofproto *ofproto, bool fail_open) { if (fail_open) { @@ -718,6 +743,7 @@ ofproto_destroy(struct ofproto *p) pinsched_destroy(p->action_sched); executer_destroy(p->executer); netflow_destroy(p->netflow); + ofproto_sflow_destroy(p->sflow); switch_status_unregister(p->ss_cat); @@ -870,6 +896,9 @@ ofproto_run1(struct ofproto *p) if (p->netflow) { netflow_run(p->netflow); } + if (p->sflow) { + ofproto_sflow_run(p->sflow); + } return 0; } @@ -926,6 +955,9 @@ ofproto_wait(struct ofproto *p) if (p->executer) { executer_wait(p->executer); } + if (p->sflow) { + ofproto_sflow_wait(p->sflow); + } if (!tag_set_is_empty(&p->revalidate_set)) { poll_immediate_wake(); } @@ -1066,7 +1098,7 @@ reinit_ports(struct ofproto *p) svec_destroy(&devnames); } -static void +static size_t refresh_port_group(struct ofproto *p, unsigned int group) { uint16_t *ports; @@ -1085,13 +1117,18 @@ refresh_port_group(struct ofproto *p, unsigned int group) } dpif_port_group_set(p->dpif, group, ports, n_ports); free(ports); + + return n_ports; } static void refresh_port_groups(struct ofproto *p) { - refresh_port_group(p, DP_GROUP_FLOOD); - refresh_port_group(p, DP_GROUP_ALL); + size_t n_flood = refresh_port_group(p, DP_GROUP_FLOOD); + size_t n_all = refresh_port_group(p, DP_GROUP_ALL); + if (p->sflow) { + 
ofproto_sflow_set_group_sizes(p->sflow, n_flood, n_all); + } } static struct ofport * @@ -1190,19 +1227,29 @@ send_port_status(struct ofproto *p, const struct ofport *ofport, static void ofport_install(struct ofproto *p, struct ofport *ofport) { + uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no); + const char *netdev_name = (const char *) ofport->opp.name; + netdev_monitor_add(p->netdev_monitor, ofport->netdev); - port_array_set(&p->ports, ofp_port_to_odp_port(ofport->opp.port_no), - ofport); - shash_add(&p->port_by_name, (char *) ofport->opp.name, ofport); + port_array_set(&p->ports, odp_port, ofport); + shash_add(&p->port_by_name, netdev_name, ofport); + if (p->sflow) { + ofproto_sflow_add_port(p->sflow, odp_port, netdev_name); + } } static void ofport_remove(struct ofproto *p, struct ofport *ofport) { + uint16_t odp_port = ofp_port_to_odp_port(ofport->opp.port_no); + netdev_monitor_remove(p->netdev_monitor, ofport->netdev); - port_array_set(&p->ports, ofp_port_to_odp_port(ofport->opp.port_no), NULL); + port_array_set(&p->ports, odp_port, NULL); shash_delete(&p->port_by_name, shash_find(&p->port_by_name, (char *) ofport->opp.name)); + if (p->sflow) { + ofproto_sflow_del_port(p->sflow, odp_port); + } } static void @@ -2291,7 +2338,7 @@ update_port_config(struct ofproto *p, struct ofport *port, #undef REVALIDATE_BITS if (mask & OFPPC_NO_FLOOD) { port->opp.config ^= OFPPC_NO_FLOOD; - refresh_port_group(p, DP_GROUP_FLOOD); + refresh_port_groups(p); } if (mask & OFPPC_NO_PACKET_IN) { port->opp.config ^= OFPPC_NO_PACKET_IN; @@ -3108,7 +3155,7 @@ handle_openflow(struct ofconn *ofconn, struct ofproto *p, } static void -handle_odp_msg(struct ofproto *p, struct ofpbuf *packet) +handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet) { struct odp_msg *msg = packet->data; uint16_t in_port = odp_port_to_ofp_port(msg->port); @@ -3116,14 +3163,6 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet) struct ofpbuf payload; flow_t flow; - /* Handle 
controller actions. */ - if (msg->type == _ODPL_ACTION_NR) { - COVERAGE_INC(ofproto_ctlr_action); - pinsched_send(p->action_sched, in_port, packet, - send_packet_in_action, p); - return; - } - payload.data = msg + 1; payload.size = msg->length - sizeof *msg; flow_extract(&payload, msg->port, &flow); @@ -3193,6 +3232,36 @@ handle_odp_msg(struct ofproto *p, struct ofpbuf *packet) ofpbuf_delete(packet); } } + +static void +handle_odp_msg(struct ofproto *p, struct ofpbuf *packet) +{ + struct odp_msg *msg = packet->data; + + switch (msg->type) { + case _ODPL_ACTION_NR: + COVERAGE_INC(ofproto_ctlr_action); + pinsched_send(p->action_sched, odp_port_to_ofp_port(msg->port), packet, + send_packet_in_action, p); + break; + + case _ODPL_SFLOW_NR: + if (p->sflow) { + ofproto_sflow_received(p->sflow, msg); + } + ofpbuf_delete(packet); + break; + + case _ODPL_MISS_NR: + handle_odp_miss_msg(p, packet); + break; + + default: + VLOG_WARN_RL(&rl, "received ODP message of unexpected type %"PRIu32, + msg->type); + break; + } +} static void revalidate_cb(struct cls_rule *sub_, void *cbdata_) diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h index 50dd5d5b..6377e51e 100644 --- a/ofproto/ofproto.h +++ b/ofproto/ofproto.h @@ -29,6 +29,11 @@ struct ofhooks; struct ofproto; struct svec; +enum { + DP_GROUP_FLOOD = 0, + DP_GROUP_ALL = 1 +}; + struct ofexpired { flow_t flow; uint64_t packet_count; /* Packets from subrules. */ @@ -36,6 +41,16 @@ struct ofexpired { long long int used; /* Last-used time (0 if never used). 
*/ }; +struct ofproto_sflow_options { + struct svec targets; + uint32_t sampling_rate; + uint32_t polling_interval; + uint32_t header_len; + uint32_t sub_id; + char *agent_device; + char *control_ip; +}; + int ofproto_create(const char *datapath, const struct ofhooks *, void *aux, struct ofproto **ofprotop); void ofproto_destroy(struct ofproto *); @@ -62,6 +77,7 @@ int ofproto_set_listeners(struct ofproto *, const struct svec *listeners); int ofproto_set_snoops(struct ofproto *, const struct svec *snoops); int ofproto_set_netflow(struct ofproto *, const struct netflow_options *nf_options); +void ofproto_set_sflow(struct ofproto *, const struct ofproto_sflow_options *); void ofproto_set_failure(struct ofproto *, bool fail_open); void ofproto_set_rate_limit(struct ofproto *, int rate_limit, int burst_limit); int ofproto_set_stp(struct ofproto *, bool enable_stp); diff --git a/utilities/automake.mk b/utilities/automake.mk index 9ac12c92..1a9d4925 100644 --- a/utilities/automake.mk +++ b/utilities/automake.mk @@ -80,6 +80,7 @@ utilities_ovs_ofctl_LDADD = lib/libopenvswitch.a $(FAULT_LIBS) $(SSL_LIBS) utilities_ovs_openflowd_SOURCES = utilities/ovs-openflowd.c utilities_ovs_openflowd_LDADD = \ ofproto/libofproto.a \ + lib/libsflow.a \ lib/libopenvswitch.a \ $(FAULT_LIBS) \ $(SSL_LIBS) diff --git a/vswitchd/automake.mk b/vswitchd/automake.mk index 8e27fc2f..d810c830 100644 --- a/vswitchd/automake.mk +++ b/vswitchd/automake.mk @@ -21,6 +21,7 @@ vswitchd_ovs_vswitchd_SOURCES = \ vswitchd/xenserver.h vswitchd_ovs_vswitchd_LDADD = \ ofproto/libofproto.a \ + lib/libsflow.a \ lib/libopenvswitch.a \ $(FAULT_LIBS) \ $(SSL_LIBS) diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c index dbcf3125..3b7ec51f 100644 --- a/vswitchd/bridge.c +++ b/vswitchd/bridge.c @@ -61,6 +61,7 @@ #include "vconn-ssl.h" #include "xenserver.h" #include "xtoxll.h" +#include "sflow_api.h" #define THIS_MODULE VLM_bridge #include "vlog.h" @@ -210,6 +211,7 @@ static uint64_t bridge_pick_datapath_id(struct 
bridge *, const uint8_t bridge_ea[ETH_ADDR_LEN], struct iface *hw_addr_iface); static struct iface *bridge_get_local_iface(struct bridge *); +static const char *bridge_get_controller(const struct bridge *br); static uint64_t dpid_from_hash(const void *, size_t nbytes); static void bridge_unixctl_fdb_show(struct unixctl_conn *, const char *args); @@ -527,6 +529,7 @@ bridge_reconfigure(void) struct svec old_br, new_br; struct bridge *br, *next; size_t i; + int sflow_bridge_number; COVERAGE_INC(bridge_reconfigure); @@ -646,6 +649,7 @@ bridge_reconfigure(void) svec_destroy(&want_ifaces); svec_destroy(&add_ifaces); } + sflow_bridge_number = 0; LIST_FOR_EACH (br, struct bridge, node, &all_bridges) { uint8_t ea[8]; uint64_t dpid; @@ -716,6 +720,42 @@ bridge_reconfigure(void) } svec_destroy(&nf_options.collectors); + if (cfg_has("sflow.%s.host", br->name)) { + struct ofproto_sflow_options oso; + + svec_init(&oso.targets); + cfg_get_all_keys(&oso.targets, "sflow.%s.host", br->name); + + oso.sampling_rate = SFL_DEFAULT_SAMPLING_RATE; + if (cfg_has("sflow.%s.sampling", br->name)) { + oso.sampling_rate = cfg_get_int(0, "sflow.%s.sampling", + br->name); + } + + oso.polling_interval = SFL_DEFAULT_POLLING_INTERVAL; + if (cfg_has("sflow.%s.polling", br->name)) { + oso.polling_interval = cfg_get_int(0, "sflow.%s.polling", + br->name); + } + + oso.header_len = SFL_DEFAULT_HEADER_SIZE; + if (cfg_has("sflow.%s.header", br->name)) { + oso.header_len = cfg_get_int(0, "sflow.%s.header", br->name); + } + + oso.sub_id = sflow_bridge_number++; + oso.agent_device = (char *) cfg_get_string(0, "sflow.%s.agent", + br->name); + oso.control_ip = (char *) cfg_get_string(0, + "bridge.%s.controller.ip", + br->name); + ofproto_set_sflow(br->ofproto, &oso); + + svec_destroy(&oso.targets); + } else { + ofproto_set_sflow(br->ofproto, NULL); + } + /* Update the controller and related settings. 
It would be more * straightforward to call this from bridge_reconfigure_one(), but we * can't do it there for two reasons. First, and most importantly, at diff --git a/vswitchd/ovs-vswitchd.8.in b/vswitchd/ovs-vswitchd.8.in index 431c9488..fafc3fe8 100644 --- a/vswitchd/ovs-vswitchd.8.in +++ b/vswitchd/ovs-vswitchd.8.in @@ -48,6 +48,9 @@ Port mirroring, with optional VLAN tagging. NetFlow v5 flow logging. . .IP \(bu +sFlow(R) monitoring. +. +.IP \(bu Connectivity to an external OpenFlow controller, such as NOX. . .PP diff --git a/vswitchd/ovs-vswitchd.conf.5.in b/vswitchd/ovs-vswitchd.conf.5.in index 01a1e4c6..5c4a092f 100644 --- a/vswitchd/ovs-vswitchd.conf.5.in +++ b/vswitchd/ovs-vswitchd.conf.5.in @@ -459,6 +459,38 @@ netflow.mybr.host=nflow.example.com:9995 .fi .RE +.SS "sFlow Monitoring" +sFlow(R) is a protocol for monitoring switches. A bridge may be +configured to send sFlow records to sFlow collectors by defining the +key \fBsflow.\fIbridge\fB.host\fR for each collector in the form +\fIip\fR[\fB:\fIport\fR]. Records from \fIbridge\fR will be sent to +each \fIip\fR on UDP \fIport\fR. The \fIip\fR must be specified +numerically, not as a DNS name. If \fIport\fR is omitted, port 6343 is +used. +.PP +By default, 1 out of every 400 packets is sent to the configured sFlow +collector. To override this, set \fBsflow.\fIbridge\fB.sampling\fR to +the number of switched packets out of which one, on average, will be +sent to the sFlow collector, e.g. a value of 1 sends every packet to +the collector, a value of 2 sends 50% of the packets to the collector, +and so on. +.PP +\fBovs\-vswitchd\fR also occasionally sends switch port statistics to +sFlow collectors, by default every 30 seconds. To override this, set +\fBsflow.\fIbridge\fB.polling\fR to a duration in seconds. +.PP +By default, \fBovs\-vswitchd\fR sends the first 128 bytes of sampled +packets to sFlow collectors. To override this, set +\fBsflow.\fIbridge\fB.header\fR to a size in bytes.
+.PP +The sFlow module must be able to report an ``agent address'' to sFlow +collectors, which should be an IP address for the Open vSwitch that is +persistent and reachable over the network, if possible. If a +local IP is configured as \fBbridge.\fIbridge\fB.controller.ip\fR, +then that IP address is used by default. To override this default, +set \fBsflow.\fIbridge\fB.agent\fR to the name of a network device, in +which case the IP address set on that device is used. If no IP +address can be determined either way, sFlow is disabled. .SS "Remote Management" A \fBovs\-vswitchd\fR instance may be remotely managed by a controller that supports the OpenFlow Management Protocol, such as NOX. This @@ -514,7 +546,7 @@ switch will perform all configured bridging and switching locally. .TP \fBdiscover\fR Use controller discovery to find the local OpenFlow controller. -Refer to \fB\ovs\-openflowd\fR(8) for information on how to configure a DHCP +Refer to \fBovs\-openflowd\fR(8) for information on how to configure a DHCP server to support controller discovery. The following additional options control the discovery process: . |