author    Matias Elo <matias.elo@nokia.com>  2023-01-19 09:30:25 +0200
committer Matias Elo <matias.elo@nokia.com>  2023-02-03 13:05:05 +0200
commit    bba00b595276b556564fa05ce1dff51f29417103 (patch)
tree      c43b38b821b8325b1992c8a9d338edf5c3c5cf74 /platform
parent    9079409fcd9754ce0086c5967fb5a7c32183c583 (diff)
linux-gen: ring: add batch enqueue and dequeue operations to mpmc ring
Add support for batch enqueue/dequeue operations to the MPMC ring
(read/write 0 or N objects). Unnecessary comments about the 'num'
parameter being smaller than the ring size have been removed.

Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-by: Tuomas Taipale <tuomas.taipale@nokia.com>
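The practical difference from the existing *_multi operations is the
all-or-nothing contract: a batch call transfers exactly 'num' objects or
none, so callers never handle partial transfers. A minimal usage sketch
with the u32 instantiation (assuming the header exposes ring_mpmc_u32_t
and ring_mpmc_u32_init() the same way it exposes the existing multi
variants; the sizing and setup below are illustrative, not part of this
patch):

#include <stdint.h>

#define RING_SIZE 1024u                /* ring size must be a power of two */
#define RING_MASK (RING_SIZE - 1u)

static ring_mpmc_u32_t ring;           /* control struct (assumed type name) */
static uint32_t ring_data[RING_SIZE];  /* data array passed to each call */

static void setup(void)
{
	ring_mpmc_u32_init(&ring);
}

static void producer(const uint32_t vals[8])
{
	/* Batch enqueue: writes all 8 values or none. A zero return means
	 * there was not enough free space for the whole batch. */
	if (ring_mpmc_u32_enq_batch(&ring, ring_data, RING_MASK, vals, 8) == 0) {
		/* nothing was written, retry later */
	}
}

static void consumer(void)
{
	uint32_t vals[8];

	/* Batch dequeue: reads exactly 8 values or none, unlike
	 * ring_mpmc_u32_deq_multi(), which may return anything in 0..8. */
	if (ring_mpmc_u32_deq_batch(&ring, ring_data, RING_MASK, vals, 8) == 8) {
		/* process vals[0..7] */
	}
}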
Diffstat (limited to 'platform')
-rw-r--r--  platform/linux-generic/include/odp_ring_mpmc_internal.h  95
1 file changed, 93 insertions(+), 2 deletions(-)
diff --git a/platform/linux-generic/include/odp_ring_mpmc_internal.h b/platform/linux-generic/include/odp_ring_mpmc_internal.h
index 37b7780b5..e35179267 100644
--- a/platform/linux-generic/include/odp_ring_mpmc_internal.h
+++ b/platform/linux-generic/include/odp_ring_mpmc_internal.h
@@ -70,6 +70,8 @@ static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom,
#undef _RING_MPMC_INIT
#undef _RING_MPMC_DEQ_MULTI
#undef _RING_MPMC_ENQ_MULTI
+#undef _RING_MPMC_DEQ_BATCH
+#undef _RING_MPMC_ENQ_BATCH
#undef _RING_MPMC_IS_EMPTY
#undef _RING_MPMC_LEN
@@ -86,6 +88,8 @@ static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom,
#define _RING_MPMC_INIT ring_mpmc_u32_init
#define _RING_MPMC_DEQ_MULTI ring_mpmc_u32_deq_multi
#define _RING_MPMC_ENQ_MULTI ring_mpmc_u32_enq_multi
+ #define _RING_MPMC_DEQ_BATCH ring_mpmc_u32_deq_batch
+ #define _RING_MPMC_ENQ_BATCH ring_mpmc_u32_enq_batch
#define _RING_MPMC_IS_EMPTY ring_mpmc_u32_is_empty
#define _RING_MPMC_LEN ring_mpmc_u32_len
#elif _ODP_RING_TYPE == _ODP_RING_TYPE_U64
@@ -95,6 +99,8 @@ static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom,
#define _RING_MPMC_INIT ring_mpmc_u64_init
#define _RING_MPMC_DEQ_MULTI ring_mpmc_u64_deq_multi
#define _RING_MPMC_ENQ_MULTI ring_mpmc_u64_enq_multi
+ #define _RING_MPMC_DEQ_BATCH ring_mpmc_u64_deq_batch
+ #define _RING_MPMC_ENQ_BATCH ring_mpmc_u64_enq_batch
#define _RING_MPMC_IS_EMPTY ring_mpmc_u64_is_empty
#define _RING_MPMC_LEN ring_mpmc_u64_len
#endif
@@ -108,7 +114,7 @@ static inline void _RING_MPMC_INIT(_ring_mpmc_gen_t *ring)
odp_atomic_init_u32(&ring->r.r_tail, 0);
}
-/* Dequeue data from the ring head. Num is smaller than ring size. */
+/* Dequeue data from the ring head */
static inline uint32_t _RING_MPMC_DEQ_MULTI(_ring_mpmc_gen_t *ring,
_ring_mpmc_data_t *ring_data,
uint32_t ring_mask,
@@ -154,7 +160,49 @@ static inline uint32_t _RING_MPMC_DEQ_MULTI(_ring_mpmc_gen_t *ring,
return num;
}
-/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */
+/* Dequeue num or 0 data from the ring head */
+static inline uint32_t _RING_MPMC_DEQ_BATCH(_ring_mpmc_gen_t *ring,
+ _ring_mpmc_data_t *ring_data,
+ uint32_t ring_mask,
+ _ring_mpmc_data_t data[],
+ uint32_t num)
+{
+ uint32_t old_head, new_head, w_tail, num_data, i;
+
+ /* Load acquires ensure that w_tail load happens after r_head load,
+ * and thus r_head value is always behind or equal to w_tail value.
+ * When CAS operation succeeds, this thread owns data between old
+ * and new r_head. */
+ do {
+ old_head = odp_atomic_load_acq_u32(&ring->r.r_head);
+ odp_prefetch(&ring_data[(old_head + 1) & ring_mask]);
+ w_tail = odp_atomic_load_acq_u32(&ring->r.w_tail);
+ num_data = w_tail - old_head;
+
+ /* Not enough data available */
+ if (num_data < num)
+ return 0;
+
+ new_head = old_head + num;
+
+ } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r.r_head, &old_head,
+ new_head) == 0));
+
+ /* Read data. This will not move above load acquire of r_head. */
+ for (i = 0; i < num; i++)
+ data[i] = ring_data[(old_head + 1 + i) & ring_mask];
+
+ /* Wait until other readers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != old_head))
+ odp_cpu_pause();
+
+ /* Release the new reader tail, writers acquire it. */
+ odp_atomic_store_rel_u32(&ring->r.r_tail, new_head);
+
+ return num;
+}
+
+/* Enqueue multiple data into the ring tail */
static inline uint32_t _RING_MPMC_ENQ_MULTI(_ring_mpmc_gen_t *ring,
_ring_mpmc_data_t *ring_data,
uint32_t ring_mask,
@@ -201,6 +249,49 @@ static inline uint32_t _RING_MPMC_ENQ_MULTI(_ring_mpmc_gen_t *ring,
return num;
}
+/* Enqueue num or 0 data into the ring tail */
+static inline uint32_t _RING_MPMC_ENQ_BATCH(_ring_mpmc_gen_t *ring,
+ _ring_mpmc_data_t *ring_data,
+ uint32_t ring_mask,
+ const _ring_mpmc_data_t data[],
+ uint32_t num)
+{
+ uint32_t old_head, new_head, r_tail, num_free, i;
+ uint32_t size = ring_mask + 1;
+
+ /* Load acquires ensure that w_head load happens after r_tail load,
+ * and thus r_tail value is always behind or equal to w_head value.
+ * When CAS operation succeeds, this thread owns data between old
+ * and new w_head. */
+ do {
+ r_tail = odp_atomic_load_acq_u32(&ring->r.r_tail);
+ old_head = odp_atomic_load_acq_u32(&ring->r.w_head);
+
+ num_free = size - (old_head - r_tail);
+
+ /* Not enough free space available */
+ if (num_free < num)
+ return 0;
+
+ new_head = old_head + num;
+
+ } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r.w_head, &old_head,
+ new_head) == 0));
+
+ /* Write data. This will not move above load acquire of w_head. */
+ for (i = 0; i < num; i++)
+ ring_data[(old_head + 1 + i) & ring_mask] = data[i];
+
+ /* Wait until other writers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head))
+ odp_cpu_pause();
+
+ /* Release the new writer tail, readers acquire it. */
+ odp_atomic_store_rel_u32(&ring->r.w_tail, new_head);
+
+ return num;
+}
+
/* Check if ring is empty */
static inline int _RING_MPMC_IS_EMPTY(_ring_mpmc_gen_t *ring)
{
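Both new functions follow the reservation protocol spelled out in their
memory-ordering comments: acquire-load your own head and the opposite
side's tail, CAS the head to claim a range, copy the data, then wait for
earlier claimants before store-releasing your tail. A simplified
restatement of the dequeue side in plain C11 atomics (the names, the
yield-based backoff, and the exact memory orders here are illustrative
stand-ins; ODP's ring_mpmc_cas_u32() and odp_cpu_pause() are the real
primitives):

#include <stdatomic.h>
#include <stdint.h>
#include <sched.h>

#define RING_SIZE 256u               /* power of two */
#define RING_MASK (RING_SIZE - 1u)

static _Atomic uint32_t r_head, r_tail, w_tail;
static uint32_t slots[RING_SIZE];

/* Batch dequeue: returns num on success, 0 if fewer than num
 * objects are available. */
static uint32_t deq_batch(uint32_t out[], uint32_t num)
{
	uint32_t old_head, avail;

	do {
		/* Acquire loads: r_head is read before w_tail, so the
		 * observed r_head can never run ahead of w_tail. */
		old_head = atomic_load_explicit(&r_head, memory_order_acquire);
		avail = atomic_load_explicit(&w_tail, memory_order_acquire)
			- old_head;
		if (avail < num)
			return 0;    /* all-or-nothing: no partial read */

		/* CAS claims slots (old_head, old_head + num] for this
		 * thread; on failure the loop reloads head and tail. */
	} while (!atomic_compare_exchange_weak_explicit(&r_head, &old_head,
							old_head + num,
							memory_order_acquire,
							memory_order_acquire));

	/* Copies cannot move above the acquire CAS on r_head. */
	for (uint32_t i = 0; i < num; i++)
		out[i] = slots[(old_head + 1 + i) & RING_MASK];

	/* Earlier readers must finish first; then release r_tail so
	 * writers see the slots as free space. */
	while (atomic_load_explicit(&r_tail, memory_order_relaxed) != old_head)
		sched_yield();
	atomic_store_explicit(&r_tail, old_head + num, memory_order_release);

	return num;
}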