diff options
author | Matias Elo <matias.elo@nokia.com> | 2023-01-16 17:02:12 +0200 |
---|---|---|
committer | Matias Elo <matias.elo@nokia.com> | 2023-02-03 13:05:05 +0200 |
commit | 9079409fcd9754ce0086c5967fb5a7c32183c583 (patch) | |
tree | 75d2f848d5e43d6b76c40ada7dda59bed2c10535 | |
parent | db873a539267da82d9ede8ba358a4f219fa43cbe (diff) |
linux-gen: ring: add 64-bit variant of mpmc ring
Add new 64-bit variant of implementation internal multi-producer multi-
consumer ring.
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Reviewed-by: Tuomas Taipale <tuomas.taipale@nokia.com>
-rw-r--r-- | platform/linux-generic/Makefile.am | 2 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_queue_basic_internal.h | 5 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_ring_mpmc_internal.h | 115 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_ring_mpmc_u32_internal.h | 25 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_ring_mpmc_u64_internal.h | 25 | ||||
-rw-r--r-- | platform/linux-generic/odp_ipsec_sad.c | 38 | ||||
-rw-r--r-- | platform/linux-generic/odp_queue_basic.c | 22 |
7 files changed, 165 insertions, 67 deletions
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 6e6cfa5f6..d49a2138b 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -169,6 +169,8 @@ noinst_HEADERS = \ include/odp_ring_common.h \ include/odp_ring_internal.h \ include/odp_ring_mpmc_internal.h \ + include/odp_ring_mpmc_u32_internal.h \ + include/odp_ring_mpmc_u64_internal.h \ include/odp_ring_ptr_internal.h \ include/odp_ring_spsc_internal.h \ include/odp_ring_st_internal.h \ diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h index 830f50a9d..3cdcf8600 100644 --- a/platform/linux-generic/include/odp_queue_basic_internal.h +++ b/platform/linux-generic/include/odp_queue_basic_internal.h @@ -1,4 +1,5 @@ /* Copyright (c) 2013-2018, Linaro Limited + * Copyright (c) 2023, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -23,7 +24,7 @@ extern "C" { #include <odp/api/ticketlock.h> #include <odp_config_internal.h> #include <odp_macros_internal.h> -#include <odp_ring_mpmc_internal.h> +#include <odp_ring_mpmc_u32_internal.h> #include <odp_ring_st_internal.h> #include <odp_ring_spsc_internal.h> #include <odp_queue_lf.h> @@ -47,7 +48,7 @@ typedef struct ODP_ALIGNED_CACHE queue_entry_s { odp_queue_type_t type; /* MPMC ring (2 cache lines). */ - ring_mpmc_t ring_mpmc; + ring_mpmc_u32_t ring_mpmc; odp_ticketlock_t lock; union { diff --git a/platform/linux-generic/include/odp_ring_mpmc_internal.h b/platform/linux-generic/include/odp_ring_mpmc_internal.h index 6ed4dd4d1..37b7780b5 100644 --- a/platform/linux-generic/include/odp_ring_mpmc_internal.h +++ b/platform/linux-generic/include/odp_ring_mpmc_internal.h @@ -1,4 +1,5 @@ /* Copyright (c) 2018, Linaro Limited + * Copyright (c) 2023, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -19,6 +20,8 @@ extern "C" { #include <odp/api/plat/atomic_inlines.h> #include <odp/api/plat/cpu_inlines.h> +#include <odp_ring_common.h> + /* Ring of uint32_t data * * Ring stores head and tail counters. Ring indexes are formed from these @@ -34,14 +37,22 @@ extern "C" { * r_tail r_head w_tail w_head * */ -typedef struct { + +struct ring_mpmc_common { odp_atomic_u32_t r_head ODP_ALIGNED_CACHE; odp_atomic_u32_t r_tail; odp_atomic_u32_t w_head ODP_ALIGNED_CACHE; odp_atomic_u32_t w_tail; +}; + +typedef struct ODP_ALIGNED_CACHE { + struct ring_mpmc_common r; +} ring_mpmc_u32_t; -} ring_mpmc_t; +typedef struct ODP_ALIGNED_CACHE { + struct ring_mpmc_common r; +} ring_mpmc_u64_t; static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom, uint32_t *old_val, uint32_t new_val) @@ -52,21 +63,57 @@ static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom, __ATOMIC_RELAXED); } +#endif /* End of include guards */ + +#undef _ring_mpmc_gen_t +#undef _ring_mpmc_data_t +#undef _RING_MPMC_INIT +#undef _RING_MPMC_DEQ_MULTI +#undef _RING_MPMC_ENQ_MULTI +#undef _RING_MPMC_IS_EMPTY +#undef _RING_MPMC_LEN + +/* This header should NOT be included directly. There are no include guards for + * the following types and function definitions! */ +#ifndef _ODP_RING_TYPE +#error Include type specific (u32/u64) ring header instead of this common file. +#endif + +#if _ODP_RING_TYPE == _ODP_RING_TYPE_U32 + #define _ring_mpmc_gen_t ring_mpmc_u32_t + #define _ring_mpmc_data_t uint32_t + + #define _RING_MPMC_INIT ring_mpmc_u32_init + #define _RING_MPMC_DEQ_MULTI ring_mpmc_u32_deq_multi + #define _RING_MPMC_ENQ_MULTI ring_mpmc_u32_enq_multi + #define _RING_MPMC_IS_EMPTY ring_mpmc_u32_is_empty + #define _RING_MPMC_LEN ring_mpmc_u32_len +#elif _ODP_RING_TYPE == _ODP_RING_TYPE_U64 + #define _ring_mpmc_gen_t ring_mpmc_u64_t + #define _ring_mpmc_data_t uint64_t + + #define _RING_MPMC_INIT ring_mpmc_u64_init + #define _RING_MPMC_DEQ_MULTI ring_mpmc_u64_deq_multi + #define _RING_MPMC_ENQ_MULTI ring_mpmc_u64_enq_multi + #define _RING_MPMC_IS_EMPTY ring_mpmc_u64_is_empty + #define _RING_MPMC_LEN ring_mpmc_u64_len +#endif + /* Initialize ring */ -static inline void ring_mpmc_init(ring_mpmc_t *ring) +static inline void _RING_MPMC_INIT(_ring_mpmc_gen_t *ring) { - odp_atomic_init_u32(&ring->w_head, 0); - odp_atomic_init_u32(&ring->w_tail, 0); - odp_atomic_init_u32(&ring->r_head, 0); - odp_atomic_init_u32(&ring->r_tail, 0); + odp_atomic_init_u32(&ring->r.w_head, 0); + odp_atomic_init_u32(&ring->r.w_tail, 0); + odp_atomic_init_u32(&ring->r.r_head, 0); + odp_atomic_init_u32(&ring->r.r_tail, 0); } /* Dequeue data from the ring head. Num is smaller than ring size. */ -static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring, - uint32_t *ring_data, - uint32_t ring_mask, - uint32_t data[], - uint32_t num) +static inline uint32_t _RING_MPMC_DEQ_MULTI(_ring_mpmc_gen_t *ring, + _ring_mpmc_data_t *ring_data, + uint32_t ring_mask, + _ring_mpmc_data_t data[], + uint32_t num) { uint32_t old_head, new_head, w_tail, num_data, i; @@ -75,9 +122,9 @@ static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring, * When CAS operation succeeds, this thread owns data between old * and new r_head. */ do { - old_head = odp_atomic_load_acq_u32(&ring->r_head); + old_head = odp_atomic_load_acq_u32(&ring->r.r_head); odp_prefetch(&ring_data[(old_head + 1) & ring_mask]); - w_tail = odp_atomic_load_acq_u32(&ring->w_tail); + w_tail = odp_atomic_load_acq_u32(&ring->r.w_tail); num_data = w_tail - old_head; /* Ring is empty */ @@ -90,7 +137,7 @@ static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring, new_head = old_head + num; - } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r_head, &old_head, + } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r.r_head, &old_head, new_head) == 0)); /* Read data. This will not move above load acquire of r_head. */ @@ -98,21 +145,21 @@ static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring, data[i] = ring_data[(old_head + 1 + i) & ring_mask]; /* Wait until other readers have updated the tail */ - while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != old_head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->r.r_tail) != old_head)) odp_cpu_pause(); /* Release the new reader tail, writers acquire it. */ - odp_atomic_store_rel_u32(&ring->r_tail, new_head); + odp_atomic_store_rel_u32(&ring->r.r_tail, new_head); return num; } /* Enqueue multiple data into the ring tail. Num is smaller than ring size. */ -static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring, - uint32_t *ring_data, - uint32_t ring_mask, - const uint32_t data[], - uint32_t num) +static inline uint32_t _RING_MPMC_ENQ_MULTI(_ring_mpmc_gen_t *ring, + _ring_mpmc_data_t *ring_data, + uint32_t ring_mask, + const _ring_mpmc_data_t data[], + uint32_t num) { uint32_t old_head, new_head, r_tail, num_free, i; uint32_t size = ring_mask + 1; @@ -122,8 +169,8 @@ static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring, * When CAS operation succeeds, this thread owns data between old * and new w_head. */ do { - r_tail = odp_atomic_load_acq_u32(&ring->r_tail); - old_head = odp_atomic_load_acq_u32(&ring->w_head); + r_tail = odp_atomic_load_acq_u32(&ring->r.r_tail); + old_head = odp_atomic_load_acq_u32(&ring->r.w_head); num_free = size - (old_head - r_tail); @@ -137,7 +184,7 @@ static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring, new_head = old_head + num; - } while (odp_unlikely(ring_mpmc_cas_u32(&ring->w_head, &old_head, + } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r.w_head, &old_head, new_head) == 0)); /* Write data. This will not move above load acquire of w_head. */ @@ -145,29 +192,29 @@ static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring, ring_data[(old_head + 1 + i) & ring_mask] = data[i]; /* Wait until other writers have updated the tail */ - while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->r.w_tail) != old_head)) odp_cpu_pause(); /* Release the new writer tail, readers acquire it. */ - odp_atomic_store_rel_u32(&ring->w_tail, new_head); + odp_atomic_store_rel_u32(&ring->r.w_tail, new_head); return num; } /* Check if ring is empty */ -static inline int ring_mpmc_is_empty(ring_mpmc_t *ring) +static inline int _RING_MPMC_IS_EMPTY(_ring_mpmc_gen_t *ring) { - uint32_t head = odp_atomic_load_u32(&ring->r_head); - uint32_t tail = odp_atomic_load_u32(&ring->w_tail); + uint32_t head = odp_atomic_load_u32(&ring->r.r_head); + uint32_t tail = odp_atomic_load_u32(&ring->r.w_tail); return head == tail; } /* Return current ring length */ -static inline uint32_t ring_mpmc_length(ring_mpmc_t *ring) +static inline uint32_t _RING_MPMC_LEN(_ring_mpmc_gen_t *ring) { - uint32_t head = odp_atomic_load_u32(&ring->r_head); - uint32_t tail = odp_atomic_load_u32(&ring->w_tail); + uint32_t head = odp_atomic_load_u32(&ring->r.r_head); + uint32_t tail = odp_atomic_load_u32(&ring->r.w_tail); return tail - head; } @@ -175,5 +222,3 @@ static inline uint32_t ring_mpmc_length(ring_mpmc_t *ring) #ifdef __cplusplus } #endif - -#endif diff --git a/platform/linux-generic/include/odp_ring_mpmc_u32_internal.h b/platform/linux-generic/include/odp_ring_mpmc_u32_internal.h new file mode 100644 index 000000000..4699b5b47 --- /dev/null +++ b/platform/linux-generic/include/odp_ring_mpmc_u32_internal.h @@ -0,0 +1,25 @@ +/* Copyright (c) 2023, Nokia + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_MPMC_U32_INTERNAL_H_ +#define ODP_RING_MPMC_U32_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp_ring_common.h> + +#undef _ODP_RING_TYPE +#define _ODP_RING_TYPE _ODP_RING_TYPE_U32 + +#include <odp_ring_mpmc_internal.h> + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/include/odp_ring_mpmc_u64_internal.h b/platform/linux-generic/include/odp_ring_mpmc_u64_internal.h new file mode 100644 index 000000000..e7bf31a94 --- /dev/null +++ b/platform/linux-generic/include/odp_ring_mpmc_u64_internal.h @@ -0,0 +1,25 @@ +/* Copyright (c) 2023, Nokia + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_MPMC_U64_INTERNAL_H_ +#define ODP_RING_MPMC_U64_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp_ring_common.h> + +#undef _ODP_RING_TYPE +#define _ODP_RING_TYPE _ODP_RING_TYPE_U64 + +#include <odp_ring_mpmc_internal.h> + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/odp_ipsec_sad.c b/platform/linux-generic/odp_ipsec_sad.c index 9bb89ad8b..4cf265694 100644 --- a/platform/linux-generic/odp_ipsec_sad.c +++ b/platform/linux-generic/odp_ipsec_sad.c @@ -1,5 +1,5 @@ /* Copyright (c) 2017-2018, Linaro Limited - * Copyright (c) 2018-2022, Nokia + * Copyright (c) 2018-2023, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -19,7 +19,7 @@ #include <odp_debug_internal.h> #include <odp_ipsec_internal.h> #include <odp_macros_internal.h> -#include <odp_ring_mpmc_internal.h> +#include <odp_ring_mpmc_u32_internal.h> #include <odp_global_data.h> #include <string.h> @@ -103,7 +103,7 @@ typedef struct ODP_ALIGNED_CACHE ipsec_thread_local_s { typedef struct ipsec_sa_table_t { ipsec_sa_t ipsec_sa[CONFIG_IPSEC_MAX_NUM_SA]; struct ODP_ALIGNED_CACHE { - ring_mpmc_t ipv4_id_ring; + ring_mpmc_u32_t ipv4_id_ring; uint32_t ipv4_id_data[IPV4_ID_RING_SIZE] ODP_ALIGNED_CACHE; } hot; struct { @@ -193,7 +193,7 @@ int _odp_ipsec_sad_init_global(void) ipsec_sa_tbl->shm = shm; ipsec_sa_tbl->max_num_sa = max_num_sa; - ring_mpmc_init(&ipsec_sa_tbl->hot.ipv4_id_ring); + ring_mpmc_u32_init(&ipsec_sa_tbl->hot.ipv4_id_ring); for (i = 0; i < thread_count_max; i++) { /* * Make the current ID block fully used, forcing allocation @@ -211,11 +211,11 @@ int _odp_ipsec_sad_init_global(void) for (i = 0; i < IPV4_ID_RING_SIZE - 1; i++) { uint32_t data = i * IPV4_ID_BLOCK_SIZE; - ring_mpmc_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring, - ipsec_sa_tbl->hot.ipv4_id_data, - IPV4_ID_RING_MASK, - &data, - 1); + ring_mpmc_u32_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring, + ipsec_sa_tbl->hot.ipv4_id_data, + IPV4_ID_RING_MASK, + &data, + 1); } for (i = 0; i < ipsec_sa_tbl->max_num_sa; i++) { @@ -1119,17 +1119,17 @@ uint16_t _odp_ipsec_sa_alloc_ipv4_id(ipsec_sa_t *ipsec_sa) tl->first_ipv4_id + IPV4_ID_BLOCK_SIZE)) { /* Return used ID block to the ring */ data = tl->first_ipv4_id; - ring_mpmc_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring, - ipsec_sa_tbl->hot.ipv4_id_data, - IPV4_ID_RING_MASK, - &data, - 1); + ring_mpmc_u32_enq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring, + ipsec_sa_tbl->hot.ipv4_id_data, + IPV4_ID_RING_MASK, + &data, + 1); /* Get new ID block */ - ring_mpmc_deq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring, - ipsec_sa_tbl->hot.ipv4_id_data, - IPV4_ID_RING_MASK, - &data, - 1); + ring_mpmc_u32_deq_multi(&ipsec_sa_tbl->hot.ipv4_id_ring, + ipsec_sa_tbl->hot.ipv4_id_data, + IPV4_ID_RING_MASK, + &data, + 1); tl->first_ipv4_id = data; tl->next_ipv4_id = data; } diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index eebfb92ab..83694f84f 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -1,5 +1,5 @@ /* Copyright (c) 2013-2018, Linaro Limited - * Copyright (c) 2021-2022, Nokia + * Copyright (c) 2021-2023, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause @@ -408,7 +408,7 @@ static int queue_destroy(odp_queue_t handle) else if (queue->type == ODP_QUEUE_TYPE_SCHED) empty = ring_st_is_empty(&queue->ring_st); else - empty = ring_mpmc_is_empty(&queue->ring_mpmc); + empty = ring_mpmc_u32_is_empty(&queue->ring_mpmc); if (!empty) { UNLOCK(queue); @@ -497,7 +497,7 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle, { queue_entry_t *queue; int ret, num_enq; - ring_mpmc_t *ring_mpmc; + ring_mpmc_u32_t *ring_mpmc; uint32_t event_idx[num]; queue = qentry_from_handle(handle); @@ -508,8 +508,8 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle, event_index_from_hdr(event_idx, event_hdr, num); - num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->ring_data, - queue->ring_mask, event_idx, num); + num_enq = ring_mpmc_u32_enq_multi(ring_mpmc, queue->ring_data, + queue->ring_mask, event_idx, num); return num_enq; } @@ -519,14 +519,14 @@ static inline int _plain_queue_deq_multi(odp_queue_t handle, { int num_deq; queue_entry_t *queue; - ring_mpmc_t *ring_mpmc; + ring_mpmc_u32_t *ring_mpmc; uint32_t event_idx[num]; queue = qentry_from_handle(handle); ring_mpmc = &queue->ring_mpmc; - num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->ring_data, - queue->ring_mask, event_idx, num); + num_deq = ring_mpmc_u32_deq_multi(ring_mpmc, queue->ring_data, + queue->ring_mask, event_idx, num); if (num_deq == 0) return 0; @@ -754,7 +754,7 @@ static void queue_print(odp_queue_t handle) } else { _ODP_PRINT(" implementation ring_mpmc\n"); _ODP_PRINT(" length %" PRIu32 "/%" PRIu32 "\n", - ring_mpmc_length(&queue->ring_mpmc), queue->ring_mask + 1); + ring_mpmc_u32_len(&queue->ring_mpmc), queue->ring_mask + 1); } _ODP_PRINT("\n"); @@ -820,7 +820,7 @@ static void queue_print_all(void) if (_odp_sched_id == _ODP_SCHED_ID_BASIC) spr = _odp_sched_basic_get_spread(index); } else { - len = ring_mpmc_length(&queue->ring_mpmc); + len = ring_mpmc_u32_len(&queue->ring_mpmc); max_len = queue->ring_mask + 1; } @@ -1073,7 +1073,7 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->ring_data = &_odp_queue_glb->ring_data[offset]; queue->ring_mask = queue_size - 1; - ring_mpmc_init(&queue->ring_mpmc); + ring_mpmc_u32_init(&queue->ring_mpmc); } else { queue->enqueue = sched_queue_enq; |