aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatias Elo <matias.elo@nokia.com>2023-01-16 17:02:12 +0200
committerMatias Elo <matias.elo@nokia.com>2023-02-03 13:05:05 +0200
commit9079409fcd9754ce0086c5967fb5a7c32183c583 (patch)
tree75d2f848d5e43d6b76c40ada7dda59bed2c10535
parentdb873a539267da82d9ede8ba358a4f219fa43cbe (diff)
linux-gen: ring: add 64-bit variant of mpmc ring
Add new 64-bit variant of implementation internal multi-producer multi- consumer ring. Signed-off-by: Matias Elo <matias.elo@nokia.com> Reviewed-by: Tuomas Taipale <tuomas.taipale@nokia.com>
-rw-r--r--platform/linux-generic/Makefile.am2
-rw-r--r--platform/linux-generic/include/odp_queue_basic_internal.h5
-rw-r--r--platform/linux-generic/include/odp_ring_mpmc_internal.h115
-rw-r--r--platform/linux-generic/include/odp_ring_mpmc_u32_internal.h25
-rw-r--r--platform/linux-generic/include/odp_ring_mpmc_u64_internal.h25
-rw-r--r--platform/linux-generic/odp_ipsec_sad.c38
-rw-r--r--platform/linux-generic/odp_queue_basic.c22
7 files changed, 165 insertions, 67 deletions
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 6e6cfa5f6..d49a2138b 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -169,6 +169,8 @@ noinst_HEADERS = \
include/odp_ring_common.h \
include/odp_ring_internal.h \
include/odp_ring_mpmc_internal.h \
+ include/odp_ring_mpmc_u32_internal.h \
+ include/odp_ring_mpmc_u64_internal.h \
include/odp_ring_ptr_internal.h \
include/odp_ring_spsc_internal.h \
include/odp_ring_st_internal.h \
diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h
index 830f50a9d..3cdcf8600 100644
--- a/platform/linux-generic/include/odp_queue_basic_internal.h
+++ b/platform/linux-generic/include/odp_queue_basic_internal.h
@@ -1,4 +1,5 @@
/* Copyright (c) 2013-2018, Linaro Limited
+ * Copyright (c) 2023, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -23,7 +24,7 @@ extern "C" {
#include <odp/api/ticketlock.h>
#include <odp_config_internal.h>
#include <odp_macros_internal.h>
-#include <odp_ring_mpmc_internal.h>
+#include <odp_ring_mpmc_u32_internal.h>
#include <odp_ring_st_internal.h>
#include <odp_ring_spsc_internal.h>
#include <odp_queue_lf.h>
@@ -47,7 +48,7 @@ typedef struct ODP_ALIGNED_CACHE queue_entry_s {
odp_queue_type_t type;
/* MPMC ring (2 cache lines). */
- ring_mpmc_t ring_mpmc;
+ ring_mpmc_u32_t ring_mpmc;
odp_ticketlock_t lock;
union {
diff --git a/platform/linux-generic/include/odp_ring_mpmc_internal.h b/platform/linux-generic/include/odp_ring_mpmc_internal.h
index 6ed4dd4d1..37b7780b5 100644
--- a/platform/linux-generic/include/odp_ring_mpmc_internal.h
+++ b/platform/linux-generic/include/odp_ring_mpmc_internal.h
@@ -1,4 +1,5 @@
/* Copyright (c) 2018, Linaro Limited
+ * Copyright (c) 2023, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -19,6 +20,8 @@ extern "C" {
#include <odp/api/plat/atomic_inlines.h>
#include <odp/api/plat/cpu_inlines.h>
+#include <odp_ring_common.h>
+
/* Ring of uint32_t data
*
* Ring stores head and tail counters. Ring indexes are formed from these
@@ -34,14 +37,22 @@ extern "C" {
* r_tail r_head w_tail w_head
*
*/
-typedef struct {
+
+struct ring_mpmc_common {
odp_atomic_u32_t r_head ODP_ALIGNED_CACHE;
odp_atomic_u32_t r_tail;
odp_atomic_u32_t w_head ODP_ALIGNED_CACHE;
odp_atomic_u32_t w_tail;
+};
+
+typedef struct ODP_ALIGNED_CACHE {
+ struct ring_mpmc_common r;
+} ring_mpmc_u32_t;
-} ring_mpmc_t;
+typedef struct ODP_ALIGNED_CACHE {
+ struct ring_mpmc_common r;
+} ring_mpmc_u64_t;
static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom,
uint32_t *old_val, uint32_t new_val)
@@ -52,21 +63,57 @@ static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom,
__ATOMIC_RELAXED);
}
+#endif /* End of include guards */
+
+#undef _ring_mpmc_gen_t
+#undef _ring_mpmc_data_t
+#undef _RING_MPMC_INIT
+#undef _RING_MPMC_DEQ_MULTI
+#undef _RING_MPMC_ENQ_MULTI
+#undef _RING_MPMC_IS_EMPTY
+#undef _RING_MPMC_LEN
+
+/* This header should NOT be included directly. There are no include guards for
+ * the following types and function definitions! */
+#ifndef _ODP_RING_TYPE
+#error Include type specific (u32/u64) ring header instead of this common file.
+#endif
+
+#if _ODP_RING_TYPE == _ODP_RING_TYPE_U32
+ #define _ring_mpmc_gen_t ring_mpmc_u32_t
+ #define _ring_mpmc_data_t uint32_t
+
+ #define _RING_MPMC_INIT ring_mpmc_u32_init
+ #define _RING_MPMC_DEQ_MULTI ring_mpmc_u32_deq_multi
+ #define _RING_MPMC_ENQ_MULTI ring_mpmc_u32_enq_multi
+ #define _RING_MPMC_IS_EMPTY ring_mpmc_u32_is_empty
+ #define _RING_MPMC_LEN ring_mpmc_u32_len
+#elif _ODP_RING_TYPE == _ODP_RING_TYPE_U64
+ #define _ring_mpmc_gen_t ring_mpmc_u64_t
+ #define _ring_mpmc_data_t uint64_t
+
+ #define _RING_MPMC_INIT ring_mpmc_u64_init
+ #define _RING_MPMC_DEQ_MULTI ring_mpmc_u64_deq_multi
+ #define _RING_MPMC_ENQ_MULTI ring_mpmc_u64_enq_multi
+ #define _RING_MPMC_IS_EMPTY ring_mpmc_u64_is_empty
+ #define _RING_MPMC_LEN ring_mpmc_u64_len
+#endif
+
/* Initialize ring */
-static inline void ring_mpmc_init(ring_mpmc_t *ring)
+static inline void _RING_MPMC_INIT(_ring_mpmc_gen_t *ring)
{
- odp_atomic_init_u32(&ring->w_head, 0);
- odp_atomic_init_u32(&ring->w_tail, 0);
- odp_atomic_init_u32(&ring->r_head, 0);
- odp_atomic_init_u32(&ring->r_tail, 0);
+ odp_atomic_init_u32(&ring->r.w_head, 0);
+ odp_atomic_init_u32(&ring->r.w_tail, 0);
+ odp_atomic_init_u32(&ring->r.r_head, 0);
+ odp_atomic_init_u32(&ring->r.r_tail, 0);
}
/* Dequeue data from the ring head. Num is smaller than ring size. */
-static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring,
- uint32_t *ring_data,
- uint32_t ring_mask,
- uint32_t data[],
- uint32_t num)
+static inline uint32_t _RING_MPMC_DEQ_MULTI(_ring_mpmc_gen_t *ring,
+ _ring_mpmc_data_t *ring_data,
+ uint32_t ring_mask,
+ _ring_mpmc_data_t data[],
+ uint32_t num)
{
uint32_t old_head, new_head, w_tail, num_data, i;
@@ -75,9 +122,9 @@ static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring,
* When CAS operation succeeds, this thread owns data between old
* and new r_head. */
do {
- old_head = odp_atomic_load_acq_u32(&ring->r_head);
+ old_head = odp_atomic_load_acq_u32(&ring->r.r_head);
odp_prefetch(&ring_data[(old_head + 1) & ring_mask]);
- w_tail = odp_atomic_load_acq_u32(&ring->w_tail);
+ w_tail = odp_atomic_load_acq_u32(&ring->r.w_tail);
num_data = w_tail - old_head;
/* Ring is empty */
@@ -90,7 +137,7 @@ static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring,
new_head = old_head + num;
- } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r_head, &old_head,
+ } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r.r_head, &old_head,
new_head) == 0));
/* Read data. This will not move above load acquire of r_head. */
@@ -98,21 +145,21 @@ static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring,
data[i] = ring_data[(old_head + 1 + i) & ring_mask];
/* Wait until other readers have updated the tail */
- while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != old_head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != old_head))
odp_cpu_pause();
/* Release the new reader tail, writers acquire it. */
- odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+ odp_atomic_store_rel_u32(&ring->r.r_tail, new_head);
return num;
}
/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */
-static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring,
- uint32_t *ring_data,
- uint32_t ring_mask,
- const uint32_t data[],
- uint32_t num)
+static inline uint32_t _RING_MPMC_ENQ_MULTI(_ring_mpmc_gen_t *ring,
+ _ring_mpmc_data_t *ring_data,
+ uint32_t ring_mask,
+ const _ring_mpmc_data_t data[],
+ uint32_t num)
{
uint32_t old_head, new_head, r_tail, num_free, i;
uint32_t size = ring_mask + 1;
@@ -122,8 +169,8 @@ static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring,
* When CAS operation succeeds, this thread owns data between old
* and new w_head. */
do {
- r_tail = odp_atomic_load_acq_u32(&ring->r_tail);
- old_head = odp_atomic_load_acq_u32(&ring->w_head);
+ r_tail = odp_atomic_load_acq_u32(&ring->r.r_tail);
+ old_head = odp_atomic_load_acq_u32(&ring->r.w_head);
num_free = size - (old_head - r_tail);
@@ -137,7 +184,7 @@ static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring,
new_head = old_head + num;
- } while (odp_unlikely(ring_mpmc_cas_u32(&ring->w_head, &old_head,
+ } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r.w_head, &old_head,
new_head) == 0));
/* Write data. This will not move above load acquire of w_head. */
@@ -145,29 +192,29 @@ static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring,
ring_data[(old_head + 1 + i) & ring_mask] = data[i];
/* Wait until other writers have updated the tail */
- while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head))
odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */
- odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+ odp_atomic_store_rel_u32(&ring->r.w_tail, new_head);
return num;
}
/* Check if ring is empty */
-static inline int ring_mpmc_is_empty(ring_mpmc_t *ring)
+static inline int _RING_MPMC_IS_EMPTY(_ring_mpmc_gen_t *ring)
{
- uint32_t head = odp_atomic_load_u32(&ring->r_head);
- uint32_t tail = odp_atomic_load_u32(&ring->w_tail);
+ uint32_t head = odp_atomic_load_u32(&ring->r.r_head);
+ uint32_t tail = odp_atomic_load_u32(&ring->r.w_tail);
return head == tail;
}
/* Return current ring length */
-static inline uint32_t ring_mpmc_length(ring_mpmc_t *ring)
+static inline uint32_t _RING_MPMC_LEN(_ring_mpmc_gen_t *ring)
{
- uint32_t head = odp_atomic_load_u32(&ring->r_head);
- uint32_t tail = odp_atomic_load_u32(&ring->w_tail);
+ uint32_t head = odp_atomic_load_u32(&ring->r.r_head);
+ uint32_t tail = odp_atomic_load_u32(&ring->r.w_tail);
return tail - head;
}
@@ -175,5 +222,3 @@ static inline uint32_t ring_mpmc_length(ring_mpmc_t *ring)
#ifdef __cplusplus
}
#endif
-
-#endif
diff --git a/platform/linux-generic/include/odp_ring_mpmc_u32_internal.h b/platform/linux-generic/include/odp_ring_mpmc_u32_internal.h
new file mode 100644
index 000000000..4699b5b47
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_mpmc_u32_internal.h
@@ -0,0 +1,25 @@
+/* Copyright (c) 2023, Nokia
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_MPMC_U32_INTERNAL_H_
+#define ODP_RING_MPMC_U32_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_ring_common.h>
+
+#undef _ODP_RING_TYPE
+#define _ODP_RING_TYPE _ODP_RING_TYPE_U32
+
+#include <odp_ring_mpmc_internal.h>
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/include/odp_ring_mpmc_u64_internal.h b/platform/linux-generic/include/odp_ring_mpmc_u64_internal.h
new file mode 100644
index 000000000..e7bf31a94
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_mpmc_u64_internal.h
@@ -0,0 +1,25 @@
+/* Copyright (c) 2023, Nokia
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_MPMC_U64_INTERNAL_H_
+#define ODP_RING_MPMC_U64_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_ring_common.h>
+
+#undef _ODP_RING_TYPE
+#define _ODP_RING_TYPE _ODP_RING_TYPE_U64
+
+#include <odp_ring_mpmc_internal.h>
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_ipsec_sad.c b/platform/linux-generic/odp_ipsec_sad.c
index 9bb89ad8b..4cf265694 100644
--- a/platform/linux-generic/odp_ipsec_sad.c
+++ b/platform/linux-generic/odp_ipsec_sad.c
@@ -1,5 +1,5 @@
/* Copyright (c) 2017-2018, Linaro Limited
- * Copyright (c) 2018-2022, Nokia
+ * Copyright (c) 2018-2023, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -19,7 +19,7 @@
#include <odp_debug_internal.h>
#include <odp_ipsec_internal.h>
#include <odp_macros_internal.h>
-#include <odp_ring_mpmc_internal.h>
+#include <odp_ring_mpmc_u32_internal.h>
#include <odp_global_data.h>
#include <string.h>
@@ -103,7 +103,7 @@ typedef struct ODP_ALIGNED_CACHE ipsec_thread_local_s {
typedef struct ipsec_sa_table_t {
ipsec_sa_t ipsec_sa[CONFIG_IPSEC_MAX_NUM_SA];
struct ODP_ALIGNED_CACHE {
- ring_mpmc_t ipv4_id_ring;
+ ring_mpmc_u32_t ipv4_id_ring;
uint32_t ipv4_id_data[IPV4_ID_RING_SIZE] ODP_ALIGNED_CACHE;
} hot;
struct {
@@ -193,7 +193,7 @@ int _odp_ipsec_sad_init_global(void)
ipsec_sa_tbl->shm = shm;
ipsec_sa_tbl->max_num_sa = max_num_sa;
- ring_mpmc_init(&ipsec_sa_tbl->hot.ipv4_id_ring);
+ ring_mpmc_u32_init(&ipsec_sa_tbl->hot.ipv4_id_ring);
for (i = 0; i < thread_count_max; i++) {
/*
* Make the current ID block fully used, forcing allocation
@@ -211,11 +211,11 @@ int _odp_ipsec_sad_init_global(void)
for (i = 0; i < IPV4_ID_RING_SIZE - 1; i++) {
uint32_t data = i * IPV4_ID_BLOCK_SIZE;
- ring_mpmc_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring,
- ipsec_sa_tbl->hot.ipv4_id_data,
- IPV4_ID_RING_MASK,
- &data,
- 1);
+ ring_mpmc_u32_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring,
+ ipsec_sa_tbl->hot.ipv4_id_data,
+ IPV4_ID_RING_MASK,
+ &data,
+ 1);
}
for (i = 0; i < ipsec_sa_tbl->max_num_sa; i++) {
@@ -1119,17 +1119,17 @@ uint16_t _odp_ipsec_sa_alloc_ipv4_id(ipsec_sa_t *ipsec_sa)
tl->first_ipv4_id + IPV4_ID_BLOCK_SIZE)) {
/* Return used ID block to the ring */
data = tl->first_ipv4_id;
- ring_mpmc_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring,
- ipsec_sa_tbl->hot.ipv4_id_data,
- IPV4_ID_RING_MASK,
- &data,
- 1);
+ ring_mpmc_u32_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring,
+ ipsec_sa_tbl->hot.ipv4_id_data,
+ IPV4_ID_RING_MASK,
+ &data,
+ 1);
/* Get new ID block */
- ring_mpmc_deq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring,
- ipsec_sa_tbl->hot.ipv4_id_data,
- IPV4_ID_RING_MASK,
- &data,
- 1);
+ ring_mpmc_u32_deq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring,
+ ipsec_sa_tbl->hot.ipv4_id_data,
+ IPV4_ID_RING_MASK,
+ &data,
+ 1);
tl->first_ipv4_id = data;
tl->next_ipv4_id = data;
}
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index eebfb92ab..83694f84f 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -1,5 +1,5 @@
/* Copyright (c) 2013-2018, Linaro Limited
- * Copyright (c) 2021-2022, Nokia
+ * Copyright (c) 2021-2023, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
@@ -408,7 +408,7 @@ static int queue_destroy(odp_queue_t handle)
else if (queue->type == ODP_QUEUE_TYPE_SCHED)
empty = ring_st_is_empty(&queue->ring_st);
else
- empty = ring_mpmc_is_empty(&queue->ring_mpmc);
+ empty = ring_mpmc_u32_is_empty(&queue->ring_mpmc);
if (!empty) {
UNLOCK(queue);
@@ -497,7 +497,7 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle,
{
queue_entry_t *queue;
int ret, num_enq;
- ring_mpmc_t *ring_mpmc;
+ ring_mpmc_u32_t *ring_mpmc;
uint32_t event_idx[num];
queue = qentry_from_handle(handle);
@@ -508,8 +508,8 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle,
event_index_from_hdr(event_idx, event_hdr, num);
- num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->ring_data,
- queue->ring_mask, event_idx, num);
+ num_enq = ring_mpmc_u32_enq_multi(ring_mpmc, queue->ring_data,
+ queue->ring_mask, event_idx, num);
return num_enq;
}
@@ -519,14 +519,14 @@ static inline int _plain_queue_deq_multi(odp_queue_t handle,
{
int num_deq;
queue_entry_t *queue;
- ring_mpmc_t *ring_mpmc;
+ ring_mpmc_u32_t *ring_mpmc;
uint32_t event_idx[num];
queue = qentry_from_handle(handle);
ring_mpmc = &queue->ring_mpmc;
- num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->ring_data,
- queue->ring_mask, event_idx, num);
+ num_deq = ring_mpmc_u32_deq_multi(ring_mpmc, queue->ring_data,
+ queue->ring_mask, event_idx, num);
if (num_deq == 0)
return 0;
@@ -754,7 +754,7 @@ static void queue_print(odp_queue_t handle)
} else {
_ODP_PRINT(" implementation ring_mpmc\n");
_ODP_PRINT(" length %" PRIu32 "/%" PRIu32 "\n",
- ring_mpmc_length(&queue->ring_mpmc), queue->ring_mask + 1);
+ ring_mpmc_u32_len(&queue->ring_mpmc), queue->ring_mask + 1);
}
_ODP_PRINT("\n");
@@ -820,7 +820,7 @@ static void queue_print_all(void)
if (_odp_sched_id == _ODP_SCHED_ID_BASIC)
spr = _odp_sched_basic_get_spread(index);
} else {
- len = ring_mpmc_length(&queue->ring_mpmc);
+ len = ring_mpmc_u32_len(&queue->ring_mpmc);
max_len = queue->ring_mask + 1;
}
@@ -1073,7 +1073,7 @@ static int queue_init(queue_entry_t *queue, const char *name,
queue->ring_data = &_odp_queue_glb->ring_data[offset];
queue->ring_mask = queue_size - 1;
- ring_mpmc_init(&queue->ring_mpmc);
+ ring_mpmc_u32_init(&queue->ring_mpmc);
} else {
queue->enqueue = sched_queue_enq;