diff options
Diffstat (limited to 'platform/linux-generic/include/odp_ring_internal.h')
-rw-r--r-- | platform/linux-generic/include/odp_ring_internal.h | 45 |
1 files changed, 44 insertions, 1 deletions
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h index dcd190d07..296a87116 100644 --- a/platform/linux-generic/include/odp_ring_internal.h +++ b/platform/linux-generic/include/odp_ring_internal.h @@ -1,5 +1,5 @@ /* Copyright (c) 2016-2018, Linaro Limited - * Copyright (c) 2019-2021, Nokia + * Copyright (c) 2019-2022, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -80,6 +80,7 @@ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, #undef _RING_INIT #undef _RING_DEQ #undef _RING_DEQ_MULTI +#undef _RING_DEQ_BATCH #undef _RING_ENQ #undef _RING_ENQ_MULTI #undef _RING_LEN @@ -94,6 +95,7 @@ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, #define _RING_INIT ring_u32_init #define _RING_DEQ ring_u32_deq #define _RING_DEQ_MULTI ring_u32_deq_multi + #define _RING_DEQ_BATCH ring_u32_deq_batch #define _RING_ENQ ring_u32_enq #define _RING_ENQ_MULTI ring_u32_enq_multi #define _RING_LEN ring_u32_len @@ -104,6 +106,7 @@ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, #define _RING_INIT ring_u64_init #define _RING_DEQ ring_u64_deq #define _RING_DEQ_MULTI ring_u64_deq_multi + #define _RING_DEQ_BATCH ring_u64_deq_batch #define _RING_ENQ ring_u64_enq #define _RING_ENQ_MULTI ring_u64_enq_multi #define _RING_LEN ring_u64_len @@ -114,6 +117,7 @@ static inline int cas_mo_u32(odp_atomic_u32_t *atom, uint32_t *old_val, #define _RING_INIT ring_ptr_init #define _RING_DEQ ring_ptr_deq #define _RING_DEQ_MULTI ring_ptr_deq_multi + #define _RING_DEQ_BATCH ring_ptr_deq_batch #define _RING_ENQ ring_ptr_enq #define _RING_ENQ_MULTI ring_ptr_enq_multi #define _RING_LEN ring_ptr_len @@ -208,6 +212,45 @@ static inline uint32_t _RING_DEQ_MULTI(_ring_gen_t *ring, uint32_t mask, return num; } +/* Dequeue batch of data (0 or num) from the ring head. Num is smaller than ring size. */ +static inline uint32_t _RING_DEQ_BATCH(_ring_gen_t *ring, uint32_t mask, + _ring_data_t data[], uint32_t num) +{ + uint32_t head, tail, new_head, i; + + /* Load/CAS acquire of r_head ensures that w_tail load happens after + * r_head load, and thus head value is always behind or equal to tail + * value. */ + head = odp_atomic_load_acq_u32(&ring->r.r_head); + + /* Move reader head. This thread owns data at the new head. */ + do { + tail = odp_atomic_load_acq_u32(&ring->r.w_tail); + + /* Not enough data available */ + if ((tail - head) < num) + return 0; + + new_head = head + num; + + } while (odp_unlikely(cas_mo_u32(&ring->r.r_head, &head, new_head, + __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE) == 0)); + + /* Read data. */ + for (i = 0; i < num; i++) + data[i] = ring->data[(head + 1 + i) & mask]; + + /* Wait until other readers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != head)) + odp_cpu_pause(); + + /* Update the tail. Writers acquire it. */ + odp_atomic_store_rel_u32(&ring->r.r_tail, new_head); + + return num; +} + /* Enqueue data into the ring tail */ static inline void _RING_ENQ(_ring_gen_t *ring, uint32_t mask, _ring_data_t data) |