diff options
Diffstat (limited to 'example/time/odp_time_global_test.c')
-rw-r--r-- | example/time/odp_time_global_test.c | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/example/time/odp_time_global_test.c b/example/time/odp_time_global_test.c new file mode 100644 index 000000000..7c1409abf --- /dev/null +++ b/example/time/odp_time_global_test.c @@ -0,0 +1,397 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (c) 2015-2018 Linaro Limited + */ + +/** + * @example odp_time_global_test.c + * + * Time API test application + * + * @cond _ODP_HIDE_FROM_DOXYGEN_ + */ + +#include <inttypes.h> + +#include <odp_api.h> +#include <odp/helper/odph_api.h> + +#define MAX_WORKERS 32 +#define MAX_NUM_BUF 8192 +#define ITERATION_NUM 2048 +#define LOG_BASE 8 +#define LOG_ENTRY_SIZE 19 +#define LOG_LINE_SIZE (LOG_BASE * LOG_ENTRY_SIZE + 1) + +#define QUEUE_NAME_PREFIX "thread_queue_" + +typedef struct { + odp_time_t timestamp; + int id; +} timestamp_event_t; + +typedef struct { + uint8_t thr; + uint8_t id; + odp_time_t time; +} log_entry_t; + +typedef struct { + uint32_t iteration_num; + odp_atomic_u32_t iteration_counter; + odp_atomic_u32_t id_counter; + odp_atomic_u32_t log_counter; + odp_atomic_u32_t err_counter; + odp_barrier_t start_barrier; + odp_barrier_t end_barrier; + int thread_num; + log_entry_t *log; + int log_enries_num; +} test_globals_t; + +static void print_log(test_globals_t *gbls) +{ + uint32_t err_num; + int i, j, k, pad; + char line[LOG_LINE_SIZE]; + + memset(line, '-', LOG_LINE_SIZE - 1); + line[LOG_LINE_SIZE - 1] = 0; + for (i = 1; i <= gbls->thread_num; i++) { + printf("\n==== history of %d buffer, time,ns (thread) ====\n%s\n", + i, line); + + /* print log for buffer */ + k = 0; + for (j = 0; j < gbls->log_enries_num; j++) + if (gbls->log[j].id == i) { + printf("%10" PRIu64 " (%-3d)", + odp_time_to_ns(gbls->log[j].time), + gbls->log[j].thr); + + if (!(++k % LOG_BASE)) + printf(" |\n"); + else + printf(" =>"); + } + + if ((k % LOG_BASE)) { + pad = (LOG_BASE - k % LOG_BASE) * LOG_ENTRY_SIZE - 4; + printf(" end%*c\n%s\n", pad, '|', line); + } else { + printf("%s\n", line); + } + } + + printf("\n\n"); + + err_num = odp_atomic_load_u32(&gbls->err_counter); + if (err_num) + printf("Number of errors: %u\n", err_num); +} + +static void generate_next_queue(test_globals_t *gbls, odp_queue_t *queue, + unsigned int id) +{ + int thr; + uint8_t rand_u8; + char queue_name[sizeof(QUEUE_NAME_PREFIX) + 2]; + unsigned int rand_id = 1; + + thr = odp_thread_id(); + + /* generate next random id */ + if (gbls->thread_num > 1) { + do { + odp_random_data(&rand_u8, 1, ODP_RANDOM_BASIC); + rand_id = rand_u8 % gbls->thread_num + 1; + } while (rand_id == id); + } + + sprintf(queue_name, QUEUE_NAME_PREFIX "%d", rand_id); + *queue = odp_queue_lookup(queue_name); + + if (ODP_QUEUE_INVALID == *queue) + ODPH_ABORT("Cannot lookup thread queue \"%s\", thread %d\n", + queue_name, thr); +} + +static void test_global_timestamps(test_globals_t *gbls, + odp_queue_t queue, unsigned int id) +{ + int thr; + int log_entry; + odp_event_t ev; + odp_time_t time; + odp_buffer_t buf; + odp_queue_t queue_next; + timestamp_event_t *timestamp_ev; + + thr = odp_thread_id(); + while (odp_atomic_load_u32(&gbls->iteration_counter) < + gbls->iteration_num) { + ev = odp_queue_deq(queue); + + if (ev == ODP_EVENT_INVALID) + continue; + + buf = odp_buffer_from_event(ev); + timestamp_ev = (timestamp_event_t *)odp_buffer_addr(buf); + + time = odp_time_global(); + if (odp_time_cmp(time, timestamp_ev->timestamp) < 0) { + ODPH_ERR("timestamp is less than previous time_prev=%" + PRIu64 "ns, time_next=%" + PRIu64 "ns, thread %d\n", + odp_time_to_ns(timestamp_ev->timestamp), + odp_time_to_ns(time), thr); + odp_atomic_inc_u32(&gbls->err_counter); + } + + /* update the log */ + log_entry = odp_atomic_fetch_inc_u32(&gbls->log_counter); + gbls->log[log_entry].time = timestamp_ev->timestamp; + gbls->log[log_entry].id = timestamp_ev->id; + gbls->log[log_entry].thr = thr; + + /* assign new current time and send */ + generate_next_queue(gbls, &queue_next, id); + timestamp_ev->timestamp = time; + if (odp_queue_enq(queue_next, ev)) + ODPH_ABORT("Cannot enqueue event %" PRIu64 " on queue " + "%" PRIu64 ", thread %d\n", + odp_event_to_u64(ev), + odp_queue_to_u64(queue_next), thr); + + odp_atomic_inc_u32(&gbls->iteration_counter); + } +} + +/** + * @internal Worker thread + * + * @param ptr Pointer to test arguments + * + * @return Pointer to exit status + */ +static int run_thread(void *ptr) +{ + int thr; + uint32_t id; + odp_event_t ev; + odp_buffer_t buf; + test_globals_t *gbls; + odp_pool_t buffer_pool; + odp_queue_t queue, queue_next; + timestamp_event_t *timestamp_ev; + char queue_name[sizeof(QUEUE_NAME_PREFIX) + 2]; + + gbls = ptr; + thr = odp_thread_id(); + printf("Thread %i starts on cpu %i\n", thr, odp_cpu_id()); + + /* + * Allocate own queue for receiving timestamps. + * Own queue is needed to guarantee that next thread for receiving + * buffer is not the same thread. + */ + id = odp_atomic_fetch_inc_u32(&gbls->id_counter); + sprintf(queue_name, QUEUE_NAME_PREFIX "%d", id); + queue = odp_queue_create(queue_name, NULL); + if (queue == ODP_QUEUE_INVALID) + ODPH_ABORT("Cannot create thread queue, thread %d", thr); + + /* allocate buffer for timestamp */ + buffer_pool = odp_pool_lookup("time buffers pool"); + if (buffer_pool == ODP_POOL_INVALID) + ODPH_ABORT("Buffer pool was not found, thread %d\n", thr); + + buf = odp_buffer_alloc(buffer_pool); + if (buf == ODP_BUFFER_INVALID) + ODPH_ABORT("Buffer was not allocated, thread %d\n", thr); + + /* wait all threads allocated their queues */ + odp_barrier_wait(&gbls->start_barrier); + + /* enqueue global timestamp to some queue of some other thread */ + generate_next_queue(gbls, &queue_next, id); + + /* save global timestamp and id for tracing */ + ev = odp_buffer_to_event(buf); + timestamp_ev = (timestamp_event_t *)odp_buffer_addr(buf); + timestamp_ev->id = id; + timestamp_ev->timestamp = odp_time_global(); + if (odp_queue_enq(queue_next, ev)) + ODPH_ABORT("Cannot enqueue timestamp event %" PRIu64 " on " + "queue %" PRIu64 ", thread %d", odp_event_to_u64(ev), + odp_queue_to_u64(queue_next), thr); + + test_global_timestamps(gbls, queue, id); + + /* wait all threads are finished their jobs */ + odp_barrier_wait(&gbls->end_barrier); + + /* free all events on the allocated queue */ + while (1) { + ev = odp_queue_deq(queue); + if (ev == ODP_EVENT_INVALID) + break; + + buf = odp_buffer_from_event(ev); + odp_buffer_free(buf); + } + + /* free allocated queue */ + if (odp_queue_destroy(queue)) + ODPH_ABORT("Cannot destroy queue %" PRIu64 "", + odp_queue_to_u64(queue)); + + printf("Thread %i exits\n", thr); + fflush(NULL); + return 0; +} + +int main(int argc, char *argv[]) +{ + int err = 0; + odp_pool_t pool = ODP_POOL_INVALID; + int num_workers; + test_globals_t *gbls; + odp_cpumask_t cpumask; + odp_pool_capability_t pool_capa; + odp_pool_param_t pool_param; + odp_shm_t shm_glbls = ODP_SHM_INVALID; + odp_shm_t shm_log = ODP_SHM_INVALID; + int log_size, log_enries_num; + odph_helper_options_t helper_options; + odph_thread_t thread_tbl[MAX_WORKERS]; + odp_instance_t instance; + odp_init_t init_param; + odph_thread_common_param_t thr_common; + odph_thread_param_t thr_param; + + printf("\nODP global time test starts\n"); + + /* Let helper collect its own arguments (e.g. --odph_proc) */ + argc = odph_parse_options(argc, argv); + if (odph_options(&helper_options)) { + ODPH_ERR("Error: reading ODP helper options failed.\n"); + exit(EXIT_FAILURE); + } + + odp_init_param_init(&init_param); + init_param.mem_model = helper_options.mem_model; + + if (odp_init_global(&instance, &init_param, NULL)) { + err = 1; + ODPH_ERR("ODP global init failed.\n"); + goto end; + } + + /* Init this thread. */ + if (odp_init_local(instance, ODP_THREAD_CONTROL)) { + err = 1; + ODPH_ERR("ODP local init failed.\n"); + goto err_global; + } + + num_workers = MAX_WORKERS; + num_workers = odp_cpumask_default_worker(&cpumask, num_workers); + + shm_glbls = odp_shm_reserve("test_globals", sizeof(test_globals_t), + ODP_CACHE_LINE_SIZE, 0); + if (ODP_SHM_INVALID == shm_glbls) { + err = 1; + ODPH_ERR("Error: shared mem reserve failed.\n"); + goto err; + } + + log_enries_num = num_workers * (ITERATION_NUM + num_workers); + log_size = sizeof(log_entry_t) * log_enries_num; + shm_log = odp_shm_reserve("test_log", log_size, ODP_CACHE_LINE_SIZE, 0); + if (ODP_SHM_INVALID == shm_log) { + err = 1; + ODPH_ERR("Error: shared mem reserve failed.\n"); + goto err; + } + + gbls = odp_shm_addr(shm_glbls); + gbls->thread_num = num_workers; + gbls->iteration_num = ITERATION_NUM; + odp_atomic_store_u32(&gbls->iteration_counter, 0); + odp_atomic_store_u32(&gbls->id_counter, 1); + odp_atomic_store_u32(&gbls->log_counter, 0); + odp_atomic_store_u32(&gbls->err_counter, 0); + gbls->log_enries_num = log_enries_num; + gbls->log = odp_shm_addr(shm_log); + odp_barrier_init(&gbls->start_barrier, num_workers); + odp_barrier_init(&gbls->end_barrier, num_workers); + memset(gbls->log, 0, log_size); + + if (odp_pool_capability(&pool_capa)) { + err = 1; + ODPH_ERR("Error: pool capability failed.\n"); + goto err; + } + + odp_pool_param_init(&pool_param); + + pool_param.buf.size = sizeof(timestamp_event_t); + pool_param.buf.num = MAX_NUM_BUF; + pool_param.type = ODP_POOL_BUFFER; + + if (pool_capa.buf.max_num && MAX_NUM_BUF > pool_capa.buf.max_num) + pool_param.buf.num = pool_capa.buf.max_num; + + pool = odp_pool_create("time buffers pool", &pool_param); + if (pool == ODP_POOL_INVALID) { + err = 1; + ODPH_ERR("Pool create failed.\n"); + goto err; + } + + odph_thread_common_param_init(&thr_common); + odph_thread_param_init(&thr_param); + + thr_param.start = run_thread; + thr_param.arg = gbls; + thr_param.thr_type = ODP_THREAD_WORKER; + + thr_common.instance = instance; + thr_common.cpumask = &cpumask; + thr_common.share_param = 1; + + /* Create and launch worker threads */ + odph_thread_create(thread_tbl, &thr_common, &thr_param, num_workers); + + /* Wait for worker threads to exit */ + odph_thread_join(thread_tbl, num_workers); + + print_log(gbls); + +err: + if (pool != ODP_POOL_INVALID) + if (odp_pool_destroy(pool)) + err = 1; + + if (shm_log != ODP_SHM_INVALID) + if (odp_shm_free(shm_log)) + err = 1; + + if (shm_glbls != ODP_SHM_INVALID) + if (odp_shm_free(shm_glbls)) + err = 1; + + if (odp_term_local()) + err = 1; +err_global: + if (odp_term_global(instance)) + err = 1; +end: + if (err) { + ODPH_ERR("Err: ODP global time test failed\n\n"); + return -1; + } + + printf("ODP global time test complete\n\n"); + return 0; +} |