diff options
author | Petri Savolainen <petri.savolainen@linaro.org> | 2018-01-10 15:44:00 +0200 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2018-02-26 12:57:31 +0300 |
commit | 8c29ca02a6b8d1b5b0649be0c8cb085c4cc1d11a (patch) | |
tree | ed1116ebe5f9683abffe40f6ed08da84f3f69da2 /test/validation/api/queue | |
parent | 39fe44eff4bfdc0a4364635ff8998f04298aa629 (diff) |
validation: queue: multi-thread plain queue test
Test plain queue enqueue and dequeue with multiple concurrent
threads. Test blocking and non-blocking lock-free
implementations.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Diffstat (limited to 'test/validation/api/queue')
-rw-r--r-- | test/validation/api/queue/queue.c | 258 |
1 files changed, 257 insertions, 1 deletions
diff --git a/test/validation/api/queue/queue.c b/test/validation/api/queue/queue.c index 9c4803bcc..876a90bbf 100644 --- a/test/validation/api/queue/queue.c +++ b/test/validation/api/queue/queue.c @@ -13,6 +13,22 @@ #define MAX_NUM_EVENT (1 * 1024) #define MAX_ITERATION (100) #define MAX_QUEUES (64 * 1024) +#define GLOBALS_NAME "queue_test_globals" +#define DEQ_RETRIES 100 +#define ENQ_RETRIES 100 + +typedef struct { + pthrd_arg cu_thr; + int num_workers; + odp_barrier_t barrier; + odp_queue_t queue; + odp_atomic_u32_t num_event; + + struct { + uint32_t num_event; + } thread[ODP_THREAD_COUNT_MAX]; + +} test_globals_t; static int queue_context = 0xff; static odp_pool_t pool; @@ -30,7 +46,30 @@ static void generate_name(char *name, uint32_t index) static int queue_suite_init(void) { + odp_shm_t shm; + test_globals_t *globals; odp_pool_param_t params; + int num_workers; + odp_cpumask_t mask; + + shm = odp_shm_reserve(GLOBALS_NAME, sizeof(test_globals_t), + ODP_CACHE_LINE_SIZE, 0); + + if (shm == ODP_SHM_INVALID) { + printf("Shared memory reserve failed\n"); + return -1; + } + + globals = odp_shm_addr(shm); + memset(globals, 0, sizeof(test_globals_t)); + + num_workers = odp_cpumask_default_worker(&mask, 0); + + if (num_workers > MAX_WORKERS) + num_workers = MAX_WORKERS; + + globals->num_workers = num_workers; + odp_barrier_init(&globals->barrier, num_workers); odp_pool_param_init(¶ms); @@ -50,7 +89,25 @@ static int queue_suite_init(void) static int queue_suite_term(void) { - return odp_pool_destroy(pool); + odp_shm_t shm; + + shm = odp_shm_lookup(GLOBALS_NAME); + if (shm == ODP_SHM_INVALID) { + printf("SHM lookup failed.\n"); + return -1; + } + + if (odp_shm_free(shm)) { + printf("SHM free failed.\n"); + return -1; + } + + if (odp_pool_destroy(pool)) { + printf("Pool destroy failed.\n"); + return -1; + } + + return 0; } static void queue_test_capa(void) @@ -410,12 +467,211 @@ static void queue_test_info(void) CU_ASSERT(odp_queue_destroy(q_order) == 0); } +static uint32_t alloc_and_enqueue(odp_queue_t queue, odp_pool_t pool, + uint32_t num) +{ + uint32_t i, ret; + odp_buffer_t buf; + odp_event_t ev; + + for (i = 0; i < num; i++) { + buf = odp_buffer_alloc(pool); + + CU_ASSERT(buf != ODP_BUFFER_INVALID); + + ev = odp_buffer_to_event(buf); + + ret = odp_queue_enq(queue, ev); + + CU_ASSERT(ret == 0); + + if (ret) + break; + } + + return i; +} + +static uint32_t dequeue_and_free_all(odp_queue_t queue) +{ + odp_event_t ev; + uint32_t num, retries; + + num = 0; + retries = 0; + + while (1) { + ev = odp_queue_deq(queue); + + if (ev == ODP_EVENT_INVALID) { + if (retries >= DEQ_RETRIES) + return num; + + retries++; + continue; + } + + retries = 0; + num++; + + odp_event_free(ev); + } + + return num; +} + +static int enqueue_with_retry(odp_queue_t queue, odp_event_t ev) +{ + int i; + + for (i = 0; i < ENQ_RETRIES; i++) + if (odp_queue_enq(queue, ev) == 0) + return 0; + + return -1; +} + +static int queue_test_worker(void *arg) +{ + uint32_t num, retries, num_workers; + int thr_id, ret; + odp_event_t ev; + odp_queue_t queue; + test_globals_t *globals = arg; + + thr_id = odp_thread_id(); + queue = globals->queue; + num_workers = globals->num_workers; + + if (num_workers > 1) + odp_barrier_wait(&globals->barrier); + + retries = 0; + num = odp_atomic_fetch_inc_u32(&globals->num_event); + + /* On average, each worker deq-enq each event once */ + while (num < (num_workers * MAX_NUM_EVENT)) { + ev = odp_queue_deq(queue); + + if (ev == ODP_EVENT_INVALID) { + if (retries < DEQ_RETRIES) { + retries++; + continue; + } + + /* Prevent thread to starve */ + num = odp_atomic_fetch_inc_u32(&globals->num_event); + retries = 0; + continue; + } + + globals->thread[thr_id].num_event++; + + ret = enqueue_with_retry(queue, ev); + + CU_ASSERT(ret == 0); + + num = odp_atomic_fetch_inc_u32(&globals->num_event); + } + + return 0; +} + +static void reset_thread_stat(test_globals_t *globals) +{ + int i; + + odp_atomic_init_u32(&globals->num_event, 0); + + for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) + globals->thread[i].num_event = 0; +} + +static void multithread_test(odp_nonblocking_t nonblocking) +{ + odp_shm_t shm; + test_globals_t *globals; + odp_queue_t queue; + odp_queue_param_t qparams; + odp_queue_capability_t capa; + uint32_t queue_size, max_size; + uint32_t num, sum, num_free, i; + + CU_ASSERT(odp_queue_capability(&capa) == 0); + + queue_size = 2 * MAX_NUM_EVENT; + + max_size = capa.plain.max_size; + + if (nonblocking == ODP_NONBLOCKING_LF) { + if (capa.plain.lockfree.max_num == 0) + return; + + max_size = capa.plain.lockfree.max_size; + } + + if (max_size && queue_size > max_size) + queue_size = max_size; + + num = MAX_NUM_EVENT; + + if (num > queue_size) + num = queue_size / 2; + + shm = odp_shm_lookup(GLOBALS_NAME); + CU_ASSERT_FATAL(shm != ODP_SHM_INVALID); + + globals = odp_shm_addr(shm); + globals->cu_thr.numthrds = globals->num_workers; + + odp_queue_param_init(&qparams); + qparams.type = ODP_QUEUE_TYPE_PLAIN; + qparams.size = queue_size; + qparams.nonblocking = nonblocking; + + queue = odp_queue_create("queue_test_mt", &qparams); + CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID); + + globals->queue = queue; + reset_thread_stat(globals); + + CU_ASSERT(alloc_and_enqueue(queue, pool, num) == num); + + odp_cunit_thread_create(queue_test_worker, (pthrd_arg *)globals); + + /* Wait for worker threads to terminate */ + odp_cunit_thread_exit((pthrd_arg *)globals); + + sum = 0; + for (i = 0; i < ODP_THREAD_COUNT_MAX; i++) + sum += globals->thread[i].num_event; + + CU_ASSERT(sum != 0); + + num_free = dequeue_and_free_all(queue); + + CU_ASSERT(num_free == num); + CU_ASSERT(odp_queue_destroy(queue) == 0); +} + +static void queue_test_mt_plain_block(void) +{ + multithread_test(ODP_BLOCKING); +} + +static void queue_test_mt_plain_nonblock_lf(void) +{ + multithread_test(ODP_NONBLOCKING_LF); +} + odp_testinfo_t queue_suite[] = { ODP_TEST_INFO(queue_test_capa), ODP_TEST_INFO(queue_test_mode), ODP_TEST_INFO(queue_test_lockfree), ODP_TEST_INFO(queue_test_param), ODP_TEST_INFO(queue_test_info), + ODP_TEST_INFO(queue_test_mt_plain_block), + ODP_TEST_INFO(queue_test_mt_plain_nonblock_lf), ODP_TEST_INFO_NULL, }; |