diff options
Diffstat (limited to 'platform/linux-generic/odp_ring.c')
-rw-r--r-- | platform/linux-generic/odp_ring.c | 619 |
1 file changed, 619 insertions(+), 0 deletions(-)
diff --git a/platform/linux-generic/odp_ring.c b/platform/linux-generic/odp_ring.c new file mode 100644 index 000000000..25ff66a0f --- /dev/null +++ b/platform/linux-generic/odp_ring.c @@ -0,0 +1,619 @@ +/* Copyright (c) 2014, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2013 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +/* + * Derived from FreeBSD's bufring.c + * + ************************************************************************** + * + * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. The name of Kip Macy nor the names of other + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ***************************************************************************/ + +#include <odp_shared_memory.h> +#include <odp_internal.h> +#include <odp_spin_internal.h> +#include <odp_spinlock.h> +#include <odp_align.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <stdio.h> +#include <string.h> +#include <odp_debug.h> +#include <odp_rwlock.h> +#include <helper/odp_ring.h> + +static TAILQ_HEAD(, odp_ring) odp_ring_list; + +/* + * the enqueue of pointers on the ring. 
+ */ +#define ENQUEUE_PTRS() do { \ + const uint32_t size = r->prod.size; \ + uint32_t idx = prod_head & mask; \ + if (odp_likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \ + r->ring[idx] = obj_table[i]; \ + r->ring[idx+1] = obj_table[i+1]; \ + r->ring[idx+2] = obj_table[i+2]; \ + r->ring[idx+3] = obj_table[i+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + r->ring[idx++] = obj_table[i++]; \ + case 2: \ + r->ring[idx++] = obj_table[i++]; \ + case 1: \ + r->ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++)\ + r->ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + r->ring[idx] = obj_table[i]; \ + } \ +} while (0) + +/* + * the actual copy of pointers on the ring to obj_table. + */ +#define DEQUEUE_PTRS() do { \ + uint32_t idx = cons_head & mask; \ + const uint32_t size = r->cons.size; \ + if (odp_likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\ + obj_table[i] = r->ring[idx]; \ + obj_table[i+1] = r->ring[idx+1]; \ + obj_table[i+2] = r->ring[idx+2]; \ + obj_table[i+3] = r->ring[idx+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = r->ring[idx++]; \ + case 2: \ + obj_table[i++] = r->ring[idx++]; \ + case 1: \ + obj_table[i++] = r->ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + } \ +} while (0) + +static odp_rwlock_t qlock; /* rings tailq lock */ + +/* init tailq_ring */ +void odp_ring_tailq_init(void) +{ + TAILQ_INIT(&odp_ring_list); + odp_rwlock_init(&qlock); +} + +/* create the ring */ +odp_ring_t * +odp_ring_create(const char *name, unsigned count, unsigned flags) +{ + char ring_name[ODP_RING_NAMESIZE]; + odp_ring_t *r; + size_t ring_size; + + /* count must be a power of 2 */ + if (!ODP_VAL_IS_POWER_2(count) || (count > ODP_RING_SZ_MASK)) { + ODP_ERR("Requested size is invalid, must 
be power of 2, and do not exceed the size limit %u\n", + ODP_RING_SZ_MASK); + return NULL; + } + + snprintf(ring_name, sizeof(ring_name), "%s", name); + ring_size = count*sizeof(void *)+sizeof(odp_ring_t); + + odp_rwlock_write_lock(&qlock); + /* reserve a memory zone for this ring.*/ + r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE); + + if (r != NULL) { + /* init the ring structure */ + snprintf(r->name, sizeof(r->name), "%s", name); + r->flags = flags; + r->prod.watermark = count; + r->prod.sp_enqueue = !!(flags & ODP_RING_F_SP_ENQ); + r->cons.sc_dequeue = !!(flags & ODP_RING_F_SC_DEQ); + r->prod.size = count; + r->cons.size = count; + r->prod.mask = count-1; + r->cons.mask = count-1; + r->prod.head = 0; + r->cons.head = 0; + r->prod.tail = 0; + r->cons.tail = 0; + + TAILQ_INSERT_TAIL(&odp_ring_list, r, next); + } else { + ODP_ERR("Cannot reserve memory\n"); + } + + odp_rwlock_write_unlock(&qlock); + return r; +} + +/* + * change the high water mark. If *count* is 0, water marking is + * disabled + */ +int odp_ring_set_water_mark(odp_ring_t *r, unsigned count) +{ + if (count >= r->prod.size) + return -EINVAL; + + /* if count is 0, disable the watermarking */ + if (count == 0) + count = r->prod.size; + + r->prod.watermark = count; + return 0; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + */ +int __odp_ring_mp_do_enqueue(odp_ring_t *r, void * const *obj_table, + unsigned n, enum odp_ring_queue_behavior behavior) +{ + uint32_t prod_head, prod_next; + uint32_t cons_tail, free_entries; + const unsigned max = n; + int success; + unsigned i; + uint32_t mask = r->prod.mask; + int ret; + + /* move prod.head atomically */ + do { + /* Reset n to the initial burst count */ + n = max; + + prod_head = r->prod.head; + cons_tail = r->cons.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * prod_head > cons_tail). 
So 'free_entries' is always between 0 + * and size(ring)-1. */ + free_entries = (mask + cons_tail - prod_head); + + /* check that we have enough room in ring */ + if (odp_unlikely(n > free_entries)) { + if (behavior == ODP_RING_QUEUE_FIXED) { + return -ENOBUFS; + } else { + /* No free entry available */ + if (odp_unlikely(free_entries == 0)) + return 0; + + n = free_entries; + } + } + + prod_next = prod_head + n; + success = odp_atomic_cmpset_u32(&r->prod.head, prod_head, + prod_next); + } while (odp_unlikely(success == 0)); + + /* write entries in ring */ + ENQUEUE_PTRS(); + odp_mem_barrier(); + + /* if we exceed the watermark */ + if (odp_unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { + ret = (behavior == ODP_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | ODP_RING_QUOT_EXCEED); + } else { + ret = (behavior == ODP_RING_QUEUE_FIXED) ? 0 : n; + } + + /* + * If there are other enqueues in progress that preceeded us, + * we need to wait for them to complete + */ + while (odp_unlikely(r->prod.tail != prod_head)) + odp_spin(); + + r->prod.tail = prod_next; + return ret; +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + */ +int __odp_ring_sp_do_enqueue(odp_ring_t *r, void * const *obj_table, + unsigned n, enum odp_ring_queue_behavior behavior) +{ + uint32_t prod_head, cons_tail; + uint32_t prod_next, free_entries; + unsigned i; + uint32_t mask = r->prod.mask; + int ret; + + prod_head = r->prod.head; + cons_tail = r->cons.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * prod_head > cons_tail). So 'free_entries' is always between 0 + * and size(ring)-1. 
*/ + free_entries = mask + cons_tail - prod_head; + + /* check that we have enough room in ring */ + if (odp_unlikely(n > free_entries)) { + if (behavior == ODP_RING_QUEUE_FIXED) { + return -ENOBUFS; + } else { + /* No free entry available */ + if (odp_unlikely(free_entries == 0)) + return 0; + + n = free_entries; + } + } + + prod_next = prod_head + n; + r->prod.head = prod_next; + + /* write entries in ring */ + ENQUEUE_PTRS(); + odp_mem_barrier(); + + /* if we exceed the watermark */ + if (odp_unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { + ret = (behavior == ODP_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | ODP_RING_QUOT_EXCEED); + } else { + ret = (behavior == ODP_RING_QUEUE_FIXED) ? 0 : n; + } + + r->prod.tail = prod_next; + return ret; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + */ + +int __odp_ring_mc_do_dequeue(odp_ring_t *r, void **obj_table, + unsigned n, enum odp_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + const unsigned max = n; + int success; + unsigned i; + uint32_t mask = r->prod.mask; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. 
*/ + entries = (prod_tail - cons_head); + + /* Set the actual entries for dequeue */ + if (n > entries) { + if (behavior == ODP_RING_QUEUE_FIXED) { + return -ENOENT; + } else { + if (odp_unlikely(entries == 0)) + return 0; + + n = entries; + } + } + + cons_next = cons_head + n; + success = odp_atomic_cmpset_u32(&r->cons.head, cons_head, + cons_next); + } while (odp_unlikely(success == 0)); + + /* copy in table */ + DEQUEUE_PTRS(); + odp_mem_barrier(); + + /* + * If there are other dequeues in progress that preceded us, + * we need to wait for them to complete + */ + while (odp_unlikely(r->cons.tail != cons_head)) + odp_spin(); + + r->cons.tail = cons_next; + + return behavior == ODP_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + */ +int __odp_ring_sc_do_dequeue(odp_ring_t *r, void **obj_table, + unsigned n, enum odp_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + unsigned i; + uint32_t mask = r->prod.mask; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = prod_tail - cons_head; + + if (n > entries) { + if (behavior == ODP_RING_QUEUE_FIXED) { + return -ENOENT; + } else { + if (odp_unlikely(entries == 0)) + return 0; + + n = entries; + } + } + + cons_next = cons_head + n; + r->cons.head = cons_next; + + /* copy in table */ + DEQUEUE_PTRS(); + odp_mem_barrier(); + + r->cons.tail = cons_next; + return behavior == ODP_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). 
+ */ +int odp_ring_mp_enqueue_bulk(odp_ring_t *r, void * const *obj_table, + unsigned n) +{ + return __odp_ring_mp_do_enqueue(r, obj_table, n, ODP_RING_QUEUE_FIXED); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + */ +int odp_ring_sp_enqueue_bulk(odp_ring_t *r, void * const *obj_table, + unsigned n) +{ + return __odp_ring_sp_do_enqueue(r, obj_table, n, ODP_RING_QUEUE_FIXED); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + */ +int odp_ring_mc_dequeue_bulk(odp_ring_t *r, void **obj_table, unsigned n) +{ + return __odp_ring_mc_do_dequeue(r, obj_table, n, ODP_RING_QUEUE_FIXED); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + */ +int odp_ring_sc_dequeue_bulk(odp_ring_t *r, void **obj_table, unsigned n) +{ + return __odp_ring_sc_do_dequeue(r, obj_table, n, ODP_RING_QUEUE_FIXED); +} + +/** + * Test if a ring is full. + */ +int odp_ring_full(const odp_ring_t *r) +{ + uint32_t prod_tail = r->prod.tail; + uint32_t cons_tail = r->cons.tail; + return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0); +} + +/** + * Test if a ring is empty. + */ +int odp_ring_empty(const odp_ring_t *r) +{ + uint32_t prod_tail = r->prod.tail; + uint32_t cons_tail = r->cons.tail; + return !!(cons_tail == prod_tail); +} + +/** + * Return the number of entries in a ring. + */ +unsigned odp_ring_count(const odp_ring_t *r) +{ + uint32_t prod_tail = r->prod.tail; + uint32_t cons_tail = r->cons.tail; + return (prod_tail - cons_tail) & r->prod.mask; +} + +/** + * Return the number of free entries in a ring. 
+ */ +unsigned odp_ring_free_count(const odp_ring_t *r) +{ + uint32_t prod_tail = r->prod.tail; + uint32_t cons_tail = r->cons.tail; + return (cons_tail - prod_tail - 1) & r->prod.mask; +} + +/* dump the status of the ring on the console */ +void odp_ring_dump(const odp_ring_t *r) +{ + ODP_DBG("ring <%s>@%p\n", r->name, r); + ODP_DBG(" flags=%x\n", r->flags); + ODP_DBG(" size=%"PRIu32"\n", r->prod.size); + ODP_DBG(" ct=%"PRIu32"\n", r->cons.tail); + ODP_DBG(" ch=%"PRIu32"\n", r->cons.head); + ODP_DBG(" pt=%"PRIu32"\n", r->prod.tail); + ODP_DBG(" ph=%"PRIu32"\n", r->prod.head); + ODP_DBG(" used=%u\n", odp_ring_count(r)); + ODP_DBG(" avail=%u\n", odp_ring_free_count(r)); + if (r->prod.watermark == r->prod.size) + ODP_DBG(" watermark=0\n"); + else + ODP_DBG(" watermark=%"PRIu32"\n", r->prod.watermark); +} + +/* dump the status of all rings on the console */ +void odp_ring_list_dump(void) +{ + const odp_ring_t *mp = NULL; + + odp_rwlock_read_lock(&qlock); + + TAILQ_FOREACH(mp, &odp_ring_list, next) { + odp_ring_dump(mp); + } + + odp_rwlock_read_unlock(&qlock); +} + +/* search a ring from its name */ +odp_ring_t *odp_ring_lookup(const char *name) +{ + odp_ring_t *r = odp_shm_lookup(name); + + odp_rwlock_read_lock(&qlock); + TAILQ_FOREACH(r, &odp_ring_list, next) { + if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0) + break; + } + odp_rwlock_read_unlock(&qlock); + + return r; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + */ +int odp_ring_mp_enqueue_burst(odp_ring_t *r, void * const *obj_table, + unsigned n) +{ + return __odp_ring_mp_do_enqueue(r, obj_table, n, + ODP_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + */ +int odp_ring_sp_enqueue_burst(odp_ring_t *r, void * const *obj_table, + unsigned n) +{ + return __odp_ring_sp_do_enqueue(r, obj_table, n, + ODP_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring. 
+ */ +int odp_ring_enqueue_burst(odp_ring_t *r, void * const *obj_table, + unsigned n) +{ + if (r->prod.sp_enqueue) + return odp_ring_sp_enqueue_burst(r, obj_table, n); + else + return odp_ring_mp_enqueue_burst(r, obj_table, n); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + */ +int odp_ring_mc_dequeue_burst(odp_ring_t *r, void **obj_table, unsigned n) +{ + return __odp_ring_mc_do_dequeue(r, obj_table, n, + ODP_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + */ +int odp_ring_sc_dequeue_burst(odp_ring_t *r, void **obj_table, unsigned n) +{ + return __odp_ring_sc_do_dequeue(r, obj_table, n, + ODP_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + */ +int odp_ring_dequeue_burst(odp_ring_t *r, void **obj_table, unsigned n) +{ + if (r->cons.sc_dequeue) + return odp_ring_sc_dequeue_burst(r, obj_table, n); + else + return odp_ring_mc_dequeue_burst(r, obj_table, n); +} |