diff options
Diffstat (limited to 'platform/linux-generic/odp_stash.c')
-rw-r--r-- | platform/linux-generic/odp_stash.c | 665 |
1 files changed, 452 insertions, 213 deletions
diff --git a/platform/linux-generic/odp_stash.c b/platform/linux-generic/odp_stash.c index c7d4136ab..5ff499843 100644 --- a/platform/linux-generic/odp_stash.c +++ b/platform/linux-generic/odp_stash.c @@ -1,9 +1,10 @@ -/* Copyright (c) 2020-2022, Nokia +/* Copyright (c) 2020-2023, Nokia * All rights reserved. * * SPDX-License-Identifier: BSD-3-Clause */ +#include <odp/api/align.h> #include <odp/api/shared_memory.h> #include <odp/api/stash.h> #include <odp/api/std_types.h> @@ -15,7 +16,10 @@ #include <odp_debug_internal.h> #include <odp_global_data.h> #include <odp_init_internal.h> +#include <odp_libconfig_internal.h> #include <odp_macros_internal.h> +#include <odp_ring_mpmc_u32_internal.h> +#include <odp_ring_mpmc_u64_internal.h> #include <odp_ring_u32_internal.h> #include <odp_ring_u64_internal.h> @@ -26,18 +30,61 @@ ODP_STATIC_ASSERT(CONFIG_INTERNAL_STASHES < CONFIG_MAX_STASHES, "TOO_MANY_INTERNAL_STASHES"); -#define MAX_RING_SIZE (1024 * 1024) #define MIN_RING_SIZE 64 +enum { + STASH_FREE = 0, + STASH_RESERVED, + STASH_ACTIVE +}; + +typedef struct stash_t stash_t; + +typedef void (*ring_u32_init_fn_t)(stash_t *stash); +typedef int32_t (*ring_u32_enq_multi_fn_t)(stash_t *stash, const uint32_t val[], int32_t num); +typedef int32_t (*ring_u32_enq_batch_fn_t)(stash_t *stash, const uint32_t val[], int32_t num); +typedef int32_t (*ring_u32_deq_multi_fn_t)(stash_t *stash, uint32_t val[], int32_t num); +typedef int32_t (*ring_u32_deq_batch_fn_t)(stash_t *stash, uint32_t val[], int32_t num); +typedef int32_t (*ring_u32_len_fn_t)(stash_t *stash); + +typedef void (*ring_u64_init_fn_t)(stash_t *stash); +typedef int32_t (*ring_u64_enq_multi_fn_t)(stash_t *stash, const uint64_t val[], int32_t num); +typedef int32_t (*ring_u64_enq_batch_fn_t)(stash_t *stash, const uint64_t val[], int32_t num); +typedef int32_t (*ring_u64_deq_multi_fn_t)(stash_t *stash, uint64_t val[], int32_t num); +typedef int32_t (*ring_u64_deq_batch_fn_t)(stash_t *stash, uint64_t val[], int32_t num); +typedef int32_t (*ring_u64_len_fn_t)(stash_t *stash); + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" -typedef struct stash_t { - char name[ODP_STASH_NAME_LEN]; - odp_shm_t shm; - int index; +typedef struct ODP_ALIGNED_CACHE stash_t { + /* Ring functions */ + union { + struct { + ring_u32_enq_multi_fn_t enq_multi; + ring_u32_enq_batch_fn_t enq_batch; + ring_u32_deq_multi_fn_t deq_multi; + ring_u32_deq_batch_fn_t deq_batch; + ring_u32_init_fn_t init; + ring_u32_len_fn_t len; + } u32; + + struct { + ring_u64_enq_multi_fn_t enq_multi; + ring_u64_enq_batch_fn_t enq_batch; + ring_u64_deq_multi_fn_t deq_multi; + ring_u64_deq_batch_fn_t deq_batch; + ring_u64_init_fn_t init; + ring_u64_len_fn_t len; + } u64; + } ring_fn; + uint32_t ring_mask; + uint32_t ring_size; uint32_t obj_size; + char name[ODP_STASH_NAME_LEN]; + int index; + /* Ring header followed by variable sized data (object handles) */ union { struct ODP_ALIGNED_CACHE { @@ -49,6 +96,16 @@ typedef struct stash_t { ring_u64_t hdr; uint64_t data[]; } ring_u64; + + struct ODP_ALIGNED_CACHE { + ring_mpmc_u32_t hdr; + uint32_t data[]; + } ring_mpmc_u32; + + struct ODP_ALIGNED_CACHE { + ring_mpmc_u64_t hdr; + uint64_t data[]; + } ring_mpmc_u64; }; } stash_t; @@ -57,23 +114,92 @@ typedef struct stash_t { typedef struct stash_global_t { odp_ticketlock_t lock; odp_shm_t shm; - uint8_t stash_reserved[CONFIG_MAX_STASHES]; + uint32_t max_num; + uint32_t max_num_obj; + uint32_t num_internal; + uint8_t strict_size; + uint8_t stash_state[CONFIG_MAX_STASHES]; stash_t *stash[CONFIG_MAX_STASHES]; + uint8_t data[] ODP_ALIGNED_CACHE; } stash_global_t; static stash_global_t *stash_global; +static inline stash_t *stash_entry(odp_stash_t st) +{ + return (stash_t *)(uintptr_t)st; +} + +static inline odp_stash_t stash_handle(stash_t *stash) +{ + return (odp_stash_t)(uintptr_t)stash; +} + int _odp_stash_init_global(void) { odp_shm_t shm; - - if (odp_global_ro.disable.stash) { + uint32_t max_num, max_num_obj; + const char *str; + uint64_t ring_max_size, stash_max_size, stash_data_size, offset; + const uint32_t internal_stashes = odp_global_ro.disable.dma ? 0 : CONFIG_INTERNAL_STASHES; + uint8_t *stash_data; + uint8_t strict_size; + int val = 0; + + if (odp_global_ro.disable.stash && odp_global_ro.disable.dma) { _ODP_PRINT("Stash is DISABLED\n"); return 0; } - shm = odp_shm_reserve("_odp_stash_global", sizeof(stash_global_t), + _ODP_PRINT("Stash config:\n"); + + str = "stash.max_num"; + if (!_odp_libconfig_lookup_int(str, &val)) { + _ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + _ODP_PRINT(" %s: %i\n", str, val); + max_num = val; + + str = "stash.max_num_obj"; + if (!_odp_libconfig_lookup_int(str, &val)) { + _ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + _ODP_PRINT(" %s: %i\n", str, val); + max_num_obj = val; + + str = "stash.strict_size"; + if (!_odp_libconfig_lookup_int(str, &val)) { + _ODP_ERR("Config option '%s' not found.\n", str); + return -1; + } + _ODP_PRINT(" %s: %i\n", str, val); + strict_size = !!val; + + _ODP_PRINT("\n"); + + /* Reserve resources for implementation internal stashes */ + if (max_num > CONFIG_MAX_STASHES - internal_stashes) { + _ODP_ERR("Maximum supported number of stashes: %d\n", + CONFIG_MAX_STASHES - internal_stashes); + return -1; + } + max_num += internal_stashes; + + /* Must have room for minimum sized ring */ + if (max_num_obj < MIN_RING_SIZE) + max_num_obj = MIN_RING_SIZE - 1; + + /* Ring size must be larger than the number of items stored */ + ring_max_size = _ODP_ROUNDUP_POWER2_U32(max_num_obj + 1); + + stash_max_size = _ODP_ROUNDUP_CACHE_LINE(sizeof(stash_t) + + (ring_max_size * sizeof(uint64_t))); + stash_data_size = max_num * stash_max_size; + + shm = odp_shm_reserve("_odp_stash_global", sizeof(stash_global_t) + stash_data_size, ODP_CACHE_LINE_SIZE, 0); stash_global = odp_shm_addr(shm); @@ -85,8 +211,21 @@ int _odp_stash_init_global(void) memset(stash_global, 0, sizeof(stash_global_t)); stash_global->shm = shm; + stash_global->max_num = max_num; + stash_global->max_num_obj = max_num_obj; + stash_global->strict_size = strict_size; + stash_global->num_internal = internal_stashes; odp_ticketlock_init(&stash_global->lock); + /* Initialize stash pointers */ + stash_data = stash_global->data; + offset = 0; + + for (uint32_t i = 0; i < max_num; i++) { + stash_global->stash[i] = (stash_t *)(uintptr_t)(stash_data + offset); + offset += stash_max_size; + } + return 0; } @@ -108,17 +247,21 @@ int _odp_stash_term_global(void) int odp_stash_capability(odp_stash_capability_t *capa, odp_stash_type_t type) { + uint32_t max_stashes; + if (odp_global_ro.disable.stash) { _ODP_ERR("Stash is disabled\n"); return -1; } (void)type; + max_stashes = stash_global->max_num - stash_global->num_internal; + memset(capa, 0, sizeof(odp_stash_capability_t)); - capa->max_stashes_any_type = CONFIG_MAX_STASHES - CONFIG_INTERNAL_STASHES; - capa->max_stashes = CONFIG_MAX_STASHES - CONFIG_INTERNAL_STASHES; - capa->max_num_obj = MAX_RING_SIZE; + capa->max_stashes_any_type = max_stashes; + capa->max_stashes = max_stashes; + capa->max_num_obj = stash_global->max_num_obj; capa->max_obj_size = sizeof(uint64_t); capa->max_get_batch = MIN_RING_SIZE; capa->max_put_batch = MIN_RING_SIZE; @@ -137,15 +280,14 @@ void odp_stash_param_init(odp_stash_param_t *param) static int reserve_index(void) { - int i; int index = -1; odp_ticketlock_lock(&stash_global->lock); - for (i = 0; i < CONFIG_MAX_STASHES; i++) { - if (stash_global->stash_reserved[i] == 0) { + for (uint32_t i = 0; i < stash_global->max_num; i++) { + if (stash_global->stash_state[i] == STASH_FREE) { index = i; - stash_global->stash_reserved[i] = 1; + stash_global->stash_state[i] = STASH_RESERVED; break; } } @@ -159,20 +301,152 @@ static void free_index(int i) { odp_ticketlock_lock(&stash_global->lock); - stash_global->stash[i] = NULL; - stash_global->stash_reserved[i] = 0; + stash_global->stash_state[i] = STASH_FREE; odp_ticketlock_unlock(&stash_global->lock); } +static inline void strict_ring_u32_init(stash_t *stash) +{ + ring_u32_init(&stash->ring_u32.hdr); + + for (uint32_t i = 0; i < stash->ring_size; i++) + stash->ring_u32.data[i] = 0; +} + +static inline void strict_ring_u64_init(stash_t *stash) +{ + ring_u64_init(&stash->ring_u64.hdr); + + for (uint32_t i = 0; i < stash->ring_size; i++) + stash->ring_u64.data[i] = 0; +} + +static inline int32_t strict_ring_u32_enq_multi(stash_t *stash, const uint32_t val[], int32_t num) +{ + /* Success always */ + ring_u32_enq_multi(&stash->ring_u32.hdr, stash->ring_mask, (uint32_t *)(uintptr_t)val, num); + + return num; +} + +static inline int32_t strict_ring_u64_enq_multi(stash_t *stash, const uint64_t val[], int32_t num) +{ + /* Success always */ + ring_u64_enq_multi(&stash->ring_u64.hdr, stash->ring_mask, (uint64_t *)(uintptr_t)val, num); + + return num; +} + +static inline int32_t strict_ring_u32_deq_multi(stash_t *stash, uint32_t val[], int32_t num) +{ + return ring_u32_deq_multi(&stash->ring_u32.hdr, stash->ring_mask, val, num); +} + +static inline int32_t strict_ring_u64_deq_multi(stash_t *stash, uint64_t val[], int32_t num) +{ + return ring_u64_deq_multi(&stash->ring_u64.hdr, stash->ring_mask, val, num); +} + +static inline int32_t strict_ring_u32_deq_batch(stash_t *stash, uint32_t val[], int32_t num) +{ + return ring_u32_deq_batch(&stash->ring_u32.hdr, stash->ring_mask, val, num); +} + +static inline int32_t strict_ring_u64_deq_batch(stash_t *stash, uint64_t val[], int32_t num) +{ + return ring_u64_deq_batch(&stash->ring_u64.hdr, stash->ring_mask, val, num); +} + +static inline int32_t strict_ring_u32_len(stash_t *stash) +{ + return ring_u32_len(&stash->ring_u32.hdr); +} + +static inline int32_t strict_ring_u64_len(stash_t *stash) +{ + return ring_u64_len(&stash->ring_u64.hdr); +} + +static inline void mpmc_ring_u32_init(stash_t *stash) +{ + ring_mpmc_u32_init(&stash->ring_mpmc_u32.hdr); + + for (uint32_t i = 0; i < stash->ring_size; i++) + stash->ring_mpmc_u32.data[i] = 0; +} + +static inline void mpmc_ring_u64_init(stash_t *stash) +{ + ring_mpmc_u64_init(&stash->ring_mpmc_u64.hdr); + + for (uint32_t i = 0; i < stash->ring_size; i++) + stash->ring_mpmc_u64.data[i] = 0; +} + +static inline int32_t mpmc_ring_u32_enq_multi(stash_t *stash, const uint32_t val[], int32_t num) +{ + return ring_mpmc_u32_enq_multi(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u64_enq_multi(stash_t *stash, const uint64_t val[], int32_t num) +{ + return ring_mpmc_u64_enq_multi(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u32_enq_batch(stash_t *stash, const uint32_t val[], int32_t num) +{ + return ring_mpmc_u32_enq_batch(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u64_enq_batch(stash_t *stash, const uint64_t val[], int32_t num) +{ + return ring_mpmc_u64_enq_batch(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u32_deq_multi(stash_t *stash, uint32_t val[], int32_t num) +{ + return ring_mpmc_u32_deq_multi(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u64_deq_multi(stash_t *stash, uint64_t val[], int32_t num) +{ + return ring_mpmc_u64_deq_multi(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u32_deq_batch(stash_t *stash, uint32_t val[], int32_t num) +{ + return ring_mpmc_u32_deq_batch(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u64_deq_batch(stash_t *stash, uint64_t val[], int32_t num) +{ + return ring_mpmc_u64_deq_batch(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data, + stash->ring_mask, val, num); +} + +static inline int32_t mpmc_ring_u32_len(stash_t *stash) +{ + return ring_mpmc_u32_len(&stash->ring_mpmc_u32.hdr); +} + +static inline int32_t mpmc_ring_u64_len(stash_t *stash) +{ + return ring_mpmc_u64_len(&stash->ring_mpmc_u64.hdr); +} + odp_stash_t odp_stash_create(const char *name, const odp_stash_param_t *param) { - odp_shm_t shm; stash_t *stash; - uint64_t i, ring_size, shm_size; + uint64_t ring_size; int ring_u64, index; - char shm_name[ODP_STASH_NAME_LEN + 8]; - uint32_t shm_flags = 0; if (odp_global_ro.disable.stash) { _ODP_ERR("Stash is disabled\n"); @@ -184,7 +458,7 @@ odp_stash_t odp_stash_create(const char *name, const odp_stash_param_t *param) return ODP_STASH_INVALID; } - if (param->num_obj > MAX_RING_SIZE) { + if (param->num_obj > stash_global->max_num_obj) { _ODP_ERR("Too many objects.\n"); return ODP_STASH_INVALID; } @@ -213,75 +487,71 @@ odp_stash_t odp_stash_create(const char *name, const odp_stash_param_t *param) else ring_size = _ODP_ROUNDUP_POWER2_U32(ring_size + 1); - memset(shm_name, 0, sizeof(shm_name)); - snprintf(shm_name, sizeof(shm_name) - 1, "_stash_%s", name); - - if (ring_u64) - shm_size = sizeof(stash_t) + (ring_size * sizeof(uint64_t)); - else - shm_size = sizeof(stash_t) + (ring_size * sizeof(uint32_t)); - - if (odp_global_ro.shm_single_va) - shm_flags |= ODP_SHM_SINGLE_VA; - - shm = odp_shm_reserve(shm_name, shm_size, ODP_CACHE_LINE_SIZE, shm_flags); - - if (shm == ODP_SHM_INVALID) { - _ODP_ERR("SHM reserve failed.\n"); - free_index(index); - return ODP_STASH_INVALID; - } - - stash = odp_shm_addr(shm); + stash = stash_global->stash[index]; memset(stash, 0, sizeof(stash_t)); - if (ring_u64) { - ring_u64_init(&stash->ring_u64.hdr); - - for (i = 0; i < ring_size; i++) - stash->ring_u64.data[i] = 0; + /* Set ring function pointers */ + if (stash_global->strict_size) { + if (ring_u64) { + stash->ring_fn.u64.init = strict_ring_u64_init; + stash->ring_fn.u64.enq_multi = strict_ring_u64_enq_multi; + stash->ring_fn.u64.enq_batch = strict_ring_u64_enq_multi; + stash->ring_fn.u64.deq_multi = strict_ring_u64_deq_multi; + stash->ring_fn.u64.deq_batch = strict_ring_u64_deq_batch; + stash->ring_fn.u64.len = strict_ring_u64_len; + } else { + stash->ring_fn.u32.init = strict_ring_u32_init; + stash->ring_fn.u32.enq_multi = strict_ring_u32_enq_multi; + stash->ring_fn.u32.enq_batch = strict_ring_u32_enq_multi; + stash->ring_fn.u32.deq_multi = strict_ring_u32_deq_multi; + stash->ring_fn.u32.deq_batch = strict_ring_u32_deq_batch; + stash->ring_fn.u32.len = strict_ring_u32_len; + } } else { - ring_u32_init(&stash->ring_u32.hdr); - - for (i = 0; i < ring_size; i++) - stash->ring_u32.data[i] = 0; + if (ring_u64) { + stash->ring_fn.u64.init = mpmc_ring_u64_init; + stash->ring_fn.u64.enq_multi = mpmc_ring_u64_enq_multi; + stash->ring_fn.u64.enq_batch = mpmc_ring_u64_enq_batch; + stash->ring_fn.u64.deq_multi = mpmc_ring_u64_deq_multi; + stash->ring_fn.u64.deq_batch = mpmc_ring_u64_deq_batch; + stash->ring_fn.u64.len = mpmc_ring_u64_len; + } else { + stash->ring_fn.u32.init = mpmc_ring_u32_init; + stash->ring_fn.u32.enq_multi = mpmc_ring_u32_enq_multi; + stash->ring_fn.u32.enq_batch = mpmc_ring_u32_enq_batch; + stash->ring_fn.u32.deq_multi = mpmc_ring_u32_deq_multi; + stash->ring_fn.u32.deq_batch = mpmc_ring_u32_deq_batch; + stash->ring_fn.u32.len = mpmc_ring_u32_len; + } } if (name) strcpy(stash->name, name); stash->index = index; - stash->shm = shm; stash->obj_size = param->obj_size; stash->ring_mask = ring_size - 1; + stash->ring_size = ring_size; + + if (ring_u64) + stash->ring_fn.u64.init(stash); + else + stash->ring_fn.u32.init(stash); /* This makes stash visible to lookups */ odp_ticketlock_lock(&stash_global->lock); - stash_global->stash[index] = stash; + stash_global->stash_state[index] = STASH_ACTIVE; odp_ticketlock_unlock(&stash_global->lock); - return (odp_stash_t)stash; + return stash_handle(stash); } int odp_stash_destroy(odp_stash_t st) { - stash_t *stash; - int index; - odp_shm_t shm; - if (st == ODP_STASH_INVALID) return -1; - stash = (stash_t *)(uintptr_t)st; - index = stash->index; - shm = stash->shm; - - free_index(index); - - if (odp_shm_free(shm)) { - _ODP_ERR("SHM free failed.\n"); - return -1; - } + free_index(stash_entry(st)->index); return 0; } @@ -293,7 +563,6 @@ uint64_t odp_stash_to_u64(odp_stash_t st) odp_stash_t odp_stash_lookup(const char *name) { - int i; stash_t *stash; if (name == NULL) @@ -301,12 +570,13 @@ odp_stash_t odp_stash_lookup(const char *name) odp_ticketlock_lock(&stash_global->lock); - for (i = 0; i < CONFIG_MAX_STASHES; i++) { + for (uint32_t i = 0; i < stash_global->max_num; i++) { stash = stash_global->stash[i]; - if (stash && strcmp(stash->name, name) == 0) { + if (stash_global->stash_state[i] == STASH_ACTIVE && + strcmp(stash->name, name) == 0) { odp_ticketlock_unlock(&stash_global->lock); - return (odp_stash_t)stash; + return stash_handle(stash); } } @@ -315,57 +585,51 @@ odp_stash_t odp_stash_lookup(const char *name) return ODP_STASH_INVALID; } -static inline int32_t stash_put(odp_stash_t st, const void *obj, int32_t num) +static inline int32_t stash_put(odp_stash_t st, const void *obj, int32_t num, odp_bool_t is_batch) { - stash_t *stash; + int32_t (*ring_u32_enq)(stash_t *stash, const uint32_t val[], int32_t num); + int32_t (*ring_u64_enq)(stash_t *stash, const uint64_t val[], int32_t num); + stash_t *stash = stash_entry(st); uint32_t obj_size; int32_t i; - stash = (stash_t *)(uintptr_t)st; - if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; - obj_size = stash->obj_size; - - if (obj_size == sizeof(uint64_t)) { - ring_u64_t *ring_u64 = &stash->ring_u64.hdr; - - ring_u64_enq_multi(ring_u64, stash->ring_mask, - (uint64_t *)(uintptr_t)obj, num); - return num; + if (is_batch) { + ring_u32_enq = stash->ring_fn.u32.enq_batch; + ring_u64_enq = stash->ring_fn.u64.enq_batch; + } else { + ring_u32_enq = stash->ring_fn.u32.enq_multi; + ring_u64_enq = stash->ring_fn.u64.enq_multi; } - if (obj_size == sizeof(uint32_t)) { - ring_u32_t *ring_u32 = &stash->ring_u32.hdr; + obj_size = stash->obj_size; - ring_u32_enq_multi(ring_u32, stash->ring_mask, - (uint32_t *)(uintptr_t)obj, num); - return num; - } + if (obj_size == sizeof(uint64_t)) + return ring_u64_enq(stash, (uint64_t *)(uintptr_t)obj, num); + + if (obj_size == sizeof(uint32_t)) + return ring_u32_enq(stash, (uint32_t *)(uintptr_t)obj, num); if (obj_size == sizeof(uint16_t)) { const uint16_t *u16_ptr = obj; - ring_u32_t *ring_u32 = &stash->ring_u32.hdr; uint32_t u32[num]; for (i = 0; i < num; i++) u32[i] = u16_ptr[i]; - ring_u32_enq_multi(ring_u32, stash->ring_mask, u32, num); - return num; + return ring_u32_enq(stash, u32, num); } if (obj_size == sizeof(uint8_t)) { const uint8_t *u8_ptr = obj; - ring_u32_t *ring_u32 = &stash->ring_u32.hdr; uint32_t u32[num]; for (i = 0; i < num; i++) u32[i] = u8_ptr[i]; - ring_u32_enq_multi(ring_u32, stash->ring_mask, u32, num); - return num; + return ring_u32_enq(stash, u32, num); } return -1; @@ -373,73 +637,66 @@ static inline int32_t stash_put(odp_stash_t st, const void *obj, int32_t num) int32_t odp_stash_put(odp_stash_t st, const void *obj, int32_t num) { - return stash_put(st, obj, num); + return stash_put(st, obj, num, false); } int32_t odp_stash_put_batch(odp_stash_t st, const void *obj, int32_t num) { - /* Returns always 'num', or -1 on failure. */ - return stash_put(st, obj, num); + return stash_put(st, obj, num, true); } -static inline int32_t stash_put_u32(odp_stash_t st, const uint32_t val[], - int32_t num) +int32_t odp_stash_put_u32(odp_stash_t st, const uint32_t val[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; _ODP_ASSERT(stash->obj_size == sizeof(uint32_t)); - ring_u32_enq_multi(&stash->ring_u32.hdr, stash->ring_mask, - (uint32_t *)(uintptr_t)val, num); - return num; + return stash->ring_fn.u32.enq_multi(stash, val, num); } -int32_t odp_stash_put_u32(odp_stash_t st, const uint32_t val[], int32_t num) +int32_t odp_stash_put_u32_batch(odp_stash_t st, const uint32_t val[], int32_t num) { - return stash_put_u32(st, val, num); -} + stash_t *stash = stash_entry(st); -int32_t odp_stash_put_u32_batch(odp_stash_t st, const uint32_t val[], - int32_t num) -{ - /* Returns always 'num', or -1 on failure. */ - return stash_put_u32(st, val, num); + if (odp_unlikely(st == ODP_STASH_INVALID)) + return -1; + + _ODP_ASSERT(stash->obj_size == sizeof(uint32_t)); + + return stash->ring_fn.u32.enq_batch(stash, val, num); } -static inline int32_t stash_put_u64(odp_stash_t st, const uint64_t val[], - int32_t num) +int32_t odp_stash_put_u64(odp_stash_t st, const uint64_t val[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; _ODP_ASSERT(stash->obj_size == sizeof(uint64_t)); - ring_u64_enq_multi(&stash->ring_u64.hdr, stash->ring_mask, - (uint64_t *)(uintptr_t)val, num); - return num; -} - -int32_t odp_stash_put_u64(odp_stash_t st, const uint64_t val[], int32_t num) -{ - return stash_put_u64(st, val, num); + return stash->ring_fn.u64.enq_multi(stash, (uint64_t *)(uintptr_t)val, num); } int32_t odp_stash_put_u64_batch(odp_stash_t st, const uint64_t val[], int32_t num) { - /* Returns always 'num', or -1 on failure. */ - return stash_put_u64(st, val, num); + stash_t *stash = stash_entry(st); + + if (odp_unlikely(st == ODP_STASH_INVALID)) + return -1; + + _ODP_ASSERT(stash->obj_size == sizeof(uint64_t)); + + return stash->ring_fn.u64.enq_batch(stash, (uint64_t *)(uintptr_t)val, num); } -static inline int32_t stash_put_ptr(odp_stash_t st, const uintptr_t ptr[], - int32_t num) +int32_t odp_stash_put_ptr(odp_stash_t st, const uintptr_t ptr[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; @@ -447,69 +704,65 @@ static inline int32_t stash_put_ptr(odp_stash_t st, const uintptr_t ptr[], _ODP_ASSERT(stash->obj_size == sizeof(uintptr_t)); if (sizeof(uintptr_t) == sizeof(uint32_t)) - ring_u32_enq_multi(&stash->ring_u32.hdr, stash->ring_mask, - (uint32_t *)(uintptr_t)ptr, num); - else if (sizeof(uintptr_t) == sizeof(uint64_t)) - ring_u64_enq_multi(&stash->ring_u64.hdr, stash->ring_mask, - (uint64_t *)(uintptr_t)ptr, num); - else - return -1; + return stash->ring_fn.u32.enq_multi(stash, (uint32_t *)(uintptr_t)ptr, num); - return num; -} + if (sizeof(uintptr_t) == sizeof(uint64_t)) + return stash->ring_fn.u64.enq_multi(stash, (uint64_t *)(uintptr_t)ptr, num); -int32_t odp_stash_put_ptr(odp_stash_t st, const uintptr_t ptr[], int32_t num) -{ - return stash_put_ptr(st, ptr, num); + return -1; } int32_t odp_stash_put_ptr_batch(odp_stash_t st, const uintptr_t ptr[], int32_t num) { - /* Returns always 'num', or -1 on failure. */ - return stash_put_ptr(st, ptr, num); + stash_t *stash = stash_entry(st); + + if (odp_unlikely(st == ODP_STASH_INVALID)) + return -1; + + _ODP_ASSERT(stash->obj_size == sizeof(uintptr_t)); + + if (sizeof(uintptr_t) == sizeof(uint32_t)) + return stash->ring_fn.u32.enq_batch(stash, (uint32_t *)(uintptr_t)ptr, num); + + if (sizeof(uintptr_t) == sizeof(uint64_t)) + return stash->ring_fn.u64.enq_batch(stash, (uint64_t *)(uintptr_t)ptr, num); + + return -1; } -static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool_t batch) +static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool_t is_batch) { - stash_t *stash; + int32_t (*ring_u32_deq)(stash_t *stash, uint32_t val[], int32_t num); + int32_t (*ring_u64_deq)(stash_t *stash, uint64_t val[], int32_t num); + stash_t *stash = stash_entry(st); uint32_t obj_size; uint32_t i, num_deq; - stash = (stash_t *)(uintptr_t)st; - if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; - obj_size = stash->obj_size; - - if (obj_size == sizeof(uint64_t)) { - ring_u64_t *ring_u64 = &stash->ring_u64.hdr; - - if (batch) - return ring_u64_deq_batch(ring_u64, stash->ring_mask, obj, num); - else - return ring_u64_deq_multi(ring_u64, stash->ring_mask, obj, num); + if (is_batch) { + ring_u32_deq = stash->ring_fn.u32.deq_batch; + ring_u64_deq = stash->ring_fn.u64.deq_batch; + } else { + ring_u32_deq = stash->ring_fn.u32.deq_multi; + ring_u64_deq = stash->ring_fn.u64.deq_multi; } - if (obj_size == sizeof(uint32_t)) { - ring_u32_t *ring_u32 = &stash->ring_u32.hdr; + obj_size = stash->obj_size; + + if (obj_size == sizeof(uint64_t)) + return ring_u64_deq(stash, obj, num); - if (batch) - return ring_u32_deq_batch(ring_u32, stash->ring_mask, obj, num); - else - return ring_u32_deq_multi(ring_u32, stash->ring_mask, obj, num); - } + if (obj_size == sizeof(uint32_t)) + return ring_u32_deq(stash, obj, num); if (obj_size == sizeof(uint16_t)) { uint16_t *u16_ptr = obj; - ring_u32_t *ring_u32 = &stash->ring_u32.hdr; uint32_t u32[num]; - if (batch) - num_deq = ring_u32_deq_batch(ring_u32, stash->ring_mask, u32, num); - else - num_deq = ring_u32_deq_multi(ring_u32, stash->ring_mask, u32, num); + num_deq = ring_u32_deq(stash, u32, num); for (i = 0; i < num_deq; i++) u16_ptr[i] = u32[i]; @@ -519,13 +772,9 @@ static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool if (obj_size == sizeof(uint8_t)) { uint8_t *u8_ptr = obj; - ring_u32_t *ring_u32 = &stash->ring_u32.hdr; uint32_t u32[num]; - if (batch) - num_deq = ring_u32_deq_batch(ring_u32, stash->ring_mask, u32, num); - else - num_deq = ring_u32_deq_multi(ring_u32, stash->ring_mask, u32, num); + num_deq = ring_u32_deq(stash, u32, num); for (i = 0; i < num_deq; i++) u8_ptr[i] = u32[i]; @@ -538,67 +787,65 @@ static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool int32_t odp_stash_get(odp_stash_t st, void *obj, int32_t num) { - return stash_get(st, obj, num, 0); + return stash_get(st, obj, num, false); } int32_t odp_stash_get_batch(odp_stash_t st, void *obj, int32_t num) { - return stash_get(st, obj, num, 1); + return stash_get(st, obj, num, true); } int32_t odp_stash_get_u32(odp_stash_t st, uint32_t val[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; _ODP_ASSERT(stash->obj_size == sizeof(uint32_t)); - return ring_u32_deq_multi(&stash->ring_u32.hdr, stash->ring_mask, val, - num); + return stash->ring_fn.u32.deq_multi(stash, val, num); } int32_t odp_stash_get_u32_batch(odp_stash_t st, uint32_t val[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; _ODP_ASSERT(stash->obj_size == sizeof(uint32_t)); - return ring_u32_deq_batch(&stash->ring_u32.hdr, stash->ring_mask, val, num); + return stash->ring_fn.u32.deq_batch(stash, val, num); } int32_t odp_stash_get_u64(odp_stash_t st, uint64_t val[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; _ODP_ASSERT(stash->obj_size == sizeof(uint64_t)); - return ring_u64_deq_multi(&stash->ring_u64.hdr, stash->ring_mask, val, - num); + return stash->ring_fn.u64.deq_multi(stash, val, num); } int32_t odp_stash_get_u64_batch(odp_stash_t st, uint64_t val[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; _ODP_ASSERT(stash->obj_size == sizeof(uint64_t)); - return ring_u64_deq_batch(&stash->ring_u64.hdr, stash->ring_mask, val, num); + return stash->ring_fn.u64.deq_batch(stash, val, num); } int32_t odp_stash_get_ptr(odp_stash_t st, uintptr_t ptr[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; @@ -606,19 +853,17 @@ int32_t odp_stash_get_ptr(odp_stash_t st, uintptr_t ptr[], int32_t num) _ODP_ASSERT(stash->obj_size == sizeof(uintptr_t)); if (sizeof(uintptr_t) == sizeof(uint32_t)) - return ring_u32_deq_multi(&stash->ring_u32.hdr, - stash->ring_mask, - (uint32_t *)(uintptr_t)ptr, num); - else if (sizeof(uintptr_t) == sizeof(uint64_t)) - return ring_u64_deq_multi(&stash->ring_u64.hdr, - stash->ring_mask, - (uint64_t *)(uintptr_t)ptr, num); + return stash->ring_fn.u32.deq_multi(stash, (uint32_t *)(uintptr_t)ptr, num); + + if (sizeof(uintptr_t) == sizeof(uint64_t)) + return stash->ring_fn.u64.deq_multi(stash, (uint64_t *)(uintptr_t)ptr, num); + return -1; } int32_t odp_stash_get_ptr_batch(odp_stash_t st, uintptr_t ptr[], int32_t num) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (odp_unlikely(st == ODP_STASH_INVALID)) return -1; @@ -626,11 +871,11 @@ int32_t odp_stash_get_ptr_batch(odp_stash_t st, uintptr_t ptr[], int32_t num) _ODP_ASSERT(stash->obj_size == sizeof(uintptr_t)); if (sizeof(uintptr_t) == sizeof(uint32_t)) - return ring_u32_deq_batch(&stash->ring_u32.hdr, stash->ring_mask, - (uint32_t *)(uintptr_t)ptr, num); - else if (sizeof(uintptr_t) == sizeof(uint64_t)) - return ring_u64_deq_batch(&stash->ring_u64.hdr, stash->ring_mask, - (uint64_t *)(uintptr_t)ptr, num); + return stash->ring_fn.u32.deq_batch(stash, (uint32_t *)(uintptr_t)ptr, num); + + if (sizeof(uintptr_t) == sizeof(uint64_t)) + return stash->ring_fn.u64.deq_batch(stash, (uint64_t *)(uintptr_t)ptr, num); + return -1; } @@ -644,23 +889,17 @@ int odp_stash_flush_cache(odp_stash_t st) static uint32_t stash_obj_count(stash_t *stash) { - ring_u32_t *ring_u32; uint32_t obj_size = stash->obj_size; - if (obj_size == sizeof(uint64_t)) { - ring_u64_t *ring_u64 = &stash->ring_u64.hdr; - - return ring_u64_len(ring_u64); - } - - ring_u32 = &stash->ring_u32.hdr; + if (obj_size == sizeof(uint64_t)) + return stash->ring_fn.u64.len(stash); - return ring_u32_len(ring_u32); + return stash->ring_fn.u32.len(stash); } void odp_stash_print(odp_stash_t st) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (st == ODP_STASH_INVALID) { _ODP_ERR("Bad stash handle\n"); @@ -674,13 +913,13 @@ void odp_stash_print(odp_stash_t st) _ODP_PRINT(" index %i\n", stash->index); _ODP_PRINT(" obj size %u\n", stash->obj_size); _ODP_PRINT(" obj count %u\n", stash_obj_count(stash)); - _ODP_PRINT(" ring size %u\n", stash->ring_mask + 1); + _ODP_PRINT(" ring size %u\n", stash->ring_size); _ODP_PRINT("\n"); } int odp_stash_stats(odp_stash_t st, odp_stash_stats_t *stats) { - stash_t *stash = (stash_t *)(uintptr_t)st; + stash_t *stash = stash_entry(st); if (st == ODP_STASH_INVALID) { _ODP_ERR("Bad stash handle\n"); |