author    Mike Holmes <mike.holmes@linaro.org>    2015-04-15 14:25:22 -0400
committer Maxim Uvarov <maxim.uvarov@linaro.org>  2015-04-17 16:30:43 +0300
commit    dce6428f278a88a53765a38359201880eba67d1d (patch)
tree      eff683a7d0f4699ccad26b22a5be759c08e810d6 /helper
parent    a971dbef41829c528e8b0dc7396c17901206a4df (diff)
linux-generic: odp_linux: migrate helpers to helper dir
The odph helper source files do not belong in linux-generic; move them out
to the helper directory, from which they can be more cleanly extended to
support execution environments beyond Linux. Also clean up a checkpatch
whitespace warning in the migrated code.

Signed-off-by: Mike Holmes <mike.holmes@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Diffstat (limited to 'helper')
-rw-r--r--  helper/linux.c  248
-rw-r--r--  helper/ring.c   639
2 files changed, 887 insertions, 0 deletions
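
For orientation, a minimal usage sketch of the thread helpers being moved
(hypothetical and not part of the patch: worker() and MAX_WORKERS are
illustrative names, and odp_init_global(NULL, NULL)/odp_init_local() assume
the ODP v1.0-era init API):

#include <stdio.h>

#include <odp.h>
#include <odp/helper/linux.h>

#define MAX_WORKERS 8 /* illustrative bound, not an ODP constant */

static void *worker(void *arg)
{
	(void)arg;
	/* odp_init_local() has already been run for this thread by the
	 * helper's odp_run_start_routine() wrapper */
	printf("worker: ODP thread id %i\n", odp_thread_id());
	return NULL;
}

int main(void)
{
	odph_linux_pthread_t thr_tbl[MAX_WORKERS];
	odp_cpumask_t mask;
	int num;

	if (odp_init_global(NULL, NULL) || odp_init_local())
		return 1;

	/* cap the worker count at MAX_WORKERS; passing 0 would request
	 * one worker per available CPU */
	num = odph_linux_cpumask_default(&mask, MAX_WORKERS);

	odph_linux_pthread_create(thr_tbl, &mask, worker, NULL);
	odph_linux_pthread_join(thr_tbl, num);

	return 0;
}

Per-thread local init and ODP termination are handled inside the helper's
odp_run_start_routine() wrapper, as the linux.c diff below shows.
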
diff --git a/helper/linux.c b/helper/linux.c
new file mode 100644
index 0000000..b753e4d
--- /dev/null
+++ b/helper/linux.c
@@ -0,0 +1,248 @@
+/* Copyright (c) 2013, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <sched.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+#include <odp/helper/linux.h>
+#include <odp_internal.h>
+#include <odp/thread.h>
+#include <odp/init.h>
+#include <odp/system_info.h>
+#include <odp_debug_internal.h>
+
+int odph_linux_cpumask_default(odp_cpumask_t *mask, int num_in)
+{
+ int i;
+ int first_cpu = 1;
+ int num = num_in;
+ int cpu_count;
+
+ cpu_count = odp_cpu_count();
+
+	/*
+	 * If the user supplied no count, or one larger than the number
+	 * of available CPUs, fall back to using all CPUs
+	 */
+ if (0 == num)
+ num = cpu_count;
+ if (cpu_count < num)
+ num = cpu_count;
+
+ /*
+ * Always force "first_cpu" to a valid CPU
+ */
+ if (first_cpu >= cpu_count)
+ first_cpu = cpu_count - 1;
+
+ /* Build the mask */
+ odp_cpumask_zero(mask);
+ for (i = 0; i < num; i++) {
+ int cpu;
+
+ cpu = (first_cpu + i) % cpu_count;
+ odp_cpumask_set(mask, cpu);
+ }
+
+ return num;
+}
+
+static void *odp_run_start_routine(void *arg)
+{
+ odp_start_args_t *start_args = arg;
+
+ /* ODP thread local init */
+ if (odp_init_local()) {
+ ODP_ERR("Local init failed\n");
+ return NULL;
+ }
+
+ void *ret_ptr = start_args->start_routine(start_args->arg);
+ int ret = odp_term_local();
+ if (ret < 0)
+ ODP_ERR("Local term failed\n");
+ else if (ret == 0 && odp_term_global())
+ ODP_ERR("Global term failed\n");
+
+ return ret_ptr;
+}
+
+
+void odph_linux_pthread_create(odph_linux_pthread_t *thread_tbl,
+ const odp_cpumask_t *mask_in,
+ void *(*start_routine) (void *), void *arg)
+{
+ int i;
+ int num;
+ odp_cpumask_t mask;
+ int cpu_count;
+ int cpu;
+
+ odp_cpumask_copy(&mask, mask_in);
+ num = odp_cpumask_count(&mask);
+
+ memset(thread_tbl, 0, num * sizeof(odph_linux_pthread_t));
+
+ cpu_count = odp_cpu_count();
+
+ if (num < 1 || num > cpu_count) {
+ ODP_ERR("Bad num\n");
+ return;
+ }
+
+ cpu = odp_cpumask_first(&mask);
+ for (i = 0; i < num; i++) {
+ odp_cpumask_t thd_mask;
+
+ odp_cpumask_zero(&thd_mask);
+ odp_cpumask_set(&thd_mask, cpu);
+
+ pthread_attr_init(&thread_tbl[i].attr);
+
+ thread_tbl[i].cpu = cpu;
+
+ pthread_attr_setaffinity_np(&thread_tbl[i].attr,
+ sizeof(cpu_set_t), &thd_mask.set);
+
+ thread_tbl[i].start_args = malloc(sizeof(odp_start_args_t));
+ if (thread_tbl[i].start_args == NULL)
+ ODP_ABORT("Malloc failed");
+
+ thread_tbl[i].start_args->start_routine = start_routine;
+ thread_tbl[i].start_args->arg = arg;
+
+ pthread_create(&thread_tbl[i].thread, &thread_tbl[i].attr,
+ odp_run_start_routine, thread_tbl[i].start_args);
+
+ cpu = odp_cpumask_next(&mask, cpu);
+ }
+}
+
+
+void odph_linux_pthread_join(odph_linux_pthread_t *thread_tbl, int num)
+{
+ int i;
+
+ for (i = 0; i < num; i++) {
+ /* Wait thread to exit */
+ pthread_join(thread_tbl[i].thread, NULL);
+ pthread_attr_destroy(&thread_tbl[i].attr);
+ free(thread_tbl[i].start_args);
+ }
+}
+
+
+int odph_linux_process_fork_n(odph_linux_process_t *proc_tbl,
+ const odp_cpumask_t *mask_in)
+{
+ odp_cpumask_t mask;
+ pid_t pid;
+ int num;
+ int cpu_count;
+ int cpu;
+ int i;
+
+ odp_cpumask_copy(&mask, mask_in);
+ num = odp_cpumask_count(&mask);
+
+ memset(proc_tbl, 0, num * sizeof(odph_linux_process_t));
+
+ cpu_count = odp_cpu_count();
+
+ if (num < 1 || num > cpu_count) {
+ ODP_ERR("Bad num\n");
+ return -1;
+ }
+
+ cpu = odp_cpumask_first(&mask);
+ for (i = 0; i < num; i++) {
+ odp_cpumask_t proc_mask;
+
+ odp_cpumask_zero(&proc_mask);
+ odp_cpumask_set(&proc_mask, cpu);
+
+ pid = fork();
+
+ if (pid < 0) {
+ ODP_ERR("fork() failed\n");
+ return -1;
+ }
+
+ /* Parent continues to fork */
+ if (pid > 0) {
+ proc_tbl[i].pid = pid;
+ proc_tbl[i].cpu = cpu;
+
+ cpu = odp_cpumask_next(&mask, cpu);
+ continue;
+ }
+
+ /* Child process */
+ if (sched_setaffinity(0, sizeof(cpu_set_t), &proc_mask.set)) {
+ ODP_ERR("sched_setaffinity() failed\n");
+ return -2;
+ }
+
+ if (odp_init_local()) {
+ ODP_ERR("Local init failed\n");
+ return -2;
+ }
+
+ return 0;
+ }
+
+ return 1;
+}
+
+
+int odph_linux_process_fork(odph_linux_process_t *proc, int cpu)
+{
+ odp_cpumask_t mask;
+
+ odp_cpumask_zero(&mask);
+ odp_cpumask_set(&mask, cpu);
+ return odph_linux_process_fork_n(proc, &mask);
+}
+
+
+int odph_linux_process_wait_n(odph_linux_process_t *proc_tbl, int num)
+{
+ pid_t pid;
+ int i, j;
+ int status;
+
+ for (i = 0; i < num; i++) {
+ pid = wait(&status);
+
+ if (pid < 0) {
+ ODP_ERR("wait() failed\n");
+ return -1;
+ }
+
+ for (j = 0; j < num; j++) {
+ if (proc_tbl[j].pid == pid) {
+ proc_tbl[j].status = status;
+ break;
+ }
+ }
+
+ if (j == num) {
+ ODP_ERR("Bad pid\n");
+ return -1;
+ }
+ }
+
+ return 0;
+}
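
The process-mode helpers above follow the classic fork model:
odph_linux_process_fork_n() returns 0 in each child (with CPU affinity and
local init already applied) and 1 in the parent. A hedged sketch, with
MAX_PROCS an illustrative name:

#include <stdlib.h>

#include <odp.h>
#include <odp/helper/linux.h>

#define MAX_PROCS 4 /* illustrative bound */

int main(void)
{
	odph_linux_process_t proc_tbl[MAX_PROCS];
	odp_cpumask_t mask;
	int num, ret;

	if (odp_init_global(NULL, NULL) || odp_init_local())
		return 1;

	num = odph_linux_cpumask_default(&mask, MAX_PROCS);

	/* returns 0 in each child, 1 in the parent, negative on error */
	ret = odph_linux_process_fork_n(proc_tbl, &mask);
	if (ret < 0)
		return 1;

	if (ret == 0) {
		/* child: affinity and odp_init_local() were already
		 * handled by the helper; do per-process work here */
		exit(EXIT_SUCCESS);
	}

	/* parent: wait for all children and collect their status */
	return odph_linux_process_wait_n(proc_tbl, num) ? 1 : 0;
}
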
diff --git a/helper/ring.c b/helper/ring.c
new file mode 100644
index 0000000..a24a020
--- /dev/null
+++ b/helper/ring.c
@@ -0,0 +1,639 @@
+/* Copyright (c) 2014, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2013 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Derived from FreeBSD's bufring.c
+ *
+ **************************************************************************
+ *
+ * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. The name of Kip Macy nor the names of other
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ***************************************************************************/
+
+#include <odp/shared_memory.h>
+#include <odp_internal.h>
+#include <odp_spin_internal.h>
+#include <odp_align_internal.h>
+#include <odp/spinlock.h>
+#include <odp/align.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <string.h>
+#include <odp_debug_internal.h>
+#include <odp/rwlock.h>
+#include <odp/helper/ring.h>
+
+static TAILQ_HEAD(, odph_ring) odp_ring_list;
+
+/*
+ * Enqueue the burst of pointers onto the ring, handling wrap-around.
+ */
+#define ENQUEUE_PTRS() do { \
+ const uint32_t size = r->prod.size; \
+ uint32_t idx = prod_head & mask; \
+ if (odp_likely(idx + n < size)) { \
+ for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
+ r->ring[idx] = obj_table[i]; \
+ r->ring[idx+1] = obj_table[i+1]; \
+ r->ring[idx+2] = obj_table[i+2]; \
+ r->ring[idx+3] = obj_table[i+3]; \
+ } \
+		switch (n & 0x3) { \
+		case 3: \
+			r->ring[idx++] = obj_table[i++]; /* fallthrough */ \
+		case 2: \
+			r->ring[idx++] = obj_table[i++]; /* fallthrough */ \
+		case 1: \
+			r->ring[idx++] = obj_table[i++]; \
+		} \
+ } else { \
+ for (i = 0; idx < size; i++, idx++)\
+ r->ring[idx] = obj_table[i]; \
+ for (idx = 0; i < n; i++, idx++) \
+ r->ring[idx] = obj_table[i]; \
+ } \
+} while (0)
+
+/*
+ * Copy the burst of pointers from the ring to obj_table, handling
+ * wrap-around.
+ */
+#define DEQUEUE_PTRS() do { \
+ uint32_t idx = cons_head & mask; \
+ const uint32_t size = r->cons.size; \
+ if (odp_likely(idx + n < size)) { \
+ for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
+ obj_table[i] = r->ring[idx]; \
+ obj_table[i+1] = r->ring[idx+1]; \
+ obj_table[i+2] = r->ring[idx+2]; \
+ obj_table[i+3] = r->ring[idx+3]; \
+ } \
+		switch (n & 0x3) { \
+		case 3: \
+			obj_table[i++] = r->ring[idx++]; /* fallthrough */ \
+		case 2: \
+			obj_table[i++] = r->ring[idx++]; /* fallthrough */ \
+		case 1: \
+			obj_table[i++] = r->ring[idx++]; \
+		} \
+ } else { \
+ for (i = 0; idx < size; i++, idx++) \
+ obj_table[i] = r->ring[idx]; \
+ for (idx = 0; i < n; i++, idx++) \
+ obj_table[i] = r->ring[idx]; \
+ } \
+} while (0)
+
+static odp_rwlock_t qlock; /* rings tailq lock */
+
+/* init the ring tailq */
+void odph_ring_tailq_init(void)
+{
+ TAILQ_INIT(&odp_ring_list);
+ odp_rwlock_init(&qlock);
+}
+
+/* create the ring */
+odph_ring_t *
+odph_ring_create(const char *name, unsigned count, unsigned flags)
+{
+ char ring_name[ODPH_RING_NAMESIZE];
+ odph_ring_t *r;
+ size_t ring_size;
+ odp_shm_t shm;
+
+ /* count must be a power of 2 */
+ if (!ODP_VAL_IS_POWER_2(count) || (count > ODPH_RING_SZ_MASK)) {
+		ODP_ERR("Requested size is invalid: it must be a power of 2 and must not exceed the size limit %u\n",
+			ODPH_RING_SZ_MASK);
+ return NULL;
+ }
+
+ snprintf(ring_name, sizeof(ring_name), "%s", name);
+ ring_size = count*sizeof(void *)+sizeof(odph_ring_t);
+
+ odp_rwlock_write_lock(&qlock);
+	/* reserve a memory zone for this ring */
+ shm = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, 0);
+
+ r = odp_shm_addr(shm);
+
+ if (r != NULL) {
+ /* init the ring structure */
+ snprintf(r->name, sizeof(r->name), "%s", name);
+ r->flags = flags;
+ r->prod.watermark = count;
+ r->prod.sp_enqueue = !!(flags & ODPH_RING_F_SP_ENQ);
+ r->cons.sc_dequeue = !!(flags & ODPH_RING_F_SC_DEQ);
+ r->prod.size = count;
+ r->cons.size = count;
+ r->prod.mask = count-1;
+ r->cons.mask = count-1;
+ r->prod.head = 0;
+ r->cons.head = 0;
+ r->prod.tail = 0;
+ r->cons.tail = 0;
+
+ TAILQ_INSERT_TAIL(&odp_ring_list, r, next);
+ } else {
+ ODP_ERR("Cannot reserve memory\n");
+ }
+
+ odp_rwlock_write_unlock(&qlock);
+ return r;
+}
+
+/*
+ * Change the high water mark. If *count* is 0, water marking is
+ * disabled.
+ */
+int odph_ring_set_water_mark(odph_ring_t *r, unsigned count)
+{
+ if (count >= r->prod.size)
+ return -EINVAL;
+
+ /* if count is 0, disable the watermarking */
+ if (count == 0)
+ count = r->prod.size;
+
+ r->prod.watermark = count;
+ return 0;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ */
+int __odph_ring_mp_do_enqueue(odph_ring_t *r, void * const *obj_table,
+ unsigned n, enum odph_ring_queue_behavior behavior)
+{
+ uint32_t prod_head, prod_next;
+ uint32_t cons_tail, free_entries;
+ const unsigned max = n;
+ int success;
+ unsigned i;
+ uint32_t mask = r->prod.mask;
+ int ret;
+
+ /* move prod.head atomically */
+ do {
+ /* Reset n to the initial burst count */
+ n = max;
+
+ prod_head = r->prod.head;
+ cons_tail = r->cons.tail;
+		/* The subtraction is done between two unsigned 32-bit values
+		 * (the result is always modulo 32 bits even if we have
+		 * prod_head > cons_tail). So 'free_entries' is always between 0
+		 * and size(ring)-1. */
+ free_entries = (mask + cons_tail - prod_head);
+
+ /* check that we have enough room in ring */
+ if (odp_unlikely(n > free_entries)) {
+ if (behavior == ODPH_RING_QUEUE_FIXED) {
+ return -ENOBUFS;
+ } else {
+ /* No free entry available */
+ if (odp_unlikely(free_entries == 0))
+ return 0;
+
+ n = free_entries;
+ }
+ }
+
+ prod_next = prod_head + n;
+ success = __atomic_compare_exchange_n(&r->prod.head,
+ &prod_head,
+ prod_next,
+ false/*strong*/,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
+ } while (odp_unlikely(success == 0));
+
+ /* write entries in ring */
+ ENQUEUE_PTRS();
+
+ /* if we exceed the watermark */
+ if (odp_unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
+ ret = (behavior == ODPH_RING_QUEUE_FIXED) ? -EDQUOT :
+ (int)(n | ODPH_RING_QUOT_EXCEED);
+ } else {
+ ret = (behavior == ODPH_RING_QUEUE_FIXED) ? 0 : n;
+ }
+
+ /*
+	 * If there are other enqueues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ while (odp_unlikely(r->prod.tail != prod_head))
+ odp_spin();
+
+ /* Release our entries and the memory they refer to */
+ __atomic_thread_fence(__ATOMIC_RELEASE);
+ r->prod.tail = prod_next;
+ return ret;
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ */
+int __odph_ring_sp_do_enqueue(odph_ring_t *r, void * const *obj_table,
+ unsigned n, enum odph_ring_queue_behavior behavior)
+{
+ uint32_t prod_head, cons_tail;
+ uint32_t prod_next, free_entries;
+ unsigned i;
+ uint32_t mask = r->prod.mask;
+ int ret;
+
+ prod_head = r->prod.head;
+ cons_tail = r->cons.tail;
+	/* The subtraction is done between two unsigned 32-bit values
+	 * (the result is always modulo 32 bits even if we have
+	 * prod_head > cons_tail). So 'free_entries' is always between 0
+	 * and size(ring)-1. */
+ free_entries = mask + cons_tail - prod_head;
+
+ /* check that we have enough room in ring */
+ if (odp_unlikely(n > free_entries)) {
+ if (behavior == ODPH_RING_QUEUE_FIXED) {
+ return -ENOBUFS;
+ } else {
+ /* No free entry available */
+ if (odp_unlikely(free_entries == 0))
+ return 0;
+
+ n = free_entries;
+ }
+ }
+
+ prod_next = prod_head + n;
+ r->prod.head = prod_next;
+
+ /* write entries in ring */
+ ENQUEUE_PTRS();
+
+ /* if we exceed the watermark */
+ if (odp_unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
+ ret = (behavior == ODPH_RING_QUEUE_FIXED) ? -EDQUOT :
+ (int)(n | ODPH_RING_QUOT_EXCEED);
+ } else {
+ ret = (behavior == ODPH_RING_QUEUE_FIXED) ? 0 : n;
+ }
+
+ /* Release our entries and the memory they refer to */
+ __atomic_thread_fence(__ATOMIC_RELEASE);
+ r->prod.tail = prod_next;
+ return ret;
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ */
+
+int __odph_ring_mc_do_dequeue(odph_ring_t *r, void **obj_table,
+ unsigned n, enum odph_ring_queue_behavior behavior)
+{
+ uint32_t cons_head, prod_tail;
+ uint32_t cons_next, entries;
+ const unsigned max = n;
+ int success;
+ unsigned i;
+ uint32_t mask = r->prod.mask;
+
+ /* move cons.head atomically */
+ do {
+ /* Restore n as it may change every loop */
+ n = max;
+
+ cons_head = r->cons.head;
+ prod_tail = r->prod.tail;
+		/* The subtraction is done between two unsigned 32-bit values
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1. */
+ entries = (prod_tail - cons_head);
+
+ /* Set the actual entries for dequeue */
+ if (n > entries) {
+ if (behavior == ODPH_RING_QUEUE_FIXED) {
+ return -ENOENT;
+ } else {
+ if (odp_unlikely(entries == 0))
+ return 0;
+
+ n = entries;
+ }
+ }
+
+ cons_next = cons_head + n;
+ success = __atomic_compare_exchange_n(&r->cons.head,
+ &cons_head,
+ cons_next,
+ false/*strong*/,
+ __ATOMIC_ACQUIRE,
+ __ATOMIC_RELAXED);
+ } while (odp_unlikely(success == 0));
+
+ /* copy in table */
+ DEQUEUE_PTRS();
+
+ /*
+ * If there are other dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ while (odp_unlikely(r->cons.tail != cons_head))
+ odp_spin();
+
+ /* Release our entries and the memory they refer to */
+ __atomic_thread_fence(__ATOMIC_RELEASE);
+ r->cons.tail = cons_next;
+
+ return behavior == ODPH_RING_QUEUE_FIXED ? 0 : n;
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ */
+int __odph_ring_sc_do_dequeue(odph_ring_t *r, void **obj_table,
+ unsigned n, enum odph_ring_queue_behavior behavior)
+{
+ uint32_t cons_head, prod_tail;
+ uint32_t cons_next, entries;
+ unsigned i;
+ uint32_t mask = r->prod.mask;
+
+ cons_head = r->cons.head;
+ prod_tail = r->prod.tail;
+	/* The subtraction is done between two unsigned 32-bit values
+	 * (the result is always modulo 32 bits even if we have
+	 * cons_head > prod_tail). So 'entries' is always between 0
+	 * and size(ring)-1. */
+ entries = prod_tail - cons_head;
+
+ if (n > entries) {
+ if (behavior == ODPH_RING_QUEUE_FIXED) {
+ return -ENOENT;
+ } else {
+ if (odp_unlikely(entries == 0))
+ return 0;
+
+ n = entries;
+ }
+ }
+
+ cons_next = cons_head + n;
+ r->cons.head = cons_next;
+
+ /* Acquire the pointers and the memory they refer to */
+ __atomic_thread_fence(__ATOMIC_ACQUIRE);
+ /* copy in table */
+ DEQUEUE_PTRS();
+
+ r->cons.tail = cons_next;
+ return behavior == ODPH_RING_QUEUE_FIXED ? 0 : n;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ */
+int odph_ring_mp_enqueue_bulk(odph_ring_t *r, void * const *obj_table,
+ unsigned n)
+{
+ return __odph_ring_mp_do_enqueue(r, obj_table, n,
+ ODPH_RING_QUEUE_FIXED);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ */
+int odph_ring_sp_enqueue_bulk(odph_ring_t *r, void * const *obj_table,
+ unsigned n)
+{
+ return __odph_ring_sp_do_enqueue(r, obj_table, n,
+ ODPH_RING_QUEUE_FIXED);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ */
+int odph_ring_mc_dequeue_bulk(odph_ring_t *r, void **obj_table, unsigned n)
+{
+ return __odph_ring_mc_do_dequeue(r, obj_table, n,
+ ODPH_RING_QUEUE_FIXED);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ */
+int odph_ring_sc_dequeue_bulk(odph_ring_t *r, void **obj_table, unsigned n)
+{
+ return __odph_ring_sc_do_dequeue(r, obj_table, n,
+ ODPH_RING_QUEUE_FIXED);
+}
+
+/**
+ * Test if a ring is full.
+ */
+int odph_ring_full(const odph_ring_t *r)
+{
+ uint32_t prod_tail = r->prod.tail;
+ uint32_t cons_tail = r->cons.tail;
+ return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0);
+}
+
+/**
+ * Test if a ring is empty.
+ */
+int odph_ring_empty(const odph_ring_t *r)
+{
+ uint32_t prod_tail = r->prod.tail;
+ uint32_t cons_tail = r->cons.tail;
+ return !!(cons_tail == prod_tail);
+}
+
+/**
+ * Return the number of entries in a ring.
+ */
+unsigned odph_ring_count(const odph_ring_t *r)
+{
+ uint32_t prod_tail = r->prod.tail;
+ uint32_t cons_tail = r->cons.tail;
+ return (prod_tail - cons_tail) & r->prod.mask;
+}
+
+/**
+ * Return the number of free entries in a ring.
+ */
+unsigned odph_ring_free_count(const odph_ring_t *r)
+{
+ uint32_t prod_tail = r->prod.tail;
+ uint32_t cons_tail = r->cons.tail;
+ return (cons_tail - prod_tail - 1) & r->prod.mask;
+}
+
+/* dump the status of the ring on the console */
+void odph_ring_dump(const odph_ring_t *r)
+{
+ ODP_DBG("ring <%s>@%p\n", r->name, r);
+ ODP_DBG(" flags=%x\n", r->flags);
+ ODP_DBG(" size=%"PRIu32"\n", r->prod.size);
+ ODP_DBG(" ct=%"PRIu32"\n", r->cons.tail);
+ ODP_DBG(" ch=%"PRIu32"\n", r->cons.head);
+ ODP_DBG(" pt=%"PRIu32"\n", r->prod.tail);
+ ODP_DBG(" ph=%"PRIu32"\n", r->prod.head);
+ ODP_DBG(" used=%u\n", odph_ring_count(r));
+ ODP_DBG(" avail=%u\n", odph_ring_free_count(r));
+ if (r->prod.watermark == r->prod.size)
+ ODP_DBG(" watermark=0\n");
+ else
+ ODP_DBG(" watermark=%"PRIu32"\n", r->prod.watermark);
+}
+
+/* dump the status of all rings on the console */
+void odph_ring_list_dump(void)
+{
+ const odph_ring_t *mp = NULL;
+
+ odp_rwlock_read_lock(&qlock);
+
+ TAILQ_FOREACH(mp, &odp_ring_list, next) {
+ odph_ring_dump(mp);
+ }
+
+ odp_rwlock_read_unlock(&qlock);
+}
+
+/* look up a ring by its name */
+odph_ring_t *odph_ring_lookup(const char *name)
+{
+ odph_ring_t *r;
+
+ odp_rwlock_read_lock(&qlock);
+ TAILQ_FOREACH(r, &odp_ring_list, next) {
+ if (strncmp(name, r->name, ODPH_RING_NAMESIZE) == 0)
+ break;
+ }
+ odp_rwlock_read_unlock(&qlock);
+
+ return r;
+}
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ */
+int odph_ring_mp_enqueue_burst(odph_ring_t *r, void * const *obj_table,
+ unsigned n)
+{
+ return __odph_ring_mp_do_enqueue(r, obj_table, n,
+ ODPH_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ */
+int odph_ring_sp_enqueue_burst(odph_ring_t *r, void * const *obj_table,
+ unsigned n)
+{
+ return __odph_ring_sp_do_enqueue(r, obj_table, n,
+ ODPH_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ */
+int odph_ring_enqueue_burst(odph_ring_t *r, void * const *obj_table,
+ unsigned n)
+{
+ if (r->prod.sp_enqueue)
+ return odph_ring_sp_enqueue_burst(r, obj_table, n);
+ else
+ return odph_ring_mp_enqueue_burst(r, obj_table, n);
+}
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe).
+ */
+int odph_ring_mc_dequeue_burst(odph_ring_t *r, void **obj_table, unsigned n)
+{
+ return __odph_ring_mc_do_dequeue(r, obj_table, n,
+ ODPH_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe).
+ */
+int odph_ring_sc_dequeue_burst(odph_ring_t *r, void **obj_table, unsigned n)
+{
+ return __odph_ring_sc_do_dequeue(r, obj_table, n,
+ ODPH_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ */
+int odph_ring_dequeue_burst(odph_ring_t *r, void **obj_table, unsigned n)
+{
+ if (r->cons.sc_dequeue)
+ return odph_ring_sc_dequeue_burst(r, obj_table, n);
+ else
+ return odph_ring_mc_dequeue_burst(r, obj_table, n);
+}
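
A closing sketch of the ring API carried over from DPDK (again hypothetical
usage, not part of the patch; "demo_ring" is an illustrative name). The ring
stores plain void * entries, its size must be a power of two, and
odph_ring_tailq_init() must run before the first odph_ring_create() --
within linux-generic this may already happen during global init:

#include <stdio.h>

#include <odp.h>
#include <odp/helper/ring.h>

#define RING_SIZE 1024 /* must be a power of 2 */

int main(void)
{
	int vals[4] = { 0, 1, 2, 3 };
	void *burst[4] = { &vals[0], &vals[1], &vals[2], &vals[3] };
	void *out[4];
	odph_ring_t *r;
	int n;

	if (odp_init_global(NULL, NULL) || odp_init_local())
		return 1;

	/* init the ring tailq once before the first ring is created */
	odph_ring_tailq_init();

	r = odph_ring_create("demo_ring", RING_SIZE, 0);
	if (r == NULL)
		return 1;

	/* burst variants return the number of objects actually moved;
	 * the default watermark equals the ring size (i.e. disabled),
	 * so no ODPH_RING_QUOT_EXCEED bit is set here */
	n = odph_ring_enqueue_burst(r, burst, 4);
	n = odph_ring_dequeue_burst(r, out, n);

	printf("dequeued %i objects, ring is now %s\n", n,
	       odph_ring_empty(r) ? "empty" : "non-empty");
	return 0;
}
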