/* Copyright (c) 2016, Linaro Limited * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause */ /** * @file * * @example odp_pktio_ordered.c ODP ordered pktio test application */ /** enable strtok */ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include #include #include #include #include #include #include #include #include #include #include #include /** Jenkins hash support. * * Copyright (C) 2006 Bob Jenkins (bob_jenkins@burtleburtle.net) * * http://burtleburtle.net/bob/hash/ * * These are the credits from Bob's sources: * * lookup3.c, by Bob Jenkins, May 2006, Public Domain. * * These are functions for producing 32-bit hashes for hash table lookup. * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final() * are externally useful functions. Routines to test the hash are included * if SELF_TEST is defined. You can use this free for any purpose. It's in * the public domain. It has no warranty. * * $FreeBSD$ */ #define rot(x, k) (((x) << (k)) | ((x) >> (32 - (k)))) #define mix(a, b, c) \ { \ a -= c; a ^= rot(c, 4); c += b; \ b -= a; b ^= rot(a, 6); a += c; \ c -= b; c ^= rot(b, 8); b += a; \ a -= c; a ^= rot(c, 16); c += b; \ b -= a; b ^= rot(a, 19); a += c; \ c -= b; c ^= rot(b, 4); b += a; \ } #define final(a, b, c) \ { \ c ^= b; c -= rot(b, 14); \ a ^= c; a -= rot(c, 11); \ b ^= a; b -= rot(a, 25); \ c ^= b; c -= rot(b, 16); \ a ^= c; a -= rot(c, 4); \ b ^= a; b -= rot(a, 14); \ c ^= b; c -= rot(b, 24); \ } #define JHASH_GOLDEN_RATIO 0x9e3779b9 /** Maximum number of worker threads */ #define MAX_WORKERS 64 /** Number of packet buffers in the memory pool */ #define PKT_POOL_SIZE 8192 /** Buffer size of the packet pool buffer in bytes*/ #define PKT_POOL_BUF_SIZE 1856 /** Packet user area size in bytes */ #define PKT_UAREA_SIZE 32 /** Maximum number of packets in a burst */ #define MAX_PKT_BURST 32 /** Maximum number of pktio queues per interface */ #define MAX_QUEUES 32 /** Maximum number of pktio interfaces */ #define MAX_PKTIOS 8 /** Maximum number of packet flows */ #define MAX_FLOWS 128 ODP_STATIC_ASSERT(MAX_PKTIOS < MAX_FLOWS, "MAX_FLOWS must be greater than MAX_PKTIOS\n"); /** Minimum valid packet length */ #define MIN_PACKET_LEN (ODPH_ETHHDR_LEN + ODPH_IPV4HDR_LEN + ODPH_UDPHDR_LEN) /** Default number of input queues */ #define DEF_NUM_RX_QUEUES 1 /** Default number of flows */ #define DEF_NUM_FLOWS 12 /** Default number of extra processing rounds */ #define DEF_EXTRA_ROUNDS 15 /** Default statistics print interval in seconds */ #define DEF_STATS_INT 1 /** Get rid of path in filename - only for unix-type paths using '/' */ #define NO_PATH(file_name) (strrchr((file_name), '/') ? \ strrchr((file_name), '/') + 1 : (file_name)) /** * Packet input mode */ typedef enum pktin_mode_t { SCHED_ORDERED = 0, SCHED_ATOMIC, SCHED_PARALLEL } pktin_mode_t; /** * Parsed command line application arguments */ typedef struct { int cpu_count; /**< CPU count */ int if_count; /**< Number of interfaces to be used */ int addr_count; /**< Number of dst addresses to be used */ int num_rx_q; /**< Number of input queues per interface */ int num_flows; /**< Number of packet flows */ int extra_rounds; /**< Number of extra input processing rounds */ char **if_names; /**< Array of pointers to interface names */ odph_ethaddr_t addrs[MAX_PKTIOS]; /**< Array of dst addresses */ pktin_mode_t in_mode; /**< Packet input mode */ int time; /**< Time in seconds to run. */ int accuracy; /**< Statistics print interval */ char *if_str; /**< Storage for interface names */ } appl_args_t; static int exit_threads; /**< Break workers loop if set to 1 */ /** * Queue context */ typedef struct { odp_bool_t input_queue; /**< Input queue */ uint64_t idx; /**< Queue index */ uint64_t seq[MAX_FLOWS]; /**< Per flow sequence numbers */ } qcontext_t; /** * Flow info stored in the packet user area */ typedef struct { uint64_t seq; /**< Sequence number */ uint32_t crc; /**< CRC hash */ uint16_t idx; /**< Flow index */ uint8_t src_idx; /**< Source port index */ uint8_t dst_idx; /**< Destination port index */ } flow_t; ODP_STATIC_ASSERT(sizeof(flow_t) <= PKT_UAREA_SIZE, "Flow data doesn't fit in the packet user area\n"); /** * Statistics */ typedef union { struct { /** Number of forwarded packets */ uint64_t packets; /** Packets dropped due to a receive error */ uint64_t rx_drops; /** Packets dropped due to a transmit error */ uint64_t tx_drops; /** Packets with invalid sequence number */ uint64_t invalid_seq; } s; uint8_t padding[ODP_CACHE_LINE_SIZE]; } stats_t ODP_ALIGNED_CACHE; /** * IPv4 5-tuple */ typedef struct { int32_t src_ip; int32_t dst_ip; int16_t src_port; int16_t dst_port; int8_t proto; int8_t pad0; int16_t pad1; } ipv4_tuple5_t; /** * Packet headers */ typedef struct { odph_ethhdr_t *eth; odph_ipv4hdr_t *ipv4; odph_udphdr_t *udp; } packet_hdr_t; /** * Thread specific arguments */ typedef struct thread_args_t { stats_t *stats; /**< Pointer to per thread statistics */ } thread_args_t; /** * Grouping of all global data */ typedef struct { /** Per thread packet stats */ stats_t stats[MAX_WORKERS]; /** Application (parsed) arguments */ appl_args_t appl; /** Thread specific arguments */ thread_args_t thread[MAX_WORKERS]; /** Table of port ethernet addresses */ odph_ethaddr_t port_eth_addr[MAX_PKTIOS]; /** Table of dst ethernet addresses */ odph_ethaddr_t dst_eth_addr[MAX_PKTIOS]; /** Table of dst ports */ int dst_port[MAX_PKTIOS]; /** Table of atomic queues for flows */ odp_queue_t fqueue[MAX_PKTIOS][MAX_FLOWS]; /** Table of flow queue contexts */ qcontext_t flow_qcontext[MAX_PKTIOS][MAX_FLOWS]; /** Table of input queue contexts */ qcontext_t input_qcontext[MAX_PKTIOS][MAX_QUEUES]; /** Table of pktio handles */ struct { odp_pktio_t pktio; odp_pktout_queue_t pktout[MAX_FLOWS]; odp_queue_t pktin[MAX_QUEUES]; int num_rx_queue; int num_tx_queue; } pktios[MAX_PKTIOS]; } args_t; /** Global pointer to args */ static args_t *gbl_args; /** Global barrier to synchronize main and workers */ static odp_barrier_t barrier; /** * Lookup the destination port for a given packet * * @param pkt ODP packet handle */ static inline int lookup_dest_port(odp_packet_t pkt) { int i, src_idx; odp_pktio_t pktio_src; pktio_src = odp_packet_input(pkt); for (src_idx = -1, i = 0; gbl_args->pktios[i].pktio != ODP_PKTIO_INVALID; i++) if (gbl_args->pktios[i].pktio == pktio_src) src_idx = i; if (src_idx == -1) LOG_ABORT("Failed to determine pktio input\n"); return gbl_args->dst_port[src_idx]; } /** * Map required packet headers * * @param pkt Packet handle * @param hdr[out] Packet headers * * @retval 0 on success * @retval -1 on failure */ static inline int packet_hdr(odp_packet_t pkt, packet_hdr_t *hdr) { uint8_t *udp; uint16_t eth_type; uint8_t ihl; if (odp_unlikely(odp_packet_seg_len(pkt) < MIN_PACKET_LEN)) return -1; if (odp_unlikely(!odp_packet_has_eth(pkt))) return -1; hdr->eth = odp_packet_l2_ptr(pkt, NULL); eth_type = odp_be_to_cpu_16(hdr->eth->type); if (odp_unlikely(eth_type != ODPH_ETHTYPE_IPV4)) return -1; hdr->ipv4 = (odph_ipv4hdr_t *)(hdr->eth + 1); if (odp_unlikely(hdr->ipv4->proto != ODPH_IPPROTO_UDP)) return -1; ihl = ODPH_IPV4HDR_IHL(hdr->ipv4->ver_ihl); if (odp_unlikely(ihl < ODPH_IPV4HDR_IHL_MIN)) return -1; udp = (uint8_t *)hdr->ipv4 + (ihl * 4); hdr->udp = (odph_udphdr_t *)udp; return 0; } /** * Compute hash from a 5-tuple * * @param key IPv4 5-tuple * * @return 32-bit hash value */ static inline uint64_t calc_ipv4_5tuple_hash(ipv4_tuple5_t *tuple) { uint32_t a, b, c; a = tuple->proto + JHASH_GOLDEN_RATIO; b = tuple->src_ip + JHASH_GOLDEN_RATIO; c = tuple->dst_ip + JHASH_GOLDEN_RATIO; mix(a, b, c); a += (tuple->src_port << 16) + tuple->dst_port + JHASH_GOLDEN_RATIO; final(a, b, c); return c; } /** * Compute packet flow index * * @param hdr Packet headers * * @return Flow index */ static inline uint64_t calc_flow_idx(packet_hdr_t *hdr) { ipv4_tuple5_t tuple; uint64_t idx; tuple.dst_ip = odp_be_to_cpu_32(hdr->ipv4->dst_addr); tuple.src_ip = odp_be_to_cpu_32(hdr->ipv4->src_addr); tuple.proto = hdr->ipv4->proto; tuple.src_port = odp_be_to_cpu_16(hdr->udp->src_port); tuple.dst_port = odp_be_to_cpu_16(hdr->udp->dst_port); tuple.pad0 = 0; tuple.pad1 = 0; idx = calc_ipv4_5tuple_hash(&tuple); return idx % gbl_args->appl.num_flows; } /** * Fill packet's eth addresses according to the destination port * * @param hdr[out] Packet headers * @param dst_port Destination port */ static inline void fill_eth_addrs(packet_hdr_t *hdr, int dst_port) { hdr->eth->src = gbl_args->port_eth_addr[dst_port]; hdr->eth->dst = gbl_args->dst_eth_addr[dst_port]; } /** * Process flow queue * * @param ev_tbl Array of events * @param num Number of events in the array * @param stats Pointer for storing thread statistics * @param qcontext Source queue context * @param pktout Arrays of output queues */ static inline void process_flow(odp_event_t ev_tbl[], int num, stats_t *stats, qcontext_t *qcontext, odp_pktout_queue_t pktout[][MAX_FLOWS]) { odp_packet_t pkt; flow_t *flow; uint64_t queue_seq; int dst_if; int i; int sent; for (i = 0; i < num; i++) { pkt = odp_packet_from_event(ev_tbl[i]); flow = odp_packet_user_area(pkt); queue_seq = qcontext->seq[flow->src_idx]; /* Check sequence number */ if (gbl_args->appl.in_mode != SCHED_PARALLEL && odp_unlikely(flow->seq != queue_seq)) { printf("Invalid sequence number: packet_seq=%" PRIu64 "" " queue_seq=%" PRIu64 ", src_if=%" PRIu8 ", " "dst_if=%" PRIu8 ", flow=%" PRIu16 "\n", flow->seq, queue_seq, flow->src_idx, flow->dst_idx, flow->idx); qcontext->seq[flow->src_idx] = flow->seq + 1; stats->s.invalid_seq++; } else { qcontext->seq[flow->src_idx]++; } dst_if = flow->dst_idx; sent = odp_pktout_send(pktout[dst_if][flow->idx], &pkt, 1); if (odp_unlikely(sent != 1)) { stats->s.tx_drops++; odp_packet_free(pkt); } stats->s.packets++; } } /** * Process input queue * * @param ev_tbl Array of events * @param num Number of events in the array * @param stats Pointer for storing thread statistics * @param qcontext Source queue context */ static inline void process_input(odp_event_t ev_tbl[], int num, stats_t *stats, qcontext_t *qcontext) { flow_t *flow; flow_t *flow_tbl[MAX_PKT_BURST]; int ret; int i, j; int pkts = 0; for (i = 0; i < num; i++) { odp_packet_t pkt; packet_hdr_t hdr; int flow_idx; pkt = odp_packet_from_event(ev_tbl[i]); odp_packet_prefetch(pkt, 0, MIN_PACKET_LEN); ret = packet_hdr(pkt, &hdr); if (odp_unlikely(ret)) { odp_packet_free(pkt); stats->s.rx_drops++; continue; } flow_idx = calc_flow_idx(&hdr); fill_eth_addrs(&hdr, flow_idx); flow = odp_packet_user_area(pkt); flow->idx = flow_idx; flow->src_idx = qcontext->idx; flow->dst_idx = lookup_dest_port(pkt); flow_tbl[pkts] = flow; /* Simulate "fat pipe" processing by generating extra work */ for (j = 0; j < gbl_args->appl.extra_rounds; j++) flow->crc = dummy_hash_crc32c(odp_packet_data(pkt), odp_packet_len(pkt), 0); pkts++; } if (odp_unlikely(!pkts)) return; /* Set sequence numbers */ if (gbl_args->appl.in_mode == SCHED_ORDERED) odp_schedule_order_lock(0); for (i = 0; i < pkts; i++) { flow = flow_tbl[i]; flow->seq = qcontext->seq[flow->idx]++; } if (gbl_args->appl.in_mode == SCHED_ORDERED) odp_schedule_order_unlock(0); for (i = 0; i < pkts; i++) { flow = flow_tbl[i]; ret = odp_queue_enq(gbl_args->fqueue[flow->dst_idx][flow->idx], ev_tbl[i]); if (odp_unlikely(ret != 0)) { LOG_ERR("odp_queue_enq() failed\n"); stats->s.tx_drops++; odp_event_free(ev_tbl[i]); } else { stats->s.packets++; } } } /** * Worker thread * * @param arg Thread arguments of type 'thread_args_t *' */ static int run_worker(void *arg) { odp_event_t ev_tbl[MAX_PKT_BURST]; odp_queue_t queue; odp_pktout_queue_t pktout[MAX_PKTIOS][MAX_FLOWS]; qcontext_t *qcontext; thread_args_t *thr_args = arg; stats_t *stats = thr_args->stats; int pkts; int i, j; memset(pktout, 0, sizeof(pktout)); for (i = 0; i < gbl_args->appl.if_count; i++) { for (j = 0; j < gbl_args->appl.num_flows; j++) { pktout[i][j] = gbl_args->pktios[i].pktout[j % gbl_args->pktios[i].num_tx_queue]; } } odp_barrier_wait(&barrier); /* Loop packets */ while (!exit_threads) { pkts = odp_schedule_multi(&queue, ODP_SCHED_NO_WAIT, ev_tbl, MAX_PKT_BURST); if (pkts <= 0) continue; qcontext = odp_queue_context(queue); if (qcontext->input_queue) process_input(ev_tbl, pkts, stats, qcontext); else process_flow(ev_tbl, pkts, stats, qcontext, pktout); } /* Free remaining events in queues */ while (1) { odp_event_t ev; ev = odp_schedule(NULL, odp_schedule_wait_time(ODP_TIME_SEC_IN_NS)); if (ev == ODP_EVENT_INVALID) break; odp_event_free(ev); } return 0; } /** * Create a pktio handle and associate with input queues * * @param dev Name of device to open * @param index Pktio index * @param num_rx Number of input queues * @param num_tx Number of output queues * @param pool Pool to associate with device for packet RX/TX * * @retval 0 on success * @retval -1 on failure */ static int create_pktio(const char *dev, int idx, int num_rx, int num_tx, odp_pool_t pool) { odp_pktio_t pktio; odp_pktio_param_t pktio_param; odp_pktio_capability_t capa; odp_pktin_queue_param_t pktin_param; odp_pktout_queue_param_t pktout_param; odp_pktio_op_mode_t mode_rx; odp_pktio_op_mode_t mode_tx; int i; odp_pktio_param_init(&pktio_param); pktio_param.in_mode = ODP_PKTIN_MODE_SCHED; pktio = odp_pktio_open(dev, pool, &pktio_param); if (pktio == ODP_PKTIO_INVALID) { LOG_ERR("Error: failed to open %s\n", dev); return -1; } printf("Created pktio %" PRIu64 " (%s)\n", odp_pktio_to_u64(pktio), dev); if (odp_pktio_capability(pktio, &capa)) { LOG_ERR("Error: capability query failed %s\n", dev); odp_pktio_close(pktio); return -1; } odp_pktin_queue_param_init(&pktin_param); odp_pktout_queue_param_init(&pktout_param); mode_tx = ODP_PKTIO_OP_MT; mode_rx = ODP_PKTIO_OP_MT; if (gbl_args->appl.in_mode == SCHED_ATOMIC) { pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ATOMIC; } else if (gbl_args->appl.in_mode == SCHED_PARALLEL) { pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_PARALLEL; } else { pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ORDERED; pktin_param.queue_param.sched.lock_count = 1; } pktin_param.queue_param.sched.prio = ODP_SCHED_PRIO_DEFAULT; pktin_param.queue_param.sched.group = ODP_SCHED_GROUP_ALL; if (num_rx > (int)capa.max_input_queues) { printf("Allocating %i shared input queues, %i requested\n", capa.max_input_queues, num_rx); num_rx = capa.max_input_queues; mode_rx = ODP_PKTIO_OP_MT; } if (num_tx > (int)capa.max_output_queues) { printf("Allocating %i shared output queues, %i requested\n", capa.max_output_queues, num_tx); num_tx = capa.max_output_queues; mode_tx = ODP_PKTIO_OP_MT; } pktin_param.hash_enable = 1; pktin_param.hash_proto.proto.ipv4_udp = 1; pktin_param.num_queues = num_rx; pktin_param.op_mode = mode_rx; pktout_param.op_mode = mode_tx; pktout_param.num_queues = num_tx; if (odp_pktin_queue_config(pktio, &pktin_param)) { LOG_ERR("Error: input queue config failed %s\n", dev); return -1; } if (odp_pktout_queue_config(pktio, &pktout_param)) { LOG_ERR("Error: output queue config failed %s\n", dev); return -1; } if (odp_pktin_event_queue(pktio, gbl_args->pktios[idx].pktin, num_rx) != num_rx) { LOG_ERR("Error: pktin event queue query failed %s\n", dev); return -1; } /* Set queue contexts */ for (i = 0; i < num_rx; i++) { gbl_args->input_qcontext[idx][i].idx = idx; gbl_args->input_qcontext[idx][i].input_queue = 1; if (odp_queue_context_set(gbl_args->pktios[idx].pktin[i], &gbl_args->input_qcontext[idx][i], sizeof(qcontext_t))) { LOG_ERR("Error: pktin queue context set failed %s\n", dev); return -1; } } if (odp_pktout_queue(pktio, gbl_args->pktios[idx].pktout, num_tx) != num_tx) { LOG_ERR("Error: pktout queue query failed %s\n", dev); return -1; } printf("Created %i input and %i output queues on (%s)\n", num_rx, num_tx, dev); gbl_args->pktios[idx].num_rx_queue = num_rx; gbl_args->pktios[idx].num_tx_queue = num_tx; gbl_args->pktios[idx].pktio = pktio; return 0; } /** * Print statistics * * @param num_workers Number of worker threads * @param thr_stats Pointer to stats storage * @param duration Number of seconds to loop in * @param timeout Number of seconds for stats calculation * */ static int print_speed_stats(int num_workers, stats_t *thr_stats, int duration, int timeout) { uint64_t pkts = 0; uint64_t pkts_prev = 0; uint64_t pps; uint64_t rx_drops, tx_drops, invalid_seq; uint64_t maximum_pps = 0; int i; int elapsed = 0; int stats_enabled = 1; int loop_forever = (duration == 0); if (timeout <= 0) { stats_enabled = 0; timeout = 1; } /* Wait for all threads to be ready*/ odp_barrier_wait(&barrier); do { pkts = 0; rx_drops = 0; tx_drops = 0; invalid_seq = 0; sleep(timeout); for (i = 0; i < num_workers; i++) { pkts += thr_stats[i].s.packets; rx_drops += thr_stats[i].s.rx_drops; tx_drops += thr_stats[i].s.tx_drops; invalid_seq += thr_stats[i].s.invalid_seq; } if (stats_enabled) { pps = (pkts - pkts_prev) / timeout; if (pps > maximum_pps) maximum_pps = pps; printf("%" PRIu64 " pps, %" PRIu64 " max pps, ", pps, maximum_pps); printf("%" PRIu64 " rx drops, %" PRIu64 " tx drops, ", rx_drops, tx_drops); printf("%" PRIu64 " invalid seq\n", invalid_seq); pkts_prev = pkts; } elapsed += timeout; } while (loop_forever || (elapsed < duration)); if (stats_enabled) printf("TEST RESULT: %" PRIu64 " maximum packets per second.\n", maximum_pps); return (pkts > 100 && !invalid_seq) ? 0 : -1; } /** * Find the destination port for a given input port * * @param port Input port index */ static int find_dest_port(int port) { /* Even number of ports */ if (gbl_args->appl.if_count % 2 == 0) return (port % 2 == 0) ? port + 1 : port - 1; /* Odd number of ports */ if (port == gbl_args->appl.if_count - 1) return 0; else return port + 1; } /** * Initialize port forwarding table */ static void init_forwarding_tbl(void) { int rx_idx; for (rx_idx = 0; rx_idx < gbl_args->appl.if_count; rx_idx++) gbl_args->dst_port[rx_idx] = find_dest_port(rx_idx); } /** * Prinf usage information */ static void usage(char *progname) { printf("\n" "OpenDataPlane ordered pktio application.\n" "\n" "Usage: %s OPTIONS\n" " E.g. %s -i eth0,eth1\n" " In the above example,\n" " eth0 will send pkts to eth1 and vice versa\n" "\n" "Mandatory OPTIONS:\n" " -i, --interface Eth interfaces (comma-separated, no spaces)\n" " Interface count min 1, max %i\n" "\n" "Optional OPTIONS:\n" " -m, --mode Packet input mode\n" " 0: Scheduled ordered queues (default)\n" " 1: Scheduled atomic queues\n" " 2: Scheduled parallel queues (packet order not maintained)\n" " -r, --num_rx_q Number of RX queues per interface\n" " -f, --num_flows Number of packet flows\n" " -e, --extra_input Number of extra input processing rounds\n" " -c, --count CPU count.\n" " -t, --time Time in seconds to run.\n" " -a, --accuracy Statistics print interval in seconds\n" " (default is 1 second).\n" " -d, --dst_addr Destination addresses (comma-separated, no spaces)\n" " -h, --help Display help and exit.\n\n" "\n", NO_PATH(progname), NO_PATH(progname), MAX_PKTIOS ); } /** * Parse and store the command line arguments * * @param argc argument count * @param argv[] argument vector * @param appl_args Store application arguments here */ static void parse_args(int argc, char *argv[], appl_args_t *appl_args) { int opt; int long_index; char *token; char *addr_str; size_t len; int i; static const struct option longopts[] = { {"count", required_argument, NULL, 'c'}, {"time", required_argument, NULL, 't'}, {"accuracy", required_argument, NULL, 'a'}, {"interface", required_argument, NULL, 'i'}, {"mode", required_argument, NULL, 'm'}, {"dst_addr", required_argument, NULL, 'd'}, {"num_rx_q", required_argument, NULL, 'r'}, {"num_flows", required_argument, NULL, 'f'}, {"extra_input", required_argument, NULL, 'e'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; static const char *shortopts = "+c:+t:+a:i:m:d:r:f:e:h"; /* let helper collect its own arguments (e.g. --odph_proc) */ odph_parse_options(argc, argv, shortopts, longopts); appl_args->time = 0; /* loop forever if time to run is 0 */ appl_args->accuracy = DEF_STATS_INT; appl_args->num_rx_q = DEF_NUM_RX_QUEUES; appl_args->num_flows = DEF_NUM_FLOWS; appl_args->extra_rounds = DEF_EXTRA_ROUNDS; opterr = 0; /* do not issue errors on helper options */ while (1) { opt = getopt_long(argc, argv, shortopts, longopts, &long_index); if (opt == -1) break; /* No more options */ switch (opt) { case 'c': appl_args->cpu_count = atoi(optarg); break; case 't': appl_args->time = atoi(optarg); break; case 'a': appl_args->accuracy = atoi(optarg); break; /* parse packet-io interface names */ case 'd': len = strlen(optarg); if (len == 0) { usage(argv[0]); exit(EXIT_FAILURE); } len += 1; /* add room for '\0' */ addr_str = malloc(len); if (addr_str == NULL) { usage(argv[0]); exit(EXIT_FAILURE); } /* store the mac addresses names */ strcpy(addr_str, optarg); for (token = strtok(addr_str, ","), i = 0; token != NULL; token = strtok(NULL, ","), i++) { if (i >= MAX_PKTIOS) { printf("too many MAC addresses\n"); usage(argv[0]); exit(EXIT_FAILURE); } if (odph_eth_addr_parse(&appl_args->addrs[i], token) != 0) { printf("invalid MAC address\n"); usage(argv[0]); exit(EXIT_FAILURE); } } appl_args->addr_count = i; if (appl_args->addr_count < 1) { usage(argv[0]); exit(EXIT_FAILURE); } free(addr_str); break; case 'i': len = strlen(optarg); if (len == 0) { usage(argv[0]); exit(EXIT_FAILURE); } len += 1; /* add room for '\0' */ appl_args->if_str = malloc(len); if (appl_args->if_str == NULL) { usage(argv[0]); exit(EXIT_FAILURE); } /* count the number of tokens separated by ',' */ strcpy(appl_args->if_str, optarg); for (token = strtok(appl_args->if_str, ","), i = 0; token != NULL; token = strtok(NULL, ","), i++) ; appl_args->if_count = i; if (appl_args->if_count < 1 || appl_args->if_count > MAX_PKTIOS) { usage(argv[0]); exit(EXIT_FAILURE); } /* allocate storage for the if names */ appl_args->if_names = calloc(appl_args->if_count, sizeof(char *)); /* store the if names (reset names string) */ strcpy(appl_args->if_str, optarg); for (token = strtok(appl_args->if_str, ","), i = 0; token != NULL; token = strtok(NULL, ","), i++) { appl_args->if_names[i] = token; } break; case 'm': i = atoi(optarg); if (i == 1) appl_args->in_mode = SCHED_ATOMIC; else if (i == 2) appl_args->in_mode = SCHED_PARALLEL; else appl_args->in_mode = SCHED_ORDERED; break; case 'r': appl_args->num_rx_q = atoi(optarg); break; case 'f': appl_args->num_flows = atoi(optarg); break; case 'e': appl_args->extra_rounds = atoi(optarg); break; case 'h': usage(argv[0]); exit(EXIT_SUCCESS); break; default: break; } } if (appl_args->cpu_count > MAX_WORKERS) { printf("Too many workers requested %d, max: %d\n", appl_args->cpu_count, MAX_WORKERS); exit(EXIT_FAILURE); } if (appl_args->num_flows > MAX_FLOWS) { printf("Too many flows requested %d, max: %d\n", appl_args->num_flows, MAX_FLOWS); exit(EXIT_FAILURE); } if (appl_args->if_count == 0 || appl_args->num_flows == 0 || appl_args->num_rx_q == 0) { usage(argv[0]); exit(EXIT_FAILURE); } if (appl_args->addr_count != 0 && appl_args->addr_count != appl_args->if_count) { printf("Number of destination addresses differs from number" " of interfaces\n"); usage(argv[0]); exit(EXIT_FAILURE); } optind = 1; /* reset 'extern optind' from the getopt lib */ } /** * Print system and application info */ static void print_info(char *progname, appl_args_t *appl_args) { int i; printf("\n" "ODP system info\n" "---------------\n" "ODP API version: %s\n" "ODP impl name: %s\n" "CPU model: %s\n" "CPU freq (hz): %" PRIu64 "\n" "Cache line size: %i\n" "CPU count: %i\n" "\n", odp_version_api_str(), odp_version_impl_name(), odp_cpu_model_str(), odp_cpu_hz_max(), odp_sys_cache_line_size(), odp_cpu_count()); printf("Running ODP appl: \"%s\"\n" "-----------------\n" "IF-count: %i\n" "Using IFs: ", progname, appl_args->if_count); for (i = 0; i < appl_args->if_count; ++i) printf(" %s", appl_args->if_names[i]); printf("\n\n"); fflush(NULL); } static void gbl_args_init(args_t *args) { int pktio, queue; memset(args, 0, sizeof(args_t)); for (pktio = 0; pktio < MAX_PKTIOS; pktio++) { args->pktios[pktio].pktio = ODP_PKTIO_INVALID; for (queue = 0; queue < MAX_QUEUES; queue++) args->pktios[pktio].pktin[queue] = ODP_QUEUE_INVALID; } } /** * ODP ordered pktio application */ int main(int argc, char *argv[]) { odp_cpumask_t cpumask; odp_instance_t instance; odp_pool_t pool; odp_pool_param_t params; odp_shm_t shm; odp_queue_capability_t capa; odph_ethaddr_t new_addr; odph_odpthread_t thread_tbl[MAX_WORKERS]; stats_t *stats; char cpumaskstr[ODP_CPUMASK_STR_SIZE]; int cpu; int i, j; int if_count; int ret; int num_workers; int in_mode; /* Init ODP before calling anything else */ if (odp_init_global(&instance, NULL, NULL)) { LOG_ERR("Error: ODP global init failed.\n"); exit(EXIT_FAILURE); } /* Init this thread */ if (odp_init_local(instance, ODP_THREAD_CONTROL)) { LOG_ERR("Error: ODP local init failed.\n"); exit(EXIT_FAILURE); } /* Reserve memory for args from shared mem */ shm = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE, 0); if (shm == ODP_SHM_INVALID) { LOG_ERR("Error: shared mem reserve failed.\n"); exit(EXIT_FAILURE); } gbl_args = odp_shm_addr(shm); if (gbl_args == NULL) { LOG_ERR("Error: shared mem alloc failed.\n"); odp_shm_free(shm); exit(EXIT_FAILURE); } gbl_args_init(gbl_args); /* Parse and store the application arguments */ parse_args(argc, argv, &gbl_args->appl); if (gbl_args->appl.in_mode == SCHED_ORDERED) { /* At least one ordered lock required */ odp_queue_capability(&capa); if (capa.max_ordered_locks < 1) { LOG_ERR("Error: Ordered locks not available.\n"); exit(EXIT_FAILURE); } } /* Print both system and application information */ print_info(NO_PATH(argv[0]), &gbl_args->appl); /* Default to system CPU count unless user specified */ num_workers = MAX_WORKERS; if (gbl_args->appl.cpu_count) num_workers = gbl_args->appl.cpu_count; /* Get default worker cpumask */ num_workers = odp_cpumask_default_worker(&cpumask, num_workers); (void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr)); if_count = gbl_args->appl.if_count; printf("Num worker threads: %i\n", num_workers); printf("First CPU: %i\n", odp_cpumask_first(&cpumask)); printf("CPU mask: %s\n\n", cpumaskstr); /* Create packet pool */ odp_pool_param_init(¶ms); params.pkt.seg_len = PKT_POOL_BUF_SIZE; params.pkt.len = PKT_POOL_BUF_SIZE; params.pkt.num = PKT_POOL_SIZE; params.pkt.uarea_size = PKT_UAREA_SIZE; params.type = ODP_POOL_PACKET; pool = odp_pool_create("packet pool", ¶ms); if (pool == ODP_POOL_INVALID) { LOG_ERR("Error: packet pool create failed.\n"); exit(EXIT_FAILURE); } odp_pool_print(pool); init_forwarding_tbl(); for (i = 0; i < if_count; ++i) { const char *dev = gbl_args->appl.if_names[i]; int num_rx, num_tx; num_rx = gbl_args->appl.num_rx_q; num_tx = gbl_args->appl.num_flows; if (create_pktio(dev, i, num_rx, num_tx, pool)) exit(EXIT_FAILURE); /* Save interface ethernet address */ if (odp_pktio_mac_addr(gbl_args->pktios[i].pktio, gbl_args->port_eth_addr[i].addr, ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) { LOG_ERR("Error: interface ethernet address unknown\n"); exit(EXIT_FAILURE); } odp_pktio_print(gbl_args->pktios[i].pktio); /* Save destination eth address */ /* 02:00:00:00:00:XX */ memset(&new_addr, 0, sizeof(odph_ethaddr_t)); if (gbl_args->appl.addr_count) { memcpy(&new_addr, &gbl_args->appl.addrs[i], sizeof(odph_ethaddr_t)); } else { new_addr.addr[0] = 0x02; new_addr.addr[5] = i; } gbl_args->dst_eth_addr[i] = new_addr; } gbl_args->pktios[i].pktio = ODP_PKTIO_INVALID; /* Allocate the same number of flows to each interface */ for (i = 0; i < if_count; i++) { odp_pktio_capability_t capa; if (odp_pktio_capability(gbl_args->pktios[i].pktio, &capa)) { LOG_ERR("Error: pktio capability failed.\n"); exit(EXIT_FAILURE); } if ((unsigned)gbl_args->appl.num_flows > capa.max_output_queues) gbl_args->appl.num_flows = capa.max_output_queues; } /* Create atomic queues for packet tagging */ for (i = 0; i < if_count; i++) { for (j = 0; j < gbl_args->appl.num_flows; j++) { odp_queue_t queue; odp_queue_param_t qparam; char qname[ODP_QUEUE_NAME_LEN]; snprintf(qname, sizeof(qname), "flow_%d_%d", i, j); odp_queue_param_init(&qparam); qparam.type = ODP_QUEUE_TYPE_SCHED; qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; qparam.sched.group = ODP_SCHED_GROUP_ALL; gbl_args->flow_qcontext[i][j].idx = i; gbl_args->flow_qcontext[i][j].input_queue = 0; qparam.context = &gbl_args->flow_qcontext[i][j]; qparam.context_len = sizeof(qcontext_t); queue = odp_queue_create(qname, &qparam); if (queue == ODP_QUEUE_INVALID) { LOG_ERR("Error: flow queue create failed.\n"); exit(EXIT_FAILURE); } gbl_args->fqueue[i][j] = queue; } } in_mode = gbl_args->appl.in_mode; printf("\nApplication parameters\n" "----------------------\n" "Input queues: %d\n" "Mode: %s\n" "Flows: %d\n" "Extra rounds: %d\n\n", gbl_args->appl.num_rx_q, (in_mode == SCHED_ATOMIC) ? "PKTIN_SCHED_ATOMIC" : (in_mode == SCHED_PARALLEL ? "PKTIN_SCHED_PARALLEL" : "PKTIN_SCHED_ORDERED"), gbl_args->appl.num_flows, gbl_args->appl.extra_rounds); memset(thread_tbl, 0, sizeof(thread_tbl)); stats = gbl_args->stats; odp_barrier_init(&barrier, num_workers + 1); /* Create worker threads */ cpu = odp_cpumask_first(&cpumask); for (i = 0; i < num_workers; ++i) { odp_cpumask_t thd_mask; odph_odpthread_params_t thr_params; memset(&thr_params, 0, sizeof(thr_params)); thr_params.start = run_worker; thr_params.arg = &gbl_args->thread[i]; thr_params.thr_type = ODP_THREAD_WORKER; thr_params.instance = instance; gbl_args->thread[i].stats = &stats[i]; odp_cpumask_zero(&thd_mask); odp_cpumask_set(&thd_mask, cpu); odph_odpthreads_create(&thread_tbl[i], &thd_mask, &thr_params); cpu = odp_cpumask_next(&cpumask, cpu); } /* Start packet receive and transmit */ for (i = 0; i < if_count; ++i) { odp_pktio_t pktio; pktio = gbl_args->pktios[i].pktio; ret = odp_pktio_start(pktio); if (ret) { LOG_ERR("Error: unable to start %s\n", gbl_args->appl.if_names[i]); exit(EXIT_FAILURE); } } ret = print_speed_stats(num_workers, stats, gbl_args->appl.time, gbl_args->appl.accuracy); /* Stop receiving new packet */ for (i = 0; i < if_count; i++) odp_pktio_stop(gbl_args->pktios[i].pktio); exit_threads = 1; /* Master thread waits for other threads to exit */ for (i = 0; i < num_workers; ++i) odph_odpthreads_join(&thread_tbl[i]); for (i = 0; i < if_count; i++) { odp_pktio_close(gbl_args->pktios[i].pktio); for (j = 0; j < gbl_args->appl.num_flows; j++) odp_queue_destroy(gbl_args->fqueue[i][j]); } free(gbl_args->appl.if_names); free(gbl_args->appl.if_str); if (odp_pool_destroy(pool)) { LOG_ERR("Error: pool destroy\n"); exit(EXIT_FAILURE); } if (odp_shm_free(shm)) { LOG_ERR("Error: shm free\n"); exit(EXIT_FAILURE); } if (odp_term_local()) { LOG_ERR("Error: term local\n"); exit(EXIT_FAILURE); } if (odp_term_global(instance)) { LOG_ERR("Error: term global\n"); exit(EXIT_FAILURE); } return ret; }