aboutsummaryrefslogtreecommitdiff
path: root/lib/sflow_receiver.c
diff options
context:
space:
mode:
authorBen Pfaff <blp@nicira.com>2009-11-20 09:45:26 -0800
committerBen Pfaff <blp@nicira.com>2009-12-21 13:18:35 -0800
commitc72e245a0e2c979f4c14279b56183e2200905c6c (patch)
tree5df768afcb9e5f0d51dcd8e69727bea331b626cd /lib/sflow_receiver.c
parente4bfff8f0d2c01009759acb8adcab070327b5747 (diff)
Add InMon's sFlow Agent library to the build system.
The C source and header files added in this commit is covered under the InMon sFlow license at http://www.inmon.com/technology/sflowlicense.txt The library requires -Wno-unused to compile without warnings, so this commit adds that for building the sFlow code only. Automake can only change compiler flags on a per-library or per-program basis, so sFlow is built as a separate library. The library will be used in upcoming commits.
Diffstat (limited to 'lib/sflow_receiver.c')
-rw-r--r--lib/sflow_receiver.c832
1 files changed, 832 insertions, 0 deletions
diff --git a/lib/sflow_receiver.c b/lib/sflow_receiver.c
new file mode 100644
index 00000000..7fccab30
--- /dev/null
+++ b/lib/sflow_receiver.c
@@ -0,0 +1,832 @@
+/* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
+/* http://www.inmon.com/technology/sflowlicense.txt */
+
+#include <assert.h>
+#include "sflow_api.h"
+
+static void resetSampleCollector(SFLReceiver *receiver);
+static void sendSample(SFLReceiver *receiver);
+static void sflError(SFLReceiver *receiver, char *errm);
+inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
+inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
+#ifdef SFLOW_DO_SOCKET
+static void initSocket(SFLReceiver *receiver);
+#endif
+
+/*_________________--------------------------__________________
+ _________________ sfl_receiver_init __________________
+ -----------------__________________________------------------
+*/
+
+void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
+{
+ /* first clear everything */
+ memset(receiver, 0, sizeof(*receiver));
+
+ /* now copy in the parameters */
+ receiver->agent = agent;
+
+ /* set defaults */
+ receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
+ receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
+
+#ifdef SFLOW_DO_SOCKET
+ /* initialize the socket address */
+ initSocket(receiver);
+#endif
+
+ /* preset some of the header fields */
+ receiver->sampleCollector.datap = receiver->sampleCollector.data;
+ putNet32(receiver, SFLDATAGRAM_VERSION5);
+ putAddress(receiver, &agent->myIP);
+ putNet32(receiver, agent->subId);
+
+ /* prepare to receive the first sample */
+ resetSampleCollector(receiver);
+}
+
+/*_________________---------------------------__________________
+ _________________ reset __________________
+ -----------------___________________________------------------
+
+ called on timeout, or when owner string is cleared
+*/
+
+static void reset(SFLReceiver *receiver) {
+ // ask agent to tell samplers and pollers to stop sending samples
+ sfl_agent_resetReceiver(receiver->agent, receiver);
+ // reinitialize
+ sfl_receiver_init(receiver, receiver->agent);
+}
+
+#ifdef SFLOW_DO_SOCKET
+/*_________________---------------------------__________________
+ _________________ initSocket __________________
+ -----------------___________________________------------------
+*/
+
+static void initSocket(SFLReceiver *receiver) {
+ if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
+ struct sockaddr_in6 *sa6 = &receiver->receiver6;
+ sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
+ sa6->sin6_family = AF_INET6;
+ sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
+ }
+ else {
+ struct sockaddr_in *sa4 = &receiver->receiver4;
+ sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
+ sa4->sin_family = AF_INET;
+ sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
+ }
+}
+#endif
+
+/*_________________----------------------------------------_____________
+ _________________ MIB Vars _____________
+ -----------------________________________________________-------------
+*/
+
+char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrOwner;
+}
+void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
+ receiver->sFlowRcvrOwner = sFlowRcvrOwner;
+ if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
+ // reset condition! owner string was cleared
+ reset(receiver);
+ }
+}
+time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrTimeout;
+}
+void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
+ receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
+}
+u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrMaximumDatagramSize;
+}
+void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
+ u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
+ if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
+ receiver->sFlowRcvrMaximumDatagramSize = mdz;
+}
+SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
+ return &receiver->sFlowRcvrAddress;
+}
+void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
+ if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
+#ifdef SFLOW_DO_SOCKET
+ initSocket(receiver);
+#endif
+}
+u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
+ return receiver->sFlowRcvrPort;
+}
+void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
+ receiver->sFlowRcvrPort = sFlowRcvrPort;
+ // update the socket structure
+#ifdef SFLOW_DO_SOCKET
+ initSocket(receiver);
+#endif
+}
+
+/*_________________---------------------------__________________
+ _________________ sfl_receiver_tick __________________
+ -----------------___________________________------------------
+*/
+
+void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
+{
+ // if there are any samples to send, flush them now
+ if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
+ // check the timeout
+ if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
+ // count down one tick and reset if we reach 0
+ if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
+ }
+}
+
+/*_________________-----------------------------__________________
+ _________________ receiver write utilities __________________
+ -----------------_____________________________------------------
+*/
+
+inline static void put32(SFLReceiver *receiver, u_int32_t val)
+{
+ *receiver->sampleCollector.datap++ = val;
+}
+
+inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
+{
+ *receiver->sampleCollector.datap++ = htonl(val);
+}
+
+inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
+{
+ u_int32_t *from = (u_int32_t *)obj;
+ while(quads--) putNet32(receiver, *from++);
+}
+
+inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
+{
+ u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
+ // first copy the bytes in
+ memcpy((u_char *)firstQuadPtr, &val64, 8);
+ if(htonl(1) != 1) {
+ // swap the bytes, and reverse the quads too
+ u_int32_t tmp = *receiver->sampleCollector.datap++;
+ *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
+ *receiver->sampleCollector.datap++ = htonl(tmp);
+ }
+ else receiver->sampleCollector.datap += 2;
+}
+
+inline static void put128(SFLReceiver *receiver, u_char *val)
+{
+ memcpy(receiver->sampleCollector.datap, val, 16);
+ receiver->sampleCollector.datap += 4;
+}
+
+inline static void putString(SFLReceiver *receiver, SFLString *s)
+{
+ putNet32(receiver, s->len);
+ memcpy(receiver->sampleCollector.datap, s->str, s->len);
+ receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
+}
+
+inline static u_int32_t stringEncodingLength(SFLString *s) {
+ // answer in bytes, so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
+ return 4 + (((s->len + 3) / 4) * 4);
+}
+
+inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
+{
+ // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
+ if(addr->type == 0) {
+ putNet32(receiver, SFLADDRESSTYPE_IP_V4);
+ put32(receiver, 0);
+ }
+ else {
+ putNet32(receiver, addr->type);
+ if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
+ else put128(receiver, addr->address.ip_v6.addr);
+ }
+}
+
+inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
+ return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8; // type + address (unspecified == IPV4)
+}
+
+inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac)
+{
+ memcpy(receiver->sampleCollector.datap, mac, 6);
+ receiver->sampleCollector.datap += 2;
+}
+
+inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
+{
+ putNet32(receiver, sw->src_vlan);
+ putNet32(receiver, sw->src_priority);
+ putNet32(receiver, sw->dst_vlan);
+ putNet32(receiver, sw->dst_priority);
+}
+
+inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
+{
+ putAddress(receiver, &router->nexthop);
+ putNet32(receiver, router->src_mask);
+ putNet32(receiver, router->dst_mask);
+}
+
+inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
+ return addressEncodingLength(&router->nexthop) + 8;
+}
+
+inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
+{
+ putAddress(receiver, &gw->nexthop);
+ putNet32(receiver, gw->as);
+ putNet32(receiver, gw->src_as);
+ putNet32(receiver, gw->src_peer_as);
+ putNet32(receiver, gw->dst_as_path_segments);
+ {
+ u_int32_t seg = 0;
+ for(; seg < gw->dst_as_path_segments; seg++) {
+ putNet32(receiver, gw->dst_as_path[seg].type);
+ putNet32(receiver, gw->dst_as_path[seg].length);
+ putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
+ }
+ }
+ putNet32(receiver, gw->communities_length);
+ putNet32_run(receiver, gw->communities, gw->communities_length);
+ putNet32(receiver, gw->localpref);
+}
+
+inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
+ u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
+ u_int32_t seg = 0;
+ elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
+ for(; seg < gw->dst_as_path_segments; seg++) {
+ elemSiz += 8; // type, length
+ elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
+ }
+ elemSiz += 4; // communities_length
+ elemSiz += 4 * gw->communities_length; // communities
+ elemSiz += 4; // localpref
+ return elemSiz;
+}
+
+inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
+{
+ putNet32(receiver, user->src_charset);
+ putString(receiver, &user->src_user);
+ putNet32(receiver, user->dst_charset);
+ putString(receiver, &user->dst_user);
+}
+
+inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
+ return 4
+ + stringEncodingLength(&user->src_user)
+ + 4
+ + stringEncodingLength(&user->dst_user);
+}
+
+inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
+{
+ putNet32(receiver, url->direction);
+ putString(receiver, &url->url);
+ putString(receiver, &url->host);
+}
+
+inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
+ return 4
+ + stringEncodingLength(&url->url)
+ + stringEncodingLength(&url->host);
+}
+
+inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
+{
+ putNet32(receiver, labelStack->depth);
+ putNet32_run(receiver, labelStack->stack, labelStack->depth);
+}
+
+inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
+ return 4 + (4 * labelStack->depth);
+}
+
+inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
+{
+ putAddress(receiver, &mpls->nextHop);
+ putLabelStack(receiver, &mpls->in_stack);
+ putLabelStack(receiver, &mpls->out_stack);
+}
+
+inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
+ return addressEncodingLength(&mpls->nextHop)
+ + labelStackEncodingLength(&mpls->in_stack)
+ + labelStackEncodingLength(&mpls->out_stack);
+}
+
+inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
+{
+ putAddress(receiver, &nat->src);
+ putAddress(receiver, &nat->dst);
+}
+
+inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
+ return addressEncodingLength(&nat->src)
+ + addressEncodingLength(&nat->dst);
+}
+
+inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
+{
+ putString(receiver, &tunnel->tunnel_lsp_name);
+ putNet32(receiver, tunnel->tunnel_id);
+ putNet32(receiver, tunnel->tunnel_cos);
+}
+
+inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
+ return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
+}
+
+inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
+{
+ putString(receiver, &vc->vc_instance_name);
+ putNet32(receiver, vc->vll_vc_id);
+ putNet32(receiver, vc->vc_label_cos);
+}
+
+inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
+ return stringEncodingLength( &vc->vc_instance_name) + 8;
+}
+
+inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
+{
+ putString(receiver, &ftn->mplsFTNDescr);
+ putNet32(receiver, ftn->mplsFTNMask);
+}
+
+inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
+ return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
+}
+
+inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
+{
+ putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
+}
+
+inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
+ return 4;
+}
+
+inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
+{
+ putLabelStack(receiver, &vlanTunnel->stack);
+}
+
+inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
+ return labelStackEncodingLength(&vlanTunnel->stack);
+}
+
+
+inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
+{
+ putNet32(receiver, counters->ifIndex);
+ putNet32(receiver, counters->ifType);
+ putNet64(receiver, counters->ifSpeed);
+ putNet32(receiver, counters->ifDirection);
+ putNet32(receiver, counters->ifStatus);
+ putNet64(receiver, counters->ifInOctets);
+ putNet32(receiver, counters->ifInUcastPkts);
+ putNet32(receiver, counters->ifInMulticastPkts);
+ putNet32(receiver, counters->ifInBroadcastPkts);
+ putNet32(receiver, counters->ifInDiscards);
+ putNet32(receiver, counters->ifInErrors);
+ putNet32(receiver, counters->ifInUnknownProtos);
+ putNet64(receiver, counters->ifOutOctets);
+ putNet32(receiver, counters->ifOutUcastPkts);
+ putNet32(receiver, counters->ifOutMulticastPkts);
+ putNet32(receiver, counters->ifOutBroadcastPkts);
+ putNet32(receiver, counters->ifOutDiscards);
+ putNet32(receiver, counters->ifOutErrors);
+ putNet32(receiver, counters->ifPromiscuousMode);
+}
+
+
+/*_________________-----------------------------__________________
+ _________________ computeFlowSampleSize __________________
+ -----------------_____________________________------------------
+*/
+
+static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
+{
+ SFLFlow_sample_element *elem = fs->elements;
+#ifdef SFL_USE_32BIT_INDEX
+ u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
+ sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
+#else
+ u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
+ sample_pool, drops, input, output, number of elements */
+#endif
+
+ fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
+ for(; elem != NULL; elem = elem->nxt) {
+ u_int elemSiz = 0;
+ fs->num_elements++;
+ siz += 8; /* tag, length */
+ switch(elem->tag) {
+ case SFLFLOW_HEADER:
+ elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
+ elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
+ break;
+ case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
+ case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
+ case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
+ case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
+ case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
+ case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
+ case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
+ case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
+ case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
+ case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
+ case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
+ case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
+ case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
+ case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
+ case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
+ default:
+ sflError(receiver, "unexpected packet_data_tag");
+ return -1;
+ break;
+ }
+ // cache the element size, and accumulate it into the overall FlowSample size
+ elem->length = elemSiz;
+ siz += elemSiz;
+ }
+
+ return siz;
+}
+
+/*_________________-------------------------------__________________
+ _________________ sfl_receiver_writeFlowSample __________________
+ -----------------_______________________________------------------
+*/
+
+int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
+{
+ int packedSize;
+ if(fs == NULL) return -1;
+ if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
+
+ // check in case this one sample alone is too big for the datagram
+ // in fact - if it is even half as big then we should ditch it. Very
+ // important to avoid overruning the packet buffer.
+ if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
+ sflError(receiver, "flow sample too big for datagram");
+ return -1;
+ }
+
+ // if the sample pkt is full enough so that this sample might put
+ // it over the limit, then we should send it now before going on.
+ if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
+ sendSample(receiver);
+
+ receiver->sampleCollector.numSamples++;
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
+#else
+ putNet32(receiver, SFLFLOW_SAMPLE);
+#endif
+
+ putNet32(receiver, packedSize - 8); // don't include tag and len
+ putNet32(receiver, fs->sequence_number);
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, fs->ds_class);
+ putNet32(receiver, fs->ds_index);
+#else
+ putNet32(receiver, fs->source_id);
+#endif
+
+ putNet32(receiver, fs->sampling_rate);
+ putNet32(receiver, fs->sample_pool);
+ putNet32(receiver, fs->drops);
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, fs->inputFormat);
+ putNet32(receiver, fs->input);
+ putNet32(receiver, fs->outputFormat);
+ putNet32(receiver, fs->output);
+#else
+ putNet32(receiver, fs->input);
+ putNet32(receiver, fs->output);
+#endif
+
+ putNet32(receiver, fs->num_elements);
+
+ {
+ SFLFlow_sample_element *elem = fs->elements;
+ for(; elem != NULL; elem = elem->nxt) {
+
+ putNet32(receiver, elem->tag);
+ putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
+
+ switch(elem->tag) {
+ case SFLFLOW_HEADER:
+ putNet32(receiver, elem->flowType.header.header_protocol);
+ putNet32(receiver, elem->flowType.header.frame_length);
+ putNet32(receiver, elem->flowType.header.stripped);
+ putNet32(receiver, elem->flowType.header.header_length);
+ /* the header */
+ memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
+ /* round up to multiple of 4 to preserve alignment */
+ receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
+ break;
+ case SFLFLOW_ETHERNET:
+ putNet32(receiver, elem->flowType.ethernet.eth_len);
+ putMACAddress(receiver, elem->flowType.ethernet.src_mac);
+ putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
+ putNet32(receiver, elem->flowType.ethernet.eth_type);
+ break;
+ case SFLFLOW_IPV4:
+ putNet32(receiver, elem->flowType.ipv4.length);
+ putNet32(receiver, elem->flowType.ipv4.protocol);
+ put32(receiver, elem->flowType.ipv4.src_ip.addr);
+ put32(receiver, elem->flowType.ipv4.dst_ip.addr);
+ putNet32(receiver, elem->flowType.ipv4.src_port);
+ putNet32(receiver, elem->flowType.ipv4.dst_port);
+ putNet32(receiver, elem->flowType.ipv4.tcp_flags);
+ putNet32(receiver, elem->flowType.ipv4.tos);
+ break;
+ case SFLFLOW_IPV6:
+ putNet32(receiver, elem->flowType.ipv6.length);
+ putNet32(receiver, elem->flowType.ipv6.protocol);
+ put128(receiver, elem->flowType.ipv6.src_ip.addr);
+ put128(receiver, elem->flowType.ipv6.dst_ip.addr);
+ putNet32(receiver, elem->flowType.ipv6.src_port);
+ putNet32(receiver, elem->flowType.ipv6.dst_port);
+ putNet32(receiver, elem->flowType.ipv6.tcp_flags);
+ putNet32(receiver, elem->flowType.ipv6.priority);
+ break;
+ case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
+ case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
+ case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
+ case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
+ case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
+ case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
+ case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
+ case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
+ case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
+ case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
+ case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
+ case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
+ default:
+ sflError(receiver, "unexpected packet_data_tag");
+ return -1;
+ break;
+ }
+ }
+ }
+
+ // sanity check
+ assert(((u_char *)receiver->sampleCollector.datap
+ - (u_char *)receiver->sampleCollector.data
+ - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
+
+ // update the pktlen
+ receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
+ return packedSize;
+}
+
+/*_________________-----------------------------__________________
+ _________________ computeCountersSampleSize __________________
+ -----------------_____________________________------------------
+*/
+
+static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+ SFLCounters_sample_element *elem = cs->elements;
+#ifdef SFL_USE_32BIT_INDEX
+ u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
+#else
+ u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
+#endif
+
+ cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
+ for(; elem != NULL; elem = elem->nxt) {
+ u_int elemSiz = 0;
+ cs->num_elements++;
+ siz += 8; /* tag, length */
+ switch(elem->tag) {
+ case SFLCOUNTERS_GENERIC: elemSiz = sizeof(elem->counterBlock.generic); break;
+ case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break;
+ case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
+ case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
+ case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
+ default:
+ sflError(receiver, "unexpected counters_tag");
+ return -1;
+ break;
+ }
+ // cache the element size, and accumulate it into the overall FlowSample size
+ elem->length = elemSiz;
+ siz += elemSiz;
+ }
+ return siz;
+}
+
+/*_________________----------------------------------__________________
+ _________________ sfl_receiver_writeCountersSample __________________
+ -----------------__________________________________------------------
+*/
+
+int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
+{
+ int packedSize;
+ if(cs == NULL) return -1;
+ // if the sample pkt is full enough so that this sample might put
+ // it over the limit, then we should send it now.
+ if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
+
+ // check in case this one sample alone is too big for the datagram
+ // in fact - if it is even half as big then we should ditch it. Very
+ // important to avoid overruning the packet buffer.
+ if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
+ sflError(receiver, "counters sample too big for datagram");
+ return -1;
+ }
+
+ if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
+ sendSample(receiver);
+
+ receiver->sampleCollector.numSamples++;
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
+#else
+ putNet32(receiver, SFLCOUNTERS_SAMPLE);
+#endif
+
+ putNet32(receiver, packedSize - 8); // tag and length not included
+ putNet32(receiver, cs->sequence_number);
+
+#ifdef SFL_USE_32BIT_INDEX
+ putNet32(receiver, cs->ds_class);
+ putNet32(receiver, cs->ds_index);
+#else
+ putNet32(receiver, cs->source_id);
+#endif
+
+ putNet32(receiver, cs->num_elements);
+
+ {
+ SFLCounters_sample_element *elem = cs->elements;
+ for(; elem != NULL; elem = elem->nxt) {
+
+ putNet32(receiver, elem->tag);
+ putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
+
+ switch(elem->tag) {
+ case SFLCOUNTERS_GENERIC:
+ putGenericCounters(receiver, &(elem->counterBlock.generic));
+ break;
+ case SFLCOUNTERS_ETHERNET:
+ // all these counters are 32-bit
+ putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
+ break;
+ case SFLCOUNTERS_TOKENRING:
+ // all these counters are 32-bit
+ putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
+ break;
+ case SFLCOUNTERS_VG:
+ // mixed sizes
+ putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
+ putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
+ putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
+ putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
+ putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
+ putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
+ putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
+ putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
+ putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
+ putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
+ putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
+ putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
+ putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
+ putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
+ break;
+ case SFLCOUNTERS_VLAN:
+ // mixed sizes
+ putNet32(receiver, elem->counterBlock.vlan.vlan_id);
+ putNet64(receiver, elem->counterBlock.vlan.octets);
+ putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
+ putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
+ putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
+ putNet32(receiver, elem->counterBlock.vlan.discards);
+ break;
+ default:
+ sflError(receiver, "unexpected counters_tag");
+ return -1;
+ break;
+ }
+ }
+ }
+ // sanity check
+ assert(((u_char *)receiver->sampleCollector.datap
+ - (u_char *)receiver->sampleCollector.data
+ - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
+
+ // update the pktlen
+ receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
+ return packedSize;
+}
+
+/*_________________---------------------------------__________________
+ _________________ sfl_receiver_samplePacketsSent __________________
+ -----------------_________________________________------------------
+*/
+
+u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
+{
+ return receiver->sampleCollector.packetSeqNo;
+}
+
+/*_________________---------------------------__________________
+ _________________ sendSample __________________
+ -----------------___________________________------------------
+*/
+
+static void sendSample(SFLReceiver *receiver)
+{
+ /* construct and send out the sample, then reset for the next one... */
+ /* first fill in the header with the latest values */
+ /* version, agent_address and sub_agent_id were pre-set. */
+ u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
+ receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
+ receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
+ receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
+ /* send */
+ if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
+ receiver->agent,
+ receiver,
+ (u_char *)receiver->sampleCollector.data,
+ receiver->sampleCollector.pktlen);
+ else {
+#ifdef SFLOW_DO_SOCKET
+ /* send it myself */
+ if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
+ u_int32_t soclen = sizeof(struct sockaddr_in6);
+ int result = sendto(receiver->agent->receiverSocket6,
+ receiver->sampleCollector.data,
+ receiver->sampleCollector.pktlen,
+ 0,
+ (struct sockaddr *)&receiver->receiver6,
+ soclen);
+ if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
+ if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
+ }
+ else {
+ u_int32_t soclen = sizeof(struct sockaddr_in);
+ int result = sendto(receiver->agent->receiverSocket4,
+ receiver->sampleCollector.data,
+ receiver->sampleCollector.pktlen,
+ 0,
+ (struct sockaddr *)&receiver->receiver4,
+ soclen);
+ if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
+ if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
+ }
+#endif
+ }
+
+ /* reset for the next time */
+ resetSampleCollector(receiver);
+}
+
+/*_________________---------------------------__________________
+ _________________ resetSampleCollector __________________
+ -----------------___________________________------------------
+*/
+
+static void resetSampleCollector(SFLReceiver *receiver)
+{
+ receiver->sampleCollector.pktlen = 0;
+ receiver->sampleCollector.numSamples = 0;
+ /* point the datap to just after the header */
+ receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
+ (receiver->sampleCollector.data + 10) : (receiver->sampleCollector.data + 7);
+
+ receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
+}
+
+/*_________________---------------------------__________________
+ _________________ sflError __________________
+ -----------------___________________________------------------
+*/
+
+static void sflError(SFLReceiver *receiver, char *msg)
+{
+ sfl_agent_error(receiver->agent, "receiver", msg);
+ resetSampleCollector(receiver);
+}