aboutsummaryrefslogtreecommitdiff
path: root/helper/threads.c
diff options
context:
space:
mode:
authorPetri Savolainen <petri.savolainen@nokia.com>2019-05-03 15:18:44 +0300
committerPetri Savolainen <petri.savolainen@nokia.com>2019-05-09 09:43:17 +0300
commita51eaeca2eac68d65ab89a1417b98d0225ebf86a (patch)
tree76b4cba15b8b7ea166895ea574028df56ba65f1e /helper/threads.c
parent0e1e2f891bd6d68e71a8ca6475dea6a2217a95a4 (diff)
helper: thread: implement new thread create and join
Implemented new thread create and join functions. Signed-off-by: Petri Savolainen <petri.savolainen@nokia.com> Reviewed-by: Matias Elo <matias.elo@nokia.com> Reviewed-by: Stanislaw Kardach <skardach@marvell.com>
Diffstat (limited to 'helper/threads.c')
-rw-r--r--helper/threads.c263
1 files changed, 228 insertions, 35 deletions
diff --git a/helper/threads.c b/helper/threads.c
index 01bc33eac..e30c9f29d 100644
--- a/helper/threads.c
+++ b/helper/threads.c
@@ -1,4 +1,5 @@
/* Copyright (c) 2013-2018, Linaro Limited
+ * Copyright (c) 2019, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -23,24 +24,32 @@
#define FAILED_CPU -1
+/* Thread status codes */
+#define NOT_STARTED 0
+#define SYNC_INIT 1
+#define INIT_DONE 2
+#define STARTED 3
+
static odph_helper_options_t helper_options;
/*
- * wrapper for odpthreads, either implemented as linux threads or processes.
- * (in process mode, if start_routine returns NULL, the process return FAILURE).
+ * Run a thread, either as Linux pthread or process.
+ * In process mode, if start_routine returns NULL, the process return FAILURE.
*/
-static void *_odph_thread_run_start_routine(void *arg)
+static void *run_thread(void *arg)
{
int status;
int ret;
+ odp_instance_t instance;
odph_odpthread_params_t *thr_params;
- odph_odpthread_start_args_t *start_args = arg;
+ odph_thread_start_args_t *start_args = arg;
thr_params = &start_args->thr_params;
+ instance = start_args->instance;
/* ODP thread local init */
- if (odp_init_local(thr_params->instance, thr_params->thr_type)) {
+ if (odp_init_local(instance, thr_params->thr_type)) {
ODPH_ERR("Local init failed\n");
if (start_args->mem_model == ODP_MEM_MODEL_PROCESS)
_exit(EXIT_FAILURE);
@@ -54,6 +63,9 @@ static void *_odph_thread_run_start_routine(void *arg)
"pthread" : "process",
(int)getpid());
+ if (odp_atomic_load_u32(&start_args->status) == SYNC_INIT)
+ odp_atomic_store_rel_u32(&start_args->status, INIT_DONE);
+
status = thr_params->start(thr_params->arg);
ret = odp_term_local();
@@ -69,11 +81,9 @@ static void *_odph_thread_run_start_routine(void *arg)
}
/*
- * Create a single ODPthread as a linux process
+ * Create a single linux process
*/
-static int _odph_linux_process_create(odph_odpthread_t *thread_tbl,
- int cpu,
- const odph_odpthread_params_t *thr_params)
+static int create_process(odph_thread_t *thread, int cpu)
{
cpu_set_t cpu_set;
pid_t pid;
@@ -81,20 +91,19 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl,
CPU_ZERO(&cpu_set);
CPU_SET(cpu, &cpu_set);
- thread_tbl->start_args.thr_params = *thr_params; /* copy */
- thread_tbl->start_args.mem_model = ODP_MEM_MODEL_PROCESS;
- thread_tbl->cpu = cpu;
+ thread->start_args.mem_model = ODP_MEM_MODEL_PROCESS;
+ thread->cpu = cpu;
pid = fork();
if (pid < 0) {
ODPH_ERR("fork() failed\n");
- thread_tbl->cpu = FAILED_CPU;
+ thread->cpu = FAILED_CPU;
return -1;
}
/* Parent continues to fork */
if (pid > 0) {
- thread_tbl->proc.pid = pid;
+ thread->proc.pid = pid;
return 0;
}
@@ -111,17 +120,49 @@ static int _odph_linux_process_create(odph_odpthread_t *thread_tbl,
return -2;
}
- _odph_thread_run_start_routine(&thread_tbl->start_args);
+ run_thread(&thread->start_args);
return 0; /* never reached */
}
/*
- * Create a single ODPthread as a linux thread
+ * Wait single process to exit
*/
-static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
- int cpu,
- const odph_odpthread_params_t *thr_params)
+static int wait_process(odph_thread_t *thread)
+{
+ pid_t pid;
+ int status = 0;
+
+ pid = waitpid(thread->proc.pid, &status, 0);
+
+ if (pid < 0) {
+ ODPH_ERR("waitpid() failed\n");
+ return -1;
+ }
+
+ /* Examine the child process' termination status */
+ if (WIFEXITED(status) &&
+ WEXITSTATUS(status) != EXIT_SUCCESS) {
+ ODPH_ERR("Child exit status:%d (pid:%d)\n",
+ WEXITSTATUS(status), (int)pid);
+ return -1;
+ }
+
+ if (WIFSIGNALED(status)) {
+ int signo = WTERMSIG(status);
+
+ ODPH_ERR("Child term signo:%d - %s (pid:%d)\n",
+ signo, strsignal(signo), (int)pid);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Create a single linux pthread
+ */
+static int create_pthread(odph_thread_t *thread, int cpu)
{
int ret;
cpu_set_t cpu_set;
@@ -129,23 +170,22 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
CPU_ZERO(&cpu_set);
CPU_SET(cpu, &cpu_set);
- pthread_attr_init(&thread_tbl->thread.attr);
+ pthread_attr_init(&thread->thread.attr);
- thread_tbl->cpu = cpu;
+ thread->cpu = cpu;
- pthread_attr_setaffinity_np(&thread_tbl->thread.attr,
+ pthread_attr_setaffinity_np(&thread->thread.attr,
sizeof(cpu_set_t), &cpu_set);
- thread_tbl->start_args.thr_params = *thr_params; /* copy */
- thread_tbl->start_args.mem_model = ODP_MEM_MODEL_THREAD;
+ thread->start_args.mem_model = ODP_MEM_MODEL_THREAD;
- ret = pthread_create(&thread_tbl->thread.thread_id,
- &thread_tbl->thread.attr,
- _odph_thread_run_start_routine,
- &thread_tbl->start_args);
+ ret = pthread_create(&thread->thread.thread_id,
+ &thread->thread.attr,
+ run_thread,
+ &thread->start_args);
if (ret != 0) {
ODPH_ERR("Failed to start thread on cpu #%d\n", cpu);
- thread_tbl->cpu = FAILED_CPU;
+ thread->cpu = FAILED_CPU;
return ret;
}
@@ -153,6 +193,155 @@ static int odph_linux_thread_create(odph_odpthread_t *thread_tbl,
}
/*
+ * Wait single pthread to exit
+ */
+static int wait_pthread(odph_thread_t *thread)
+{
+ int ret;
+ void *thread_ret = NULL;
+
+ /* Wait thread to exit */
+ ret = pthread_join(thread->thread.thread_id, &thread_ret);
+
+ if (ret) {
+ ODPH_ERR("pthread_join failed (%i) from cpu #%i\n",
+ ret, thread->cpu);
+ return -1;
+ }
+
+ if (thread_ret) {
+ ODPH_ERR("Bad exit status cpu #%i %p\n",
+ thread->cpu, thread_ret);
+ return -1;
+ }
+
+ ret = pthread_attr_destroy(&thread->thread.attr);
+
+ if (ret) {
+ ODPH_ERR("pthread_attr_destroy failed (%i) from cpu #%i\n",
+ ret, thread->cpu);
+ return -1;
+ }
+
+ return 0;
+}
+
+int odph_thread_create(odph_thread_t thread[],
+ const odph_thread_common_param_t *param,
+ const odph_thread_param_t thr_param[],
+ int num)
+{
+ int i, num_cpu, cpu;
+ const odp_cpumask_t *cpumask = param->cpumask;
+ int use_pthread = 1;
+
+ if (param->thread_model == 1)
+ use_pthread = 0;
+
+ if (helper_options.mem_model == ODP_MEM_MODEL_PROCESS)
+ use_pthread = 0;
+
+ if (num < 1) {
+ ODPH_ERR("Bad number of threads (%i)\n", num);
+ return -1;
+ }
+
+ num_cpu = odp_cpumask_count(cpumask);
+
+ if (num_cpu != num) {
+ ODPH_ERR("Number of threads (%i) and CPUs (%i) does not match"
+ "\n", num, num_cpu);
+ return -1;
+ }
+
+ memset(thread, 0, num * sizeof(odph_thread_t));
+
+ cpu = odp_cpumask_first(cpumask);
+ for (i = 0; i < num; i++) {
+ odph_thread_start_args_t *start_args = &thread[i].start_args;
+
+ /* Copy thread parameters */
+ if (param->share_param)
+ start_args->thr_params = thr_param[0];
+ else
+ start_args->thr_params = thr_param[i];
+
+ start_args->instance = param->instance;
+
+ if (param->sync)
+ odp_atomic_init_u32(&start_args->status, SYNC_INIT);
+ else
+ odp_atomic_init_u32(&start_args->status, NOT_STARTED);
+
+ if (use_pthread) {
+ if (create_pthread(&thread[i], cpu))
+ break;
+ } else {
+ if (create_process(&thread[i], cpu))
+ break;
+ }
+
+ /* Wait newly created thread to update status */
+ if (param->sync) {
+ odp_time_t t1, t2;
+ uint64_t diff_ns;
+ uint32_t status;
+ int timeout = 0;
+ odp_atomic_u32_t *atomic = &start_args->status;
+
+ t1 = odp_time_local();
+
+ do {
+ odp_cpu_pause();
+ t2 = odp_time_local();
+ diff_ns = odp_time_diff_ns(t2, t1);
+ timeout = diff_ns > ODP_TIME_SEC_IN_NS;
+ status = odp_atomic_load_acq_u32(atomic);
+
+ } while (status != INIT_DONE && timeout == 0);
+
+ if (timeout) {
+ ODPH_ERR("Thread (i:%i) start up timeout\n", i);
+ break;
+ }
+ }
+
+ odp_atomic_store_u32(&start_args->status, STARTED);
+
+ cpu = odp_cpumask_next(cpumask, cpu);
+ }
+
+ return i;
+}
+
+int odph_thread_join(odph_thread_t thread[], int num)
+{
+ odph_thread_start_args_t *start_args;
+ int i;
+
+ for (i = 0; i < num; i++) {
+ start_args = &thread[i].start_args;
+
+ if (odp_atomic_load_u32(&start_args->status) != STARTED) {
+ ODPH_DBG("Thread (i:%i) not started.\n", i);
+ break;
+ }
+
+ if (thread[i].start_args.mem_model == ODP_MEM_MODEL_THREAD) {
+ if (wait_pthread(&thread[i]))
+ break;
+ } else {
+ if (wait_process(&thread[i]))
+ break;
+ }
+
+ odp_atomic_store_u32(&start_args->status, NOT_STARTED);
+ }
+
+ return i;
+}
+
+/*
* create an odpthread set (as linux processes or linux threads or both)
*/
int odph_odpthreads_create(odph_odpthread_t *thread_tbl,
@@ -179,15 +368,19 @@ int odph_odpthreads_create(odph_odpthread_t *thread_tbl,
cpu = odp_cpumask_first(mask);
for (i = 0; i < num; i++) {
+ odph_thread_start_args_t *start_args;
+
+ start_args = &thread_tbl[i].start_args;
+
+ /* Copy thread parameters */
+ start_args->thr_params = *thr_params;
+ start_args->instance = thr_params->instance;
+
if (helper_options.mem_model == ODP_MEM_MODEL_THREAD) {
- if (odph_linux_thread_create(&thread_tbl[i],
- cpu,
- thr_params))
+ if (create_pthread(&thread_tbl[i], cpu))
break;
} else {
- if (_odph_linux_process_create(&thread_tbl[i],
- cpu,
- thr_params))
+ if (create_process(&thread_tbl[i], cpu))
break;
}