/* Copyright (c) 2019, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause */ #include #include #include #include #include #include #include #include #define POOL_PKT_NUM 8192 #define POOL_PKT_LEN 1536 #define MAX_PKT_BURST 32 /* Three threads required for RX, TX and statistics */ #define MAX_WORKERS (ODP_THREAD_COUNT_MAX - 3) #define QUEUE_SIZE 1024 #define MAX_PKTIOS 2 #define DUMMY_HASH 1234567890 /* Get rid of path in filename - only for unix-type paths using '/' */ #define NO_PATH(file_name) (strrchr((file_name), '/') ? \ strrchr((file_name), '/') + 1 : (file_name)) /* Statistics */ typedef union ODP_ALIGNED_CACHE { struct { uint64_t pps; /* Packet per second */ uint64_t rx_cnt; /* RX packets */ uint64_t tx_cnt; /* TX packets */ uint64_t rx_drops; /* Dropped packets on RX */ uint64_t tx_drops; /* Dropped packets on TX */ } s; uint8_t padding[ODP_CACHE_LINE_SIZE]; } stats_t; /* Thread specific data */ typedef struct thread_args_t { odp_queue_t rx_queue; odp_queue_t tx_queue; stats_t stats; } thread_args_t; /* Parsed command line application arguments */ typedef struct { char **if_names; /* Array of pointers to interface names */ odph_ethaddr_t dst_addr; /* Destination MAC address */ int accuracy; /* Statistics print interval in seconds */ int extra_work; /* Add extra processing to worker stage */ int dst_change; /* Change destination eth address */ int src_change; /* Change source eth address */ int dst_set; /* Custom destination eth address given */ int time; /* Time in seconds to run. */ int num_workers; /* Number of pipeline worker stages */ char *if_str; /* Storage for interface names */ } appl_args_t; /* Global application data */ typedef struct { odp_queue_t queue[ODP_THREAD_COUNT_MAX]; /* Thread specific arguments */ thread_args_t thread[ODP_THREAD_COUNT_MAX]; /* Barriers to synchronize main and workers */ odp_barrier_t init_barrier; odp_barrier_t term_barrier; /* Pktio interfaces */ odp_pktio_t if0, if1; odp_pktin_queue_t if0in, if1in; odp_pktout_queue_t if0out, if1out; odph_ethaddr_t src_addr; /* Source MAC address */ odph_ethaddr_t dst_addr; /* Destination MAC address */ odp_atomic_u32_t exit_threads; /* Application (parsed) arguments */ appl_args_t appl; } global_data_t; static global_data_t *global; static void sig_handler(int signo ODP_UNUSED) { odp_atomic_store_u32(&global->exit_threads, 1); } static odp_pktio_t create_pktio(const char *name, odp_pool_t pool, odp_pktin_queue_t *pktin, odp_pktout_queue_t *pktout) { odp_pktio_param_t pktio_param; odp_pktin_queue_param_t in_param; odp_pktout_queue_param_t out_param; odp_pktio_t pktio; odp_pktio_config_t config; odp_pktio_param_init(&pktio_param); pktio = odp_pktio_open(name, pool, &pktio_param); if (pktio == ODP_PKTIO_INVALID) { printf("Error: failed to open %s\n", name); exit(1); } odp_pktio_config_init(&config); config.parser.layer = ODP_PROTO_LAYER_L2; odp_pktio_config(pktio, &config); odp_pktin_queue_param_init(&in_param); odp_pktout_queue_param_init(&out_param); in_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE; if (odp_pktin_queue_config(pktio, &in_param)) { printf("Error: failed to config input queue for %s\n", name); exit(1); } out_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE; if (odp_pktout_queue_config(pktio, &out_param)) { printf("Error: failed to config output queue for %s\n", name); exit(1); } if (odp_pktin_queue(pktio, pktin, 1) != 1) { printf("Error: pktin queue query failed for %s\n", name); exit(1); } if (odp_pktout_queue(pktio, pktout, 1) != 1) { printf("Error: pktout queue query failed for %s\n", name); exit(1); } return pktio; } /* * Fill packets' eth addresses and convert packets to events * * pkt_tbl Array of packets * event_tbl[out] Array of events * num Number of packets in the array */ static inline unsigned int prep_events(odp_packet_t pkt_tbl[], odp_event_t event_tbl[], unsigned int num) { unsigned int i; unsigned int events = 0; if (!global->appl.dst_change && !global->appl.src_change) { odp_packet_to_event_multi(pkt_tbl, event_tbl, num); return num; } for (i = 0; i < num; ++i) { odp_packet_t pkt = pkt_tbl[i]; odph_ethhdr_t *eth; odp_packet_prefetch(pkt, 0, ODPH_ETHHDR_LEN); if (odp_unlikely(!odp_packet_has_eth(pkt))) { odp_packet_free(pkt); continue; } eth = odp_packet_data(pkt); if (global->appl.src_change) eth->src = global->src_addr; if (global->appl.dst_change) eth->dst = global->dst_addr; event_tbl[events++] = odp_packet_to_event(pkt); } return events; } static inline int rx_thread(void *arg) { thread_args_t *thr_args = arg; odp_event_t event_tbl[MAX_PKT_BURST]; odp_packet_t pkt_tbl[MAX_PKT_BURST]; odp_pktin_queue_t pktin_queue = global->if0in; odp_queue_t out_queue = thr_args->tx_queue; stats_t *stats = &thr_args->stats; int pkts, events, sent, drops; odp_barrier_wait(&global->init_barrier); while (!odp_atomic_load_u32(&global->exit_threads)) { pkts = odp_pktin_recv(pktin_queue, pkt_tbl, MAX_PKT_BURST); if (odp_unlikely(pkts <= 0)) continue; stats->s.rx_cnt += pkts; events = prep_events(pkt_tbl, event_tbl, pkts); drops = events - pkts; if (odp_unlikely(drops)) stats->s.rx_drops += pkts - events; sent = odp_queue_enq_multi(out_queue, event_tbl, events); if (odp_unlikely(sent < 0)) sent = 0; stats->s.tx_cnt += sent; drops = events - sent; if (odp_unlikely(drops)) { stats->s.tx_drops += drops; odp_packet_free_multi(&pkt_tbl[sent], drops); } } /* Wait until pktio devices are stopped */ odp_barrier_wait(&global->term_barrier); return 0; } static inline int tx_thread(void *arg) { thread_args_t *thr_args = arg; odp_event_t event_tbl[MAX_PKT_BURST]; odp_packet_t pkt_tbl[MAX_PKT_BURST]; odp_queue_t rx_queue = thr_args->rx_queue; odp_pktout_queue_t pktout_queue = global->if1out; stats_t *stats = &thr_args->stats; int events, sent, tx_drops; odp_barrier_wait(&global->init_barrier); while (!odp_atomic_load_u32(&global->exit_threads)) { events = odp_queue_deq_multi(rx_queue, event_tbl, MAX_PKT_BURST); if (odp_unlikely(events <= 0)) continue; stats->s.rx_cnt += events; odp_packet_from_event_multi(pkt_tbl, event_tbl, events); sent = odp_pktout_send(pktout_queue, pkt_tbl, events); if (odp_unlikely(sent < 0)) sent = 0; stats->s.tx_cnt += sent; tx_drops = events - sent; if (odp_unlikely(tx_drops)) { stats->s.tx_drops += tx_drops; odp_packet_free_multi(&pkt_tbl[sent], tx_drops); } } /* Wait until pktio devices are stopped */ odp_barrier_wait(&global->term_barrier); /* Empty queue before exiting */ events = 1; while (events > 0) { events = odp_queue_deq_multi(rx_queue, event_tbl, MAX_PKT_BURST); if (events > 0) odp_event_free_multi(event_tbl, events); } return 0; } /* * Work on packets */ static inline void work_on_events(odp_event_t event_tbl[], unsigned int num) { unsigned int i; for (i = 0; i < num; i++) { odp_packet_t pkt = odp_packet_from_event(event_tbl[i]); if (odp_hash_crc32c(odp_packet_data(pkt), odp_packet_seg_len(pkt), 123) == DUMMY_HASH) printf("Dummy hash match\n"); } } static inline int worker_thread(void *arg ODP_UNUSED) { thread_args_t *thr_args = arg; odp_event_t event_tbl[MAX_PKT_BURST]; stats_t *stats = &thr_args->stats; odp_queue_t rx_queue = thr_args->rx_queue; odp_queue_t tx_queue = thr_args->tx_queue; int events, sent, tx_drops; int extra_work = global->appl.extra_work; odp_barrier_wait(&global->init_barrier); while (!odp_atomic_load_u32(&global->exit_threads)) { events = odp_queue_deq_multi(rx_queue, event_tbl, MAX_PKT_BURST); if (odp_unlikely(events <= 0)) continue; stats->s.rx_cnt += events; if (extra_work) work_on_events(event_tbl, events); sent = odp_queue_enq_multi(tx_queue, event_tbl, events); if (odp_unlikely(sent < 0)) sent = 0; stats->s.tx_cnt += sent; tx_drops = events - sent; if (odp_unlikely(tx_drops)) { stats->s.tx_drops += tx_drops; odp_event_free_multi(&event_tbl[sent], tx_drops); } } /* Wait until pktio devices are stopped */ odp_barrier_wait(&global->term_barrier); /* Empty queue before exiting */ events = 1; while (events > 0) { events = odp_queue_deq_multi(rx_queue, event_tbl, MAX_PKT_BURST); if (events > 0) odp_event_free_multi(event_tbl, events); } return 0; } static int setup_thread_masks(odp_cpumask_t *thr_mask_rx, odp_cpumask_t *thr_mask_tx, odp_cpumask_t *thr_mask_workers, int num_workers) { odp_cpumask_t cpumask; int num_threads = 0; int i, cpu; if (num_workers > MAX_WORKERS) { printf("Worker count limited to MAX_WORKERS define (=%d)\n", MAX_WORKERS); num_workers = MAX_WORKERS; } /* Two threads required for RX and TX*/ num_threads = num_workers + 2; num_workers = odp_cpumask_default_worker(&cpumask, num_threads); if (num_workers != num_threads) { printf("Error: Not enough available CPU cores: %d/%d\n", num_workers, num_threads); exit(1); } odp_cpumask_zero(thr_mask_rx); odp_cpumask_zero(thr_mask_tx); odp_cpumask_zero(thr_mask_workers); cpu = odp_cpumask_first(&cpumask); for (i = 0; i < num_threads; i++) { if (i == 0) odp_cpumask_set(thr_mask_rx, cpu); else if (i == 1) odp_cpumask_set(thr_mask_tx, cpu); else odp_cpumask_set(thr_mask_workers, cpu); cpu = odp_cpumask_next(&cpumask, cpu); } return num_threads; } /* * Print statistics * * num_workers Number of worker threads * thr_stats Pointers to stats storage * duration Number of seconds to loop in * timeout Number of seconds for stats calculation */ static int print_speed_stats(int num_workers, stats_t **thr_stats, int duration, int timeout) { uint64_t total_pkts = 0; uint64_t pkts_prev = 0; uint64_t maximum_pps = 0; stats_t thr_stats_prev[num_workers]; int i; int elapsed = 0; int stats_enabled = 1; int loop_forever = (duration == 0); memset(thr_stats_prev, 0, sizeof(thr_stats_prev)); if (timeout <= 0) { stats_enabled = 0; timeout = 1; } /* Wait for all threads to be ready*/ odp_barrier_wait(&global->init_barrier); do { uint64_t total_rx_drops = 0; uint64_t total_tx_drops = 0; uint64_t pps; sleep(timeout); for (i = 0; i < num_workers; i++) { uint64_t rx_cnt = thr_stats[i]->s.rx_cnt; uint64_t tx_cnt = thr_stats[i]->s.tx_cnt; uint64_t rx_drops = thr_stats[i]->s.rx_drops; uint64_t tx_drops = thr_stats[i]->s.tx_drops; /* Count only transmitted packets */ if (i == (num_workers - 1)) total_pkts = tx_cnt; total_rx_drops += rx_drops; total_tx_drops += tx_drops; pps = (tx_cnt - thr_stats_prev[i].s.tx_cnt) / timeout; thr_stats_prev[i].s.pps = pps; thr_stats_prev[i].s.rx_cnt = rx_cnt; thr_stats_prev[i].s.tx_cnt = tx_cnt; thr_stats_prev[i].s.rx_drops = rx_drops; thr_stats_prev[i].s.tx_drops = tx_drops; } if (stats_enabled) { printf("----------------------------------------\n"); for (i = 0; i < num_workers; i++) { if (i == 0) printf("RX thread: "); else if (i == (num_workers - 1)) printf("TX thread: "); else printf("Worker %d: ", i - 1); printf("%" PRIu64 " pps, " "%" PRIu64 " rx drops, " "%" PRIu64 " tx drops\n", thr_stats_prev[i].s.pps, thr_stats_prev[i].s.rx_drops, thr_stats_prev[i].s.tx_drops); } pps = (total_pkts - pkts_prev) / timeout; if (pps > maximum_pps) maximum_pps = pps; printf("TOTAL: %" PRIu64 " pps, " "%" PRIu64 " rx drops, " "%" PRIu64 " tx drops, " "%" PRIu64 " max pps\n", pps, total_rx_drops, total_tx_drops, maximum_pps); pkts_prev = total_pkts; } elapsed += timeout; } while (!odp_atomic_load_u32(&global->exit_threads) && (loop_forever || (elapsed < duration))); if (stats_enabled) printf("TEST RESULT: %" PRIu64 " maximum packets per second.\n", maximum_pps); return total_pkts > 0 ? 0 : -1; } /* * Print system and application info */ static void print_info(char *progname, appl_args_t *appl_args) { odp_sys_info_print(); printf("Running ODP appl: \"%s\"\n" "-----------------\n" "Using IFs: %s %s\n" "Worker stages: %d\n" "Extra work: %d\n\n", progname, appl_args->if_names[0], appl_args->if_names[1], appl_args->num_workers, appl_args->extra_work); fflush(NULL); } /* * Print usage information */ static void usage(char *progname) { printf("\n" "OpenDataPlane simple pipeline example application.\n" "\n" "Usage: %s [options]\n" "\n" " E.g. %s -i eth0,eth1 -e -w 3\n\n" " ---- ---- ---- ---- ----\n" " | RX | -> | W1 | -> | W2 | -> | W3 | -> | TX |\n" " ---- ---- ---- ---- ----\n\n" " In the above example,\n" " each application stage is executed by a separate CPU thread and the stages\n" " are connected using plain queues. The RX stage receives packets from eth0 and\n" " enqueues them to the first worker stage (W1). The workers stages calculate\n" " CRC-32C over packet data. After the final worker stage (W3) has processed\n" " packets they are enqueued to the TX stage, which transmits the packets out\n" " from interface eth1.\n" "\n" "Mandatory OPTIONS:\n" " -i, --interface Two eth interfaces (comma-separated, no spaces)\n" "\n" "Optional OPTIONS:\n" " -a, --accuracy Time in seconds get print statistics\n" " (default is 10 seconds).\n" " -d, --dst_change 0: Don't change packets' dst eth addresses\n" " 1: Change packets' dst eth addresses (default)\n" " -s, --src_change 0: Don't change packets' src eth addresses\n" " 1: Change packets' src eth addresses (default)\n" " -r, --dst_addr Destination address\n" " Requires also the -d flag to be set\n" " -t, --time Time in seconds to run\n" " -w, --workers Number of worker stages (default 0)\n" " -e, --extra-work Calculate CRC-32C over packet data in worker stage\n" " -h, --help Display help and exit\n\n" "\n", NO_PATH(progname), NO_PATH(progname) ); } /* * Parse and store the command line arguments * * argc Argument count * argv Argument vector * appl_args[out] Storage for application arguments */ static void parse_args(int argc, char *argv[], appl_args_t *appl_args) { char *token; size_t len; int opt; int long_index; int i; int if_count = 0; static const struct option longopts[] = { {"accuracy", required_argument, NULL, 'a'}, {"extra-work", no_argument, NULL, 'e'}, {"dst_addr", required_argument, NULL, 'r'}, {"dst_change", required_argument, NULL, 'd'}, {"src_change", required_argument, NULL, 's'}, {"interface", required_argument, NULL, 'i'}, {"time", required_argument, NULL, 't'}, {"workers", required_argument, NULL, 'w'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; static const char *shortopts = "+a:d:er:s:t:i:w:h"; appl_args->accuracy = 10; /* get and print pps stats second */ appl_args->dst_change = 1; /* change eth dst address by default */ appl_args->src_change = 1; /* change eth src address by default */ appl_args->time = 0; /* loop forever if time to run is 0 */ appl_args->extra_work = 0; while (1) { opt = getopt_long(argc, argv, shortopts, longopts, &long_index); if (opt == -1) break; /* No more options */ switch (opt) { case 'a': appl_args->accuracy = atoi(optarg); break; case 'd': appl_args->dst_change = atoi(optarg); break; case 'e': appl_args->extra_work = 1; break; case 'r': len = strlen(optarg); if (len == 0) { usage(argv[0]); exit(EXIT_FAILURE); } len += 1; /* add room for '\0' */ if (odph_eth_addr_parse(&appl_args->dst_addr, optarg) != 0) { printf("invalid MAC address\n"); usage(argv[0]); exit(EXIT_FAILURE); } appl_args->dst_set = 1; break; case 's': appl_args->src_change = atoi(optarg); break; case 't': appl_args->time = atoi(optarg); break; case 'i': len = strlen(optarg); if (len == 0) { usage(argv[0]); exit(EXIT_FAILURE); } len += 1; /* add room for '\0' */ appl_args->if_str = malloc(len); if (appl_args->if_str == NULL) { usage(argv[0]); exit(EXIT_FAILURE); } /* count the number of tokens separated by ',' */ strcpy(appl_args->if_str, optarg); for (token = strtok(appl_args->if_str, ","), i = 0; token != NULL; token = strtok(NULL, ","), i++) ; if_count = i; if (if_count != 2) { usage(argv[0]); exit(EXIT_FAILURE); } /* allocate storage for the if names */ appl_args->if_names = calloc(if_count, sizeof(char *)); /* store the if names (reset names string) */ strcpy(appl_args->if_str, optarg); for (token = strtok(appl_args->if_str, ","), i = 0; token != NULL; token = strtok(NULL, ","), i++) { appl_args->if_names[i] = token; } break; case 'w': appl_args->num_workers = atoi(optarg); break; case 'h': usage(argv[0]); exit(EXIT_SUCCESS); break; default: break; } } if (if_count != 2) { usage(argv[0]); exit(EXIT_FAILURE); } optind = 1; /* reset 'extern optind' from the getopt lib */ } int main(int argc, char **argv) { odp_cpumask_t thr_mask_rx; odp_cpumask_t thr_mask_tx; odp_cpumask_t thr_mask_worker; odp_init_t init_param; odp_instance_t instance; odp_pool_t pool; odp_pool_capability_t pool_capa; odp_pool_param_t pool_param; odp_queue_capability_t queue_capa; odp_queue_param_t queue_param; odp_shm_t shm; odph_helper_options_t helper_options; odph_thread_t thr_tbl[ODP_THREAD_COUNT_MAX]; odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX]; odph_thread_common_param_t thr_common; odph_ethaddr_t new_addr; stats_t *stats[ODP_THREAD_COUNT_MAX]; thread_args_t *thr_args; uint32_t pkt_len, seg_len, pkt_num; int num_threads, num_workers; int i; int ret; /* Let helper collect its own arguments (e.g. --odph_proc) */ argc = odph_parse_options(argc, argv); if (odph_options(&helper_options)) { printf("Error: reading ODP helper options failed.\n"); exit(EXIT_FAILURE); } odp_init_param_init(&init_param); init_param.mem_model = helper_options.mem_model; if (odp_init_global(&instance, &init_param, NULL)) { printf("Error: ODP global init failed.\n"); exit(1); } if (odp_init_local(instance, ODP_THREAD_CONTROL)) { printf("Error: ODP local init failed.\n"); exit(1); } /* Reserve memory for global data */ shm = odp_shm_reserve("simple_pipeline", sizeof(global_data_t), ODP_CACHE_LINE_SIZE, 0); if (shm == ODP_SHM_INVALID) { printf("Error: shared mem reserve failed.\n"); exit(EXIT_FAILURE); } global = odp_shm_addr(shm); if (global == NULL) { printf("Error: shared mem alloc failed.\n"); exit(EXIT_FAILURE); } memset(global, 0, sizeof(global_data_t)); odp_atomic_init_u32(&global->exit_threads, 0); signal(SIGINT, sig_handler); /* Parse and store the application arguments */ parse_args(argc, argv, &global->appl); num_threads = setup_thread_masks(&thr_mask_rx, &thr_mask_tx, &thr_mask_worker, global->appl.num_workers); num_workers = num_threads - 2; /* Print both system and application information */ print_info(NO_PATH(argv[0]), &global->appl); /* Create queues for pipeline */ if (odp_queue_capability(&queue_capa)) { printf("Error: reading queue capability failed.\n"); exit(EXIT_FAILURE); } if (queue_capa.plain.max_num < (unsigned int)num_threads) { printf("Error: insufficient number of queues supported.\n"); exit(EXIT_FAILURE); } odp_queue_param_init(&queue_param); queue_param.type = ODP_QUEUE_TYPE_PLAIN; queue_param.enq_mode = ODP_QUEUE_OP_MT_UNSAFE; queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE; queue_param.size = QUEUE_SIZE; if (queue_capa.plain.max_size && queue_param.size > queue_capa.plain.max_size) queue_param.size = queue_capa.plain.max_size; for (i = 0; i < num_threads; i++) { odp_queue_t queue = odp_queue_create("plain_queue", &queue_param); if (queue == ODP_QUEUE_INVALID) { printf("Error: queue create failed.\n"); exit(EXIT_FAILURE); } global->queue[i] = queue; } /* Create packet pool */ if (odp_pool_capability(&pool_capa)) { printf("Error: reading pool capability failed.\n"); exit(EXIT_FAILURE); } pkt_len = POOL_PKT_LEN; seg_len = POOL_PKT_LEN; pkt_num = POOL_PKT_NUM; if (pool_capa.pkt.max_len && pkt_len > pool_capa.pkt.max_len) pkt_len = pool_capa.pkt.max_len; if (pool_capa.pkt.max_seg_len && seg_len > pool_capa.pkt.max_seg_len) seg_len = pool_capa.pkt.max_seg_len; if (pool_capa.pkt.max_num && pkt_num > pool_capa.pkt.max_num) pkt_num = pool_capa.pkt.max_num; odp_pool_param_init(&pool_param); pool_param.pkt.seg_len = seg_len; pool_param.pkt.len = pkt_len; pool_param.pkt.num = pkt_num; pool_param.type = ODP_POOL_PACKET; pool = odp_pool_create("packet pool", &pool_param); if (pool == ODP_POOL_INVALID) { printf("Error: packet pool create failed.\n"); exit(1); } global->if0 = create_pktio(global->appl.if_names[0], pool, &global->if0in, &global->if0out); global->if1 = create_pktio(global->appl.if_names[1], pool, &global->if1in, &global->if1out); /* Save TX interface Ethernet address */ if (odp_pktio_mac_addr(global->if1, global->src_addr.addr, ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) { printf("Error: TX interface Ethernet address unknown\n"); exit(EXIT_FAILURE); } /* Save destination Ethernet address */ if (global->appl.dst_change) { /* 02:00:00:00:00:XX */ memset(&new_addr, 0, sizeof(odph_ethaddr_t)); if (global->appl.dst_set) { memcpy(&new_addr, &global->appl.dst_addr, sizeof(odph_ethaddr_t)); } else { new_addr.addr[0] = 0x02; new_addr.addr[5] = 1; } global->dst_addr = new_addr; } if (odp_pktio_start(global->if0)) { printf("Error: unable to start input interface\n"); exit(1); } if (odp_pktio_start(global->if1)) { printf("Error: unable to start output interface\n"); exit(1); } odp_barrier_init(&global->init_barrier, num_threads + 1); odp_barrier_init(&global->term_barrier, num_threads + 1); for (i = 0; i < num_threads; i++) stats[i] = &global->thread[i].stats; memset(thr_tbl, 0, sizeof(thr_tbl)); odph_thread_common_param_init(&thr_common); thr_common.instance = instance; /* RX thread */ thr_args = &global->thread[0]; thr_args->tx_queue = global->queue[0]; odph_thread_param_init(&thr_param[0]); thr_param[0].start = rx_thread; thr_param[0].arg = thr_args; thr_param[0].thr_type = ODP_THREAD_WORKER; thr_common.cpumask = &thr_mask_rx; odph_thread_create(thr_tbl, &thr_common, thr_param, 1); /* Worker threads */ for (i = 0; i < num_workers; i++) { thr_args = &global->thread[i + 1]; thr_args->rx_queue = global->queue[i]; thr_args->tx_queue = global->queue[i + 1]; odph_thread_param_init(&thr_param[i]); thr_param[i].start = worker_thread; thr_param[i].arg = thr_args; thr_param[i].thr_type = ODP_THREAD_WORKER; } if (num_workers) { thr_common.cpumask = &thr_mask_worker; odph_thread_create(&thr_tbl[1], &thr_common, thr_param, num_workers); } /* TX thread */ thr_args = &global->thread[num_threads - 1]; thr_args->rx_queue = global->queue[num_workers]; odph_thread_param_init(&thr_param[0]); thr_param[0].start = tx_thread; thr_param[0].arg = thr_args; thr_param[0].thr_type = ODP_THREAD_WORKER; thr_common.cpumask = &thr_mask_tx; odph_thread_create(&thr_tbl[num_threads - 1], &thr_common, thr_param, 1); ret = print_speed_stats(num_threads, stats, global->appl.time, global->appl.accuracy); if (odp_pktio_stop(global->if0)) { printf("Error: failed to stop interface %s\n", argv[1]); exit(EXIT_FAILURE); } if (odp_pktio_stop(global->if1)) { printf("Error: failed to stop interface %s\n", argv[2]); exit(EXIT_FAILURE); } odp_atomic_store_u32(&global->exit_threads, 1); odp_barrier_wait(&global->term_barrier); odph_thread_join(thr_tbl, num_threads); if (odp_pktio_close(global->if0)) { printf("Error: failed to close interface %s\n", argv[1]); exit(EXIT_FAILURE); } if (odp_pktio_close(global->if1)) { printf("Error: failed to close interface %s\n", argv[2]); exit(EXIT_FAILURE); } for (i = 0; i < num_threads; i++) { if (odp_queue_destroy(global->queue[i])) { printf("Error: failed to destroy queue %d\n", i); exit(EXIT_FAILURE); } } if (odp_pool_destroy(pool)) { printf("Error: pool destroy\n"); exit(EXIT_FAILURE); } if (odp_shm_free(shm)) { printf("Error: shm free global data\n"); exit(EXIT_FAILURE); } if (odp_term_local()) { printf("Error: term local\n"); exit(EXIT_FAILURE); } if (odp_term_global(instance)) { printf("Error: term global\n"); exit(EXIT_FAILURE); } return ret; }