diff options
-rw-r--r-- | platform/linux-generic/Makefile.am | 1 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_ring_spsc_internal.h | 124 |
2 files changed, 125 insertions, 0 deletions
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 6ccc284f1..eca2ad17c 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -127,6 +127,7 @@ noinst_HEADERS = \ include/odp_queue_lf.h \ include/odp_queue_scalable_internal.h \ include/odp_ring_internal.h \ + include/odp_ring_spsc_internal.h \ include/odp_ring_st_internal.h \ include/odp_schedule_if.h \ include/odp_schedule_scalable_config.h \ diff --git a/platform/linux-generic/include/odp_ring_spsc_internal.h b/platform/linux-generic/include/odp_ring_spsc_internal.h new file mode 100644 index 000000000..e38bda1da --- /dev/null +++ b/platform/linux-generic/include/odp_ring_spsc_internal.h @@ -0,0 +1,124 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_SPSC_INTERNAL_H_ +#define ODP_RING_SPSC_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdint.h> + +#include <odp/api/atomic.h> +#include <odp/api/plat/atomic_inlines.h> + +/* Lock-free ring for single-producer / single-consumer usage. + * + * Thread doing an operation may be different each time, but the same operation + * (enq- or dequeue) must not be called concurrently. The next thread may call + * the same operation only when it's sure that the previous thread have returned + * from the call, or will never return back to finish the call when interrupted + * during the call. + * + * Enqueue and dequeue operations can be done concurrently. + */ +typedef struct { + odp_atomic_u32_t head; + odp_atomic_u32_t tail; + uint32_t mask; + uint32_t *data; + +} ring_spsc_t; + +/* Initialize ring. Ring size must be a power of two. */ +static inline void ring_spsc_init(ring_spsc_t *ring, uint32_t *data, + uint32_t size) +{ + odp_atomic_init_u32(&ring->head, 0); + odp_atomic_init_u32(&ring->tail, 0); + ring->mask = size - 1; + ring->data = data; +} + +/* Dequeue data from the ring head. Max_num is smaller than ring size.*/ +static inline uint32_t ring_spsc_deq_multi(ring_spsc_t *ring, uint32_t data[], + uint32_t max_num) +{ + uint32_t head, tail, mask, idx; + uint32_t num, i; + + tail = odp_atomic_load_acq_u32(&ring->tail); + head = odp_atomic_load_u32(&ring->head); + mask = ring->mask; + num = tail - head; + + /* Empty */ + if (num == 0) + return 0; + + if (num > max_num) + num = max_num; + + idx = head & mask; + + for (i = 0; i < num; i++) { + data[i] = ring->data[idx]; + idx = (idx + 1) & mask; + } + + odp_atomic_store_rel_u32(&ring->head, head + num); + + return num; +} + +/* Enqueue data into the ring tail. Num_data is smaller than ring size. */ +static inline uint32_t ring_spsc_enq_multi(ring_spsc_t *ring, + const uint32_t data[], + uint32_t num_data) +{ + uint32_t head, tail, mask, size, idx; + uint32_t num, i; + + head = odp_atomic_load_acq_u32(&ring->head); + tail = odp_atomic_load_u32(&ring->tail); + mask = ring->mask; + size = mask + 1; + num = size - (tail - head); + + /* Full */ + if (num == 0) + return 0; + + if (num > num_data) + num = num_data; + + idx = tail & mask; + + for (i = 0; i < num; i++) { + ring->data[idx] = data[i]; + idx = (idx + 1) & mask; + } + + odp_atomic_store_rel_u32(&ring->tail, tail + num); + + return num; +} + +/* Check if ring is empty */ +static inline int ring_spsc_is_empty(ring_spsc_t *ring) +{ + uint32_t head = odp_atomic_load_u32(&ring->head); + uint32_t tail = odp_atomic_load_u32(&ring->tail); + + return head == tail; +} + +#ifdef __cplusplus +} +#endif + +#endif |