diff options
author | Petri Savolainen <petri.savolainen@nokia.com> | 2019-05-03 15:18:44 +0300 |
---|---|---|
committer | Petri Savolainen <petri.savolainen@nokia.com> | 2019-05-09 09:43:17 +0300 |
commit | a51eaeca2eac68d65ab89a1417b98d0225ebf86a (patch) | |
tree | 76b4cba15b8b7ea166895ea574028df56ba65f1e /helper/threads.c | |
parent | 0e1e2f891bd6d68e71a8ca6475dea6a2217a95a4 (diff) |
helper: thread: implement new thread create and join
Implemented new thread create and join functions.
Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com>
Reviewed-by: Matias Elo <matias.elo@nokia.com>
Reviewed-by: Stanislaw Kardach <skardach@marvell.com>
Diffstat (limited to 'helper/threads.c')
-rw-r--r-- | helper/threads.c | 263 |
1 files changed, 228 insertions, 35 deletions
diff --git a/helper/threads.c b/helper/threads.c index 01bc33eac..e30c9f29d 100644 --- a/helper/threads.c +++ b/helper/threads.c @@ -1,4 +1,5 @@ /* Copyright (c) 2013-2018, Linaro Limited + * Copyright (c) 2019, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -23,24 +24,32 @@ #define FAILED_CPU -1 +/* Thread status codes */ +#define NOT_STARTED 0 +#define SYNC_INIT 1 +#define INIT_DONE 2 +#define STARTED 3 + static odph_helper_options_t helper_options; /* - * wrapper for odpthreads, either implemented as linux threads or processes. - * (in process mode, if start_routine returns NULL, the process return FAILURE). + * Run a thread, either as Linux pthread or process. + * In process mode, if start_routine returns NULL, the process return FAILURE. */ -static void *_odph_thread_run_start_routine(void *arg) +static void *run_thread(void *arg) { int status; int ret; + odp_instance_t instance; odph_odpthread_params_t *thr_params; - odph_odpthread_start_args_t *start_args = arg; + odph_thread_start_args_t *start_args = arg; thr_params = &start_args->thr_params; + instance = start_args->instance; /* ODP thread local init */ - if (odp_init_local(thr_params->instance, thr_params->thr_type)) { + if (odp_init_local(instance, thr_params->thr_type)) { ODPH_ERR("Local init failed\n"); if (start_args->mem_model == ODP_MEM_MODEL_PROCESS) _exit(EXIT_FAILURE); @@ -54,6 +63,9 @@ static void *_odph_thread_run_start_routine(void *arg) "pthread" : "process", (int)getpid()); + if (odp_atomic_load_u32(&start_args->status) == SYNC_INIT) + odp_atomic_store_rel_u32(&start_args->status, INIT_DONE); + status = thr_params->start(thr_params->arg); ret = odp_term_local(); @@ -69,11 +81,9 @@ static void *_odph_thread_run_start_routine(void *arg) } /* - * Create a single ODPthread as a linux process + * Create a single linux process */ -static int _odph_linux_process_create(odph_odpthread_t *thread_tbl, - int cpu, - const odph_odpthread_params_t *thr_params) +static int create_process(odph_thread_t *thread, int cpu) { cpu_set_t cpu_set; pid_t pid; @@ -81,20 +91,19 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl, CPU_ZERO(&cpu_set); CPU_SET(cpu, &cpu_set); - thread_tbl->start_args.thr_params = *thr_params; /* copy */ - thread_tbl->start_args.mem_model = ODP_MEM_MODEL_PROCESS; - thread_tbl->cpu = cpu; + thread->start_args.mem_model = ODP_MEM_MODEL_PROCESS; + thread->cpu = cpu; pid = fork(); if (pid < 0) { ODPH_ERR("fork() failed\n"); - thread_tbl->cpu = FAILED_CPU; + thread->cpu = FAILED_CPU; return -1; } /* Parent continues to fork */ if (pid > 0) { - thread_tbl->proc.pid = pid; + thread->proc.pid = pid; return 0; } @@ -111,17 +120,49 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl, return -2; } - _odph_thread_run_start_routine(&thread_tbl->start_args); + run_thread(&thread->start_args); return 0; /* never reached */ } /* - * Create a single ODPthread as a linux thread + * Wait single process to exit */ -static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, - int cpu, - const odph_odpthread_params_t *thr_params) +static int wait_process(odph_thread_t *thread) +{ + pid_t pid; + int status = 0; + + pid = waitpid(thread->proc.pid, &status, 0); + + if (pid < 0) { + ODPH_ERR("waitpid() failed\n"); + return -1; + } + + /* Examine the child process' termination status */ + if (WIFEXITED(status) && + WEXITSTATUS(status) != EXIT_SUCCESS) { + ODPH_ERR("Child exit status:%d (pid:%d)\n", + WEXITSTATUS(status), (int)pid); + return -1; + } + + if (WIFSIGNALED(status)) { + int signo = WTERMSIG(status); + + ODPH_ERR("Child term signo:%d - %s (pid:%d)\n", + signo, strsignal(signo), (int)pid); + return -1; + } + + return 0; +} + +/* + * Create a single linux pthread + */ +static int create_pthread(odph_thread_t *thread, int cpu) { int ret; cpu_set_t cpu_set; @@ -129,23 +170,22 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, CPU_ZERO(&cpu_set); CPU_SET(cpu, &cpu_set); - pthread_attr_init(&thread_tbl->thread.attr); + pthread_attr_init(&thread->thread.attr); - thread_tbl->cpu = cpu; + thread->cpu = cpu; - pthread_attr_setaffinity_np(&thread_tbl->thread.attr, + pthread_attr_setaffinity_np(&thread->thread.attr, sizeof(cpu_set_t), &cpu_set); - thread_tbl->start_args.thr_params = *thr_params; /* copy */ - thread_tbl->start_args.mem_model = ODP_MEM_MODEL_THREAD; + thread->start_args.mem_model = ODP_MEM_MODEL_THREAD; - ret = pthread_create(&thread_tbl->thread.thread_id, - &thread_tbl->thread.attr, - _odph_thread_run_start_routine, - &thread_tbl->start_args); + ret = pthread_create(&thread->thread.thread_id, + &thread->thread.attr, + run_thread, + &thread->start_args); if (ret != 0) { ODPH_ERR("Failed to start thread on cpu #%d\n", cpu); - thread_tbl->cpu = FAILED_CPU; + thread->cpu = FAILED_CPU; return ret; } @@ -153,6 +193,155 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl, } /* + * Wait single pthread to exit + */ +static int wait_pthread(odph_thread_t *thread) +{ + int ret; + void *thread_ret = NULL; + + /* Wait thread to exit */ + ret = pthread_join(thread->thread.thread_id, &thread_ret); + + if (ret) { + ODPH_ERR("pthread_join failed (%i) from cpu #%i\n", + ret, thread->cpu); + return -1; + } + + if (thread_ret) { + ODPH_ERR("Bad exit status cpu #%i %p\n", + thread->cpu, thread_ret); + return -1; + } + + ret = pthread_attr_destroy(&thread->thread.attr); + + if (ret) { + ODPH_ERR("pthread_attr_destroy failed (%i) from cpu #%i\n", + ret, thread->cpu); + return -1; + } + + return 0; +} + +int odph_thread_create(odph_thread_t thread[], + const odph_thread_common_param_t *param, + const odph_thread_param_t thr_param[], + int num) +{ + int i, num_cpu, cpu; + const odp_cpumask_t *cpumask = param->cpumask; + int use_pthread = 1; + + if (param->thread_model == 1) + use_pthread = 0; + + if (helper_options.mem_model == ODP_MEM_MODEL_PROCESS) + use_pthread = 0; + + if (num < 1) { + ODPH_ERR("Bad number of threads (%i)\n", num); + return -1; + } + + num_cpu = odp_cpumask_count(cpumask); + + if (num_cpu != num) { + ODPH_ERR("Number of threads (%i) and CPUs (%i) does not match" + "\n", num, num_cpu); + return -1; + } + + memset(thread, 0, num * sizeof(odph_thread_t)); + + cpu = odp_cpumask_first(cpumask); + for (i = 0; i < num; i++) { + odph_thread_start_args_t *start_args = &thread[i].start_args; + + /* Copy thread parameters */ + if (param->share_param) + start_args->thr_params = thr_param[0]; + else + start_args->thr_params = thr_param[i]; + + start_args->instance = param->instance; + + if (param->sync) + odp_atomic_init_u32(&start_args->status, SYNC_INIT); + else + odp_atomic_init_u32(&start_args->status, NOT_STARTED); + + if (use_pthread) { + if (create_pthread(&thread[i], cpu)) + break; + } else { + if (create_process(&thread[i], cpu)) + break; + } + + /* Wait newly created thread to update status */ + if (param->sync) { + odp_time_t t1, t2; + uint64_t diff_ns; + uint32_t status; + int timeout = 0; + odp_atomic_u32_t *atomic = &start_args->status; + + t1 = odp_time_local(); + + do { + odp_cpu_pause(); + t2 = odp_time_local(); + diff_ns = odp_time_diff_ns(t2, t1); + timeout = diff_ns > ODP_TIME_SEC_IN_NS; + status = odp_atomic_load_acq_u32(atomic); + + } while (status != INIT_DONE && timeout == 0); + + if (timeout) { + ODPH_ERR("Thread (i:%i) start up timeout\n", i); + break; + } + } + + odp_atomic_store_u32(&start_args->status, STARTED); + + cpu = odp_cpumask_next(cpumask, cpu); + } + + return i; +} + +int odph_thread_join(odph_thread_t thread[], int num) +{ + odph_thread_start_args_t *start_args; + int i; + + for (i = 0; i < num; i++) { + start_args = &thread[i].start_args; + + if (odp_atomic_load_u32(&start_args->status) != STARTED) { + ODPH_DBG("Thread (i:%i) not started.\n", i); + break; + } + + if (thread[i].start_args.mem_model == ODP_MEM_MODEL_THREAD) { + if (wait_pthread(&thread[i])) + break; + } else { + if (wait_process(&thread[i])) + break; + } + + odp_atomic_store_u32(&start_args->status, NOT_STARTED); + } + + return i; +} + +/* * create an odpthread set (as linux processes or linux threads or both) */ int odph_odpthreads_create(odph_odpthread_t *thread_tbl, @@ -179,15 +368,19 @@ int odph_odpthreads_create(odph_odpthread_t *thread_tbl, cpu = odp_cpumask_first(mask); for (i = 0; i < num; i++) { + odph_thread_start_args_t *start_args; + + start_args = &thread_tbl[i].start_args; + + /* Copy thread parameters */ + start_args->thr_params = *thr_params; + start_args->instance = thr_params->instance; + if (helper_options.mem_model == ODP_MEM_MODEL_THREAD) { - if (odph_linux_thread_create(&thread_tbl[i], - cpu, - thr_params)) + if (create_pthread(&thread_tbl[i], cpu)) break; } else { - if (_odph_linux_process_create(&thread_tbl[i], - cpu, - thr_params)) + if (create_process(&thread_tbl[i], cpu)) break; } |