aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-generic/odp_stash.c
diff options
context:
space:
mode:
Diffstat (limited to 'platform/linux-generic/odp_stash.c')
-rw-r--r--platform/linux-generic/odp_stash.c665
1 files changed, 452 insertions, 213 deletions
diff --git a/platform/linux-generic/odp_stash.c b/platform/linux-generic/odp_stash.c
index c7d4136ab..5ff499843 100644
--- a/platform/linux-generic/odp_stash.c
+++ b/platform/linux-generic/odp_stash.c
@@ -1,9 +1,10 @@
-/* Copyright (c) 2020-2022, Nokia
+/* Copyright (c) 2020-2023, Nokia
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
+#include <odp/api/align.h>
#include <odp/api/shared_memory.h>
#include <odp/api/stash.h>
#include <odp/api/std_types.h>
@@ -15,7 +16,10 @@
#include <odp_debug_internal.h>
#include <odp_global_data.h>
#include <odp_init_internal.h>
+#include <odp_libconfig_internal.h>
#include <odp_macros_internal.h>
+#include <odp_ring_mpmc_u32_internal.h>
+#include <odp_ring_mpmc_u64_internal.h>
#include <odp_ring_u32_internal.h>
#include <odp_ring_u64_internal.h>
@@ -26,18 +30,61 @@
ODP_STATIC_ASSERT(CONFIG_INTERNAL_STASHES < CONFIG_MAX_STASHES, "TOO_MANY_INTERNAL_STASHES");
-#define MAX_RING_SIZE (1024 * 1024)
#define MIN_RING_SIZE 64
+enum {
+ STASH_FREE = 0,
+ STASH_RESERVED,
+ STASH_ACTIVE
+};
+
+typedef struct stash_t stash_t;
+
+typedef void (*ring_u32_init_fn_t)(stash_t *stash);
+typedef int32_t (*ring_u32_enq_multi_fn_t)(stash_t *stash, const uint32_t val[], int32_t num);
+typedef int32_t (*ring_u32_enq_batch_fn_t)(stash_t *stash, const uint32_t val[], int32_t num);
+typedef int32_t (*ring_u32_deq_multi_fn_t)(stash_t *stash, uint32_t val[], int32_t num);
+typedef int32_t (*ring_u32_deq_batch_fn_t)(stash_t *stash, uint32_t val[], int32_t num);
+typedef int32_t (*ring_u32_len_fn_t)(stash_t *stash);
+
+typedef void (*ring_u64_init_fn_t)(stash_t *stash);
+typedef int32_t (*ring_u64_enq_multi_fn_t)(stash_t *stash, const uint64_t val[], int32_t num);
+typedef int32_t (*ring_u64_enq_batch_fn_t)(stash_t *stash, const uint64_t val[], int32_t num);
+typedef int32_t (*ring_u64_deq_multi_fn_t)(stash_t *stash, uint64_t val[], int32_t num);
+typedef int32_t (*ring_u64_deq_batch_fn_t)(stash_t *stash, uint64_t val[], int32_t num);
+typedef int32_t (*ring_u64_len_fn_t)(stash_t *stash);
+
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
-typedef struct stash_t {
- char name[ODP_STASH_NAME_LEN];
- odp_shm_t shm;
- int index;
+typedef struct ODP_ALIGNED_CACHE stash_t {
+ /* Ring functions */
+ union {
+ struct {
+ ring_u32_enq_multi_fn_t enq_multi;
+ ring_u32_enq_batch_fn_t enq_batch;
+ ring_u32_deq_multi_fn_t deq_multi;
+ ring_u32_deq_batch_fn_t deq_batch;
+ ring_u32_init_fn_t init;
+ ring_u32_len_fn_t len;
+ } u32;
+
+ struct {
+ ring_u64_enq_multi_fn_t enq_multi;
+ ring_u64_enq_batch_fn_t enq_batch;
+ ring_u64_deq_multi_fn_t deq_multi;
+ ring_u64_deq_batch_fn_t deq_batch;
+ ring_u64_init_fn_t init;
+ ring_u64_len_fn_t len;
+ } u64;
+ } ring_fn;
+
uint32_t ring_mask;
+ uint32_t ring_size;
uint32_t obj_size;
+ char name[ODP_STASH_NAME_LEN];
+ int index;
+
/* Ring header followed by variable sized data (object handles) */
union {
struct ODP_ALIGNED_CACHE {
@@ -49,6 +96,16 @@ typedef struct stash_t {
ring_u64_t hdr;
uint64_t data[];
} ring_u64;
+
+ struct ODP_ALIGNED_CACHE {
+ ring_mpmc_u32_t hdr;
+ uint32_t data[];
+ } ring_mpmc_u32;
+
+ struct ODP_ALIGNED_CACHE {
+ ring_mpmc_u64_t hdr;
+ uint64_t data[];
+ } ring_mpmc_u64;
};
} stash_t;
@@ -57,23 +114,92 @@ typedef struct stash_t {
typedef struct stash_global_t {
odp_ticketlock_t lock;
odp_shm_t shm;
- uint8_t stash_reserved[CONFIG_MAX_STASHES];
+ uint32_t max_num;
+ uint32_t max_num_obj;
+ uint32_t num_internal;
+ uint8_t strict_size;
+ uint8_t stash_state[CONFIG_MAX_STASHES];
stash_t *stash[CONFIG_MAX_STASHES];
+ uint8_t data[] ODP_ALIGNED_CACHE;
} stash_global_t;
static stash_global_t *stash_global;
+static inline stash_t *stash_entry(odp_stash_t st)
+{
+ return (stash_t *)(uintptr_t)st;
+}
+
+static inline odp_stash_t stash_handle(stash_t *stash)
+{
+ return (odp_stash_t)(uintptr_t)stash;
+}
+
int _odp_stash_init_global(void)
{
odp_shm_t shm;
-
- if (odp_global_ro.disable.stash) {
+ uint32_t max_num, max_num_obj;
+ const char *str;
+ uint64_t ring_max_size, stash_max_size, stash_data_size, offset;
+ const uint32_t internal_stashes = odp_global_ro.disable.dma ? 0 : CONFIG_INTERNAL_STASHES;
+ uint8_t *stash_data;
+ uint8_t strict_size;
+ int val = 0;
+
+ if (odp_global_ro.disable.stash && odp_global_ro.disable.dma) {
_ODP_PRINT("Stash is DISABLED\n");
return 0;
}
- shm = odp_shm_reserve("_odp_stash_global", sizeof(stash_global_t),
+ _ODP_PRINT("Stash config:\n");
+
+ str = "stash.max_num";
+ if (!_odp_libconfig_lookup_int(str, &val)) {
+ _ODP_ERR("Config option '%s' not found.\n", str);
+ return -1;
+ }
+ _ODP_PRINT(" %s: %i\n", str, val);
+ max_num = val;
+
+ str = "stash.max_num_obj";
+ if (!_odp_libconfig_lookup_int(str, &val)) {
+ _ODP_ERR("Config option '%s' not found.\n", str);
+ return -1;
+ }
+ _ODP_PRINT(" %s: %i\n", str, val);
+ max_num_obj = val;
+
+ str = "stash.strict_size";
+ if (!_odp_libconfig_lookup_int(str, &val)) {
+ _ODP_ERR("Config option '%s' not found.\n", str);
+ return -1;
+ }
+ _ODP_PRINT(" %s: %i\n", str, val);
+ strict_size = !!val;
+
+ _ODP_PRINT("\n");
+
+ /* Reserve resources for implementation internal stashes */
+ if (max_num > CONFIG_MAX_STASHES - internal_stashes) {
+ _ODP_ERR("Maximum supported number of stashes: %d\n",
+ CONFIG_MAX_STASHES - internal_stashes);
+ return -1;
+ }
+ max_num += internal_stashes;
+
+ /* Must have room for minimum sized ring */
+ if (max_num_obj < MIN_RING_SIZE)
+ max_num_obj = MIN_RING_SIZE - 1;
+
+ /* Ring size must be larger than the number of items stored */
+ ring_max_size = _ODP_ROUNDUP_POWER2_U32(max_num_obj + 1);
+
+ stash_max_size = _ODP_ROUNDUP_CACHE_LINE(sizeof(stash_t) +
+ (ring_max_size * sizeof(uint64_t)));
+ stash_data_size = max_num * stash_max_size;
+
+ shm = odp_shm_reserve("_odp_stash_global", sizeof(stash_global_t) + stash_data_size,
ODP_CACHE_LINE_SIZE, 0);
stash_global = odp_shm_addr(shm);
@@ -85,8 +211,21 @@ int _odp_stash_init_global(void)
memset(stash_global, 0, sizeof(stash_global_t));
stash_global->shm = shm;
+ stash_global->max_num = max_num;
+ stash_global->max_num_obj = max_num_obj;
+ stash_global->strict_size = strict_size;
+ stash_global->num_internal = internal_stashes;
odp_ticketlock_init(&stash_global->lock);
+ /* Initialize stash pointers */
+ stash_data = stash_global->data;
+ offset = 0;
+
+ for (uint32_t i = 0; i < max_num; i++) {
+ stash_global->stash[i] = (stash_t *)(uintptr_t)(stash_data + offset);
+ offset += stash_max_size;
+ }
+
return 0;
}
@@ -108,17 +247,21 @@ int _odp_stash_term_global(void)
int odp_stash_capability(odp_stash_capability_t *capa, odp_stash_type_t type)
{
+ uint32_t max_stashes;
+
if (odp_global_ro.disable.stash) {
_ODP_ERR("Stash is disabled\n");
return -1;
}
(void)type;
+ max_stashes = stash_global->max_num - stash_global->num_internal;
+
memset(capa, 0, sizeof(odp_stash_capability_t));
- capa->max_stashes_any_type = CONFIG_MAX_STASHES - CONFIG_INTERNAL_STASHES;
- capa->max_stashes = CONFIG_MAX_STASHES - CONFIG_INTERNAL_STASHES;
- capa->max_num_obj = MAX_RING_SIZE;
+ capa->max_stashes_any_type = max_stashes;
+ capa->max_stashes = max_stashes;
+ capa->max_num_obj = stash_global->max_num_obj;
capa->max_obj_size = sizeof(uint64_t);
capa->max_get_batch = MIN_RING_SIZE;
capa->max_put_batch = MIN_RING_SIZE;
@@ -137,15 +280,14 @@ void odp_stash_param_init(odp_stash_param_t *param)
static int reserve_index(void)
{
- int i;
int index = -1;
odp_ticketlock_lock(&stash_global->lock);
- for (i = 0; i < CONFIG_MAX_STASHES; i++) {
- if (stash_global->stash_reserved[i] == 0) {
+ for (uint32_t i = 0; i < stash_global->max_num; i++) {
+ if (stash_global->stash_state[i] == STASH_FREE) {
index = i;
- stash_global->stash_reserved[i] = 1;
+ stash_global->stash_state[i] = STASH_RESERVED;
break;
}
}
@@ -159,20 +301,152 @@ static void free_index(int i)
{
odp_ticketlock_lock(&stash_global->lock);
- stash_global->stash[i] = NULL;
- stash_global->stash_reserved[i] = 0;
+ stash_global->stash_state[i] = STASH_FREE;
odp_ticketlock_unlock(&stash_global->lock);
}
+static inline void strict_ring_u32_init(stash_t *stash)
+{
+ ring_u32_init(&stash->ring_u32.hdr);
+
+ for (uint32_t i = 0; i < stash->ring_size; i++)
+ stash->ring_u32.data[i] = 0;
+}
+
+static inline void strict_ring_u64_init(stash_t *stash)
+{
+ ring_u64_init(&stash->ring_u64.hdr);
+
+ for (uint32_t i = 0; i < stash->ring_size; i++)
+ stash->ring_u64.data[i] = 0;
+}
+
+static inline int32_t strict_ring_u32_enq_multi(stash_t *stash, const uint32_t val[], int32_t num)
+{
+ /* Success always */
+ ring_u32_enq_multi(&stash->ring_u32.hdr, stash->ring_mask, (uint32_t *)(uintptr_t)val, num);
+
+ return num;
+}
+
+static inline int32_t strict_ring_u64_enq_multi(stash_t *stash, const uint64_t val[], int32_t num)
+{
+ /* Success always */
+ ring_u64_enq_multi(&stash->ring_u64.hdr, stash->ring_mask, (uint64_t *)(uintptr_t)val, num);
+
+ return num;
+}
+
+static inline int32_t strict_ring_u32_deq_multi(stash_t *stash, uint32_t val[], int32_t num)
+{
+ return ring_u32_deq_multi(&stash->ring_u32.hdr, stash->ring_mask, val, num);
+}
+
+static inline int32_t strict_ring_u64_deq_multi(stash_t *stash, uint64_t val[], int32_t num)
+{
+ return ring_u64_deq_multi(&stash->ring_u64.hdr, stash->ring_mask, val, num);
+}
+
+static inline int32_t strict_ring_u32_deq_batch(stash_t *stash, uint32_t val[], int32_t num)
+{
+ return ring_u32_deq_batch(&stash->ring_u32.hdr, stash->ring_mask, val, num);
+}
+
+static inline int32_t strict_ring_u64_deq_batch(stash_t *stash, uint64_t val[], int32_t num)
+{
+ return ring_u64_deq_batch(&stash->ring_u64.hdr, stash->ring_mask, val, num);
+}
+
+static inline int32_t strict_ring_u32_len(stash_t *stash)
+{
+ return ring_u32_len(&stash->ring_u32.hdr);
+}
+
+static inline int32_t strict_ring_u64_len(stash_t *stash)
+{
+ return ring_u64_len(&stash->ring_u64.hdr);
+}
+
+static inline void mpmc_ring_u32_init(stash_t *stash)
+{
+ ring_mpmc_u32_init(&stash->ring_mpmc_u32.hdr);
+
+ for (uint32_t i = 0; i < stash->ring_size; i++)
+ stash->ring_mpmc_u32.data[i] = 0;
+}
+
+static inline void mpmc_ring_u64_init(stash_t *stash)
+{
+ ring_mpmc_u64_init(&stash->ring_mpmc_u64.hdr);
+
+ for (uint32_t i = 0; i < stash->ring_size; i++)
+ stash->ring_mpmc_u64.data[i] = 0;
+}
+
+static inline int32_t mpmc_ring_u32_enq_multi(stash_t *stash, const uint32_t val[], int32_t num)
+{
+ return ring_mpmc_u32_enq_multi(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u64_enq_multi(stash_t *stash, const uint64_t val[], int32_t num)
+{
+ return ring_mpmc_u64_enq_multi(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u32_enq_batch(stash_t *stash, const uint32_t val[], int32_t num)
+{
+ return ring_mpmc_u32_enq_batch(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u64_enq_batch(stash_t *stash, const uint64_t val[], int32_t num)
+{
+ return ring_mpmc_u64_enq_batch(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u32_deq_multi(stash_t *stash, uint32_t val[], int32_t num)
+{
+ return ring_mpmc_u32_deq_multi(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u64_deq_multi(stash_t *stash, uint64_t val[], int32_t num)
+{
+ return ring_mpmc_u64_deq_multi(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u32_deq_batch(stash_t *stash, uint32_t val[], int32_t num)
+{
+ return ring_mpmc_u32_deq_batch(&stash->ring_mpmc_u32.hdr, stash->ring_mpmc_u32.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u64_deq_batch(stash_t *stash, uint64_t val[], int32_t num)
+{
+ return ring_mpmc_u64_deq_batch(&stash->ring_mpmc_u64.hdr, stash->ring_mpmc_u64.data,
+ stash->ring_mask, val, num);
+}
+
+static inline int32_t mpmc_ring_u32_len(stash_t *stash)
+{
+ return ring_mpmc_u32_len(&stash->ring_mpmc_u32.hdr);
+}
+
+static inline int32_t mpmc_ring_u64_len(stash_t *stash)
+{
+ return ring_mpmc_u64_len(&stash->ring_mpmc_u64.hdr);
+}
+
odp_stash_t odp_stash_create(const char *name, const odp_stash_param_t *param)
{
- odp_shm_t shm;
stash_t *stash;
- uint64_t i, ring_size, shm_size;
+ uint64_t ring_size;
int ring_u64, index;
- char shm_name[ODP_STASH_NAME_LEN + 8];
- uint32_t shm_flags = 0;
if (odp_global_ro.disable.stash) {
_ODP_ERR("Stash is disabled\n");
@@ -184,7 +458,7 @@ odp_stash_t odp_stash_create(const char *name, const odp_stash_param_t *param)
return ODP_STASH_INVALID;
}
- if (param->num_obj > MAX_RING_SIZE) {
+ if (param->num_obj > stash_global->max_num_obj) {
_ODP_ERR("Too many objects.\n");
return ODP_STASH_INVALID;
}
@@ -213,75 +487,71 @@ odp_stash_t odp_stash_create(const char *name, const odp_stash_param_t *param)
else
ring_size = _ODP_ROUNDUP_POWER2_U32(ring_size + 1);
- memset(shm_name, 0, sizeof(shm_name));
- snprintf(shm_name, sizeof(shm_name) - 1, "_stash_%s", name);
-
- if (ring_u64)
- shm_size = sizeof(stash_t) + (ring_size * sizeof(uint64_t));
- else
- shm_size = sizeof(stash_t) + (ring_size * sizeof(uint32_t));
-
- if (odp_global_ro.shm_single_va)
- shm_flags |= ODP_SHM_SINGLE_VA;
-
- shm = odp_shm_reserve(shm_name, shm_size, ODP_CACHE_LINE_SIZE, shm_flags);
-
- if (shm == ODP_SHM_INVALID) {
- _ODP_ERR("SHM reserve failed.\n");
- free_index(index);
- return ODP_STASH_INVALID;
- }
-
- stash = odp_shm_addr(shm);
+ stash = stash_global->stash[index];
memset(stash, 0, sizeof(stash_t));
- if (ring_u64) {
- ring_u64_init(&stash->ring_u64.hdr);
-
- for (i = 0; i < ring_size; i++)
- stash->ring_u64.data[i] = 0;
+ /* Set ring function pointers */
+ if (stash_global->strict_size) {
+ if (ring_u64) {
+ stash->ring_fn.u64.init = strict_ring_u64_init;
+ stash->ring_fn.u64.enq_multi = strict_ring_u64_enq_multi;
+ stash->ring_fn.u64.enq_batch = strict_ring_u64_enq_multi;
+ stash->ring_fn.u64.deq_multi = strict_ring_u64_deq_multi;
+ stash->ring_fn.u64.deq_batch = strict_ring_u64_deq_batch;
+ stash->ring_fn.u64.len = strict_ring_u64_len;
+ } else {
+ stash->ring_fn.u32.init = strict_ring_u32_init;
+ stash->ring_fn.u32.enq_multi = strict_ring_u32_enq_multi;
+ stash->ring_fn.u32.enq_batch = strict_ring_u32_enq_multi;
+ stash->ring_fn.u32.deq_multi = strict_ring_u32_deq_multi;
+ stash->ring_fn.u32.deq_batch = strict_ring_u32_deq_batch;
+ stash->ring_fn.u32.len = strict_ring_u32_len;
+ }
} else {
- ring_u32_init(&stash->ring_u32.hdr);
-
- for (i = 0; i < ring_size; i++)
- stash->ring_u32.data[i] = 0;
+ if (ring_u64) {
+ stash->ring_fn.u64.init = mpmc_ring_u64_init;
+ stash->ring_fn.u64.enq_multi = mpmc_ring_u64_enq_multi;
+ stash->ring_fn.u64.enq_batch = mpmc_ring_u64_enq_batch;
+ stash->ring_fn.u64.deq_multi = mpmc_ring_u64_deq_multi;
+ stash->ring_fn.u64.deq_batch = mpmc_ring_u64_deq_batch;
+ stash->ring_fn.u64.len = mpmc_ring_u64_len;
+ } else {
+ stash->ring_fn.u32.init = mpmc_ring_u32_init;
+ stash->ring_fn.u32.enq_multi = mpmc_ring_u32_enq_multi;
+ stash->ring_fn.u32.enq_batch = mpmc_ring_u32_enq_batch;
+ stash->ring_fn.u32.deq_multi = mpmc_ring_u32_deq_multi;
+ stash->ring_fn.u32.deq_batch = mpmc_ring_u32_deq_batch;
+ stash->ring_fn.u32.len = mpmc_ring_u32_len;
+ }
}
if (name)
strcpy(stash->name, name);
stash->index = index;
- stash->shm = shm;
stash->obj_size = param->obj_size;
stash->ring_mask = ring_size - 1;
+ stash->ring_size = ring_size;
+
+ if (ring_u64)
+ stash->ring_fn.u64.init(stash);
+ else
+ stash->ring_fn.u32.init(stash);
/* This makes stash visible to lookups */
odp_ticketlock_lock(&stash_global->lock);
- stash_global->stash[index] = stash;
+ stash_global->stash_state[index] = STASH_ACTIVE;
odp_ticketlock_unlock(&stash_global->lock);
- return (odp_stash_t)stash;
+ return stash_handle(stash);
}
int odp_stash_destroy(odp_stash_t st)
{
- stash_t *stash;
- int index;
- odp_shm_t shm;
-
if (st == ODP_STASH_INVALID)
return -1;
- stash = (stash_t *)(uintptr_t)st;
- index = stash->index;
- shm = stash->shm;
-
- free_index(index);
-
- if (odp_shm_free(shm)) {
- _ODP_ERR("SHM free failed.\n");
- return -1;
- }
+ free_index(stash_entry(st)->index);
return 0;
}
@@ -293,7 +563,6 @@ uint64_t odp_stash_to_u64(odp_stash_t st)
odp_stash_t odp_stash_lookup(const char *name)
{
- int i;
stash_t *stash;
if (name == NULL)
@@ -301,12 +570,13 @@ odp_stash_t odp_stash_lookup(const char *name)
odp_ticketlock_lock(&stash_global->lock);
- for (i = 0; i < CONFIG_MAX_STASHES; i++) {
+ for (uint32_t i = 0; i < stash_global->max_num; i++) {
stash = stash_global->stash[i];
- if (stash && strcmp(stash->name, name) == 0) {
+ if (stash_global->stash_state[i] == STASH_ACTIVE &&
+ strcmp(stash->name, name) == 0) {
odp_ticketlock_unlock(&stash_global->lock);
- return (odp_stash_t)stash;
+ return stash_handle(stash);
}
}
@@ -315,57 +585,51 @@ odp_stash_t odp_stash_lookup(const char *name)
return ODP_STASH_INVALID;
}
-static inline int32_t stash_put(odp_stash_t st, const void *obj, int32_t num)
+static inline int32_t stash_put(odp_stash_t st, const void *obj, int32_t num, odp_bool_t is_batch)
{
- stash_t *stash;
+ int32_t (*ring_u32_enq)(stash_t *stash, const uint32_t val[], int32_t num);
+ int32_t (*ring_u64_enq)(stash_t *stash, const uint64_t val[], int32_t num);
+ stash_t *stash = stash_entry(st);
uint32_t obj_size;
int32_t i;
- stash = (stash_t *)(uintptr_t)st;
-
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
- obj_size = stash->obj_size;
-
- if (obj_size == sizeof(uint64_t)) {
- ring_u64_t *ring_u64 = &stash->ring_u64.hdr;
-
- ring_u64_enq_multi(ring_u64, stash->ring_mask,
- (uint64_t *)(uintptr_t)obj, num);
- return num;
+ if (is_batch) {
+ ring_u32_enq = stash->ring_fn.u32.enq_batch;
+ ring_u64_enq = stash->ring_fn.u64.enq_batch;
+ } else {
+ ring_u32_enq = stash->ring_fn.u32.enq_multi;
+ ring_u64_enq = stash->ring_fn.u64.enq_multi;
}
- if (obj_size == sizeof(uint32_t)) {
- ring_u32_t *ring_u32 = &stash->ring_u32.hdr;
+ obj_size = stash->obj_size;
- ring_u32_enq_multi(ring_u32, stash->ring_mask,
- (uint32_t *)(uintptr_t)obj, num);
- return num;
- }
+ if (obj_size == sizeof(uint64_t))
+ return ring_u64_enq(stash, (uint64_t *)(uintptr_t)obj, num);
+
+ if (obj_size == sizeof(uint32_t))
+ return ring_u32_enq(stash, (uint32_t *)(uintptr_t)obj, num);
if (obj_size == sizeof(uint16_t)) {
const uint16_t *u16_ptr = obj;
- ring_u32_t *ring_u32 = &stash->ring_u32.hdr;
uint32_t u32[num];
for (i = 0; i < num; i++)
u32[i] = u16_ptr[i];
- ring_u32_enq_multi(ring_u32, stash->ring_mask, u32, num);
- return num;
+ return ring_u32_enq(stash, u32, num);
}
if (obj_size == sizeof(uint8_t)) {
const uint8_t *u8_ptr = obj;
- ring_u32_t *ring_u32 = &stash->ring_u32.hdr;
uint32_t u32[num];
for (i = 0; i < num; i++)
u32[i] = u8_ptr[i];
- ring_u32_enq_multi(ring_u32, stash->ring_mask, u32, num);
- return num;
+ return ring_u32_enq(stash, u32, num);
}
return -1;
@@ -373,73 +637,66 @@ static inline int32_t stash_put(odp_stash_t st, const void *obj, int32_t num)
int32_t odp_stash_put(odp_stash_t st, const void *obj, int32_t num)
{
- return stash_put(st, obj, num);
+ return stash_put(st, obj, num, false);
}
int32_t odp_stash_put_batch(odp_stash_t st, const void *obj, int32_t num)
{
- /* Returns always 'num', or -1 on failure. */
- return stash_put(st, obj, num);
+ return stash_put(st, obj, num, true);
}
-static inline int32_t stash_put_u32(odp_stash_t st, const uint32_t val[],
- int32_t num)
+int32_t odp_stash_put_u32(odp_stash_t st, const uint32_t val[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
_ODP_ASSERT(stash->obj_size == sizeof(uint32_t));
- ring_u32_enq_multi(&stash->ring_u32.hdr, stash->ring_mask,
- (uint32_t *)(uintptr_t)val, num);
- return num;
+ return stash->ring_fn.u32.enq_multi(stash, val, num);
}
-int32_t odp_stash_put_u32(odp_stash_t st, const uint32_t val[], int32_t num)
+int32_t odp_stash_put_u32_batch(odp_stash_t st, const uint32_t val[], int32_t num)
{
- return stash_put_u32(st, val, num);
-}
+ stash_t *stash = stash_entry(st);
-int32_t odp_stash_put_u32_batch(odp_stash_t st, const uint32_t val[],
- int32_t num)
-{
- /* Returns always 'num', or -1 on failure. */
- return stash_put_u32(st, val, num);
+ if (odp_unlikely(st == ODP_STASH_INVALID))
+ return -1;
+
+ _ODP_ASSERT(stash->obj_size == sizeof(uint32_t));
+
+ return stash->ring_fn.u32.enq_batch(stash, val, num);
}
-static inline int32_t stash_put_u64(odp_stash_t st, const uint64_t val[],
- int32_t num)
+int32_t odp_stash_put_u64(odp_stash_t st, const uint64_t val[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
_ODP_ASSERT(stash->obj_size == sizeof(uint64_t));
- ring_u64_enq_multi(&stash->ring_u64.hdr, stash->ring_mask,
- (uint64_t *)(uintptr_t)val, num);
- return num;
-}
-
-int32_t odp_stash_put_u64(odp_stash_t st, const uint64_t val[], int32_t num)
-{
- return stash_put_u64(st, val, num);
+ return stash->ring_fn.u64.enq_multi(stash, (uint64_t *)(uintptr_t)val, num);
}
int32_t odp_stash_put_u64_batch(odp_stash_t st, const uint64_t val[],
int32_t num)
{
- /* Returns always 'num', or -1 on failure. */
- return stash_put_u64(st, val, num);
+ stash_t *stash = stash_entry(st);
+
+ if (odp_unlikely(st == ODP_STASH_INVALID))
+ return -1;
+
+ _ODP_ASSERT(stash->obj_size == sizeof(uint64_t));
+
+ return stash->ring_fn.u64.enq_batch(stash, (uint64_t *)(uintptr_t)val, num);
}
-static inline int32_t stash_put_ptr(odp_stash_t st, const uintptr_t ptr[],
- int32_t num)
+int32_t odp_stash_put_ptr(odp_stash_t st, const uintptr_t ptr[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
@@ -447,69 +704,65 @@ static inline int32_t stash_put_ptr(odp_stash_t st, const uintptr_t ptr[],
_ODP_ASSERT(stash->obj_size == sizeof(uintptr_t));
if (sizeof(uintptr_t) == sizeof(uint32_t))
- ring_u32_enq_multi(&stash->ring_u32.hdr, stash->ring_mask,
- (uint32_t *)(uintptr_t)ptr, num);
- else if (sizeof(uintptr_t) == sizeof(uint64_t))
- ring_u64_enq_multi(&stash->ring_u64.hdr, stash->ring_mask,
- (uint64_t *)(uintptr_t)ptr, num);
- else
- return -1;
+ return stash->ring_fn.u32.enq_multi(stash, (uint32_t *)(uintptr_t)ptr, num);
- return num;
-}
+ if (sizeof(uintptr_t) == sizeof(uint64_t))
+ return stash->ring_fn.u64.enq_multi(stash, (uint64_t *)(uintptr_t)ptr, num);
-int32_t odp_stash_put_ptr(odp_stash_t st, const uintptr_t ptr[], int32_t num)
-{
- return stash_put_ptr(st, ptr, num);
+ return -1;
}
int32_t odp_stash_put_ptr_batch(odp_stash_t st, const uintptr_t ptr[],
int32_t num)
{
- /* Returns always 'num', or -1 on failure. */
- return stash_put_ptr(st, ptr, num);
+ stash_t *stash = stash_entry(st);
+
+ if (odp_unlikely(st == ODP_STASH_INVALID))
+ return -1;
+
+ _ODP_ASSERT(stash->obj_size == sizeof(uintptr_t));
+
+ if (sizeof(uintptr_t) == sizeof(uint32_t))
+ return stash->ring_fn.u32.enq_batch(stash, (uint32_t *)(uintptr_t)ptr, num);
+
+ if (sizeof(uintptr_t) == sizeof(uint64_t))
+ return stash->ring_fn.u64.enq_batch(stash, (uint64_t *)(uintptr_t)ptr, num);
+
+ return -1;
}
-static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool_t batch)
+static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool_t is_batch)
{
- stash_t *stash;
+ int32_t (*ring_u32_deq)(stash_t *stash, uint32_t val[], int32_t num);
+ int32_t (*ring_u64_deq)(stash_t *stash, uint64_t val[], int32_t num);
+ stash_t *stash = stash_entry(st);
uint32_t obj_size;
uint32_t i, num_deq;
- stash = (stash_t *)(uintptr_t)st;
-
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
- obj_size = stash->obj_size;
-
- if (obj_size == sizeof(uint64_t)) {
- ring_u64_t *ring_u64 = &stash->ring_u64.hdr;
-
- if (batch)
- return ring_u64_deq_batch(ring_u64, stash->ring_mask, obj, num);
- else
- return ring_u64_deq_multi(ring_u64, stash->ring_mask, obj, num);
+ if (is_batch) {
+ ring_u32_deq = stash->ring_fn.u32.deq_batch;
+ ring_u64_deq = stash->ring_fn.u64.deq_batch;
+ } else {
+ ring_u32_deq = stash->ring_fn.u32.deq_multi;
+ ring_u64_deq = stash->ring_fn.u64.deq_multi;
}
- if (obj_size == sizeof(uint32_t)) {
- ring_u32_t *ring_u32 = &stash->ring_u32.hdr;
+ obj_size = stash->obj_size;
+
+ if (obj_size == sizeof(uint64_t))
+ return ring_u64_deq(stash, obj, num);
- if (batch)
- return ring_u32_deq_batch(ring_u32, stash->ring_mask, obj, num);
- else
- return ring_u32_deq_multi(ring_u32, stash->ring_mask, obj, num);
- }
+ if (obj_size == sizeof(uint32_t))
+ return ring_u32_deq(stash, obj, num);
if (obj_size == sizeof(uint16_t)) {
uint16_t *u16_ptr = obj;
- ring_u32_t *ring_u32 = &stash->ring_u32.hdr;
uint32_t u32[num];
- if (batch)
- num_deq = ring_u32_deq_batch(ring_u32, stash->ring_mask, u32, num);
- else
- num_deq = ring_u32_deq_multi(ring_u32, stash->ring_mask, u32, num);
+ num_deq = ring_u32_deq(stash, u32, num);
for (i = 0; i < num_deq; i++)
u16_ptr[i] = u32[i];
@@ -519,13 +772,9 @@ static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool
if (obj_size == sizeof(uint8_t)) {
uint8_t *u8_ptr = obj;
- ring_u32_t *ring_u32 = &stash->ring_u32.hdr;
uint32_t u32[num];
- if (batch)
- num_deq = ring_u32_deq_batch(ring_u32, stash->ring_mask, u32, num);
- else
- num_deq = ring_u32_deq_multi(ring_u32, stash->ring_mask, u32, num);
+ num_deq = ring_u32_deq(stash, u32, num);
for (i = 0; i < num_deq; i++)
u8_ptr[i] = u32[i];
@@ -538,67 +787,65 @@ static inline int32_t stash_get(odp_stash_t st, void *obj, int32_t num, odp_bool
int32_t odp_stash_get(odp_stash_t st, void *obj, int32_t num)
{
- return stash_get(st, obj, num, 0);
+ return stash_get(st, obj, num, false);
}
int32_t odp_stash_get_batch(odp_stash_t st, void *obj, int32_t num)
{
- return stash_get(st, obj, num, 1);
+ return stash_get(st, obj, num, true);
}
int32_t odp_stash_get_u32(odp_stash_t st, uint32_t val[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
_ODP_ASSERT(stash->obj_size == sizeof(uint32_t));
- return ring_u32_deq_multi(&stash->ring_u32.hdr, stash->ring_mask, val,
- num);
+ return stash->ring_fn.u32.deq_multi(stash, val, num);
}
int32_t odp_stash_get_u32_batch(odp_stash_t st, uint32_t val[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
_ODP_ASSERT(stash->obj_size == sizeof(uint32_t));
- return ring_u32_deq_batch(&stash->ring_u32.hdr, stash->ring_mask, val, num);
+ return stash->ring_fn.u32.deq_batch(stash, val, num);
}
int32_t odp_stash_get_u64(odp_stash_t st, uint64_t val[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
_ODP_ASSERT(stash->obj_size == sizeof(uint64_t));
- return ring_u64_deq_multi(&stash->ring_u64.hdr, stash->ring_mask, val,
- num);
+ return stash->ring_fn.u64.deq_multi(stash, val, num);
}
int32_t odp_stash_get_u64_batch(odp_stash_t st, uint64_t val[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
_ODP_ASSERT(stash->obj_size == sizeof(uint64_t));
- return ring_u64_deq_batch(&stash->ring_u64.hdr, stash->ring_mask, val, num);
+ return stash->ring_fn.u64.deq_batch(stash, val, num);
}
int32_t odp_stash_get_ptr(odp_stash_t st, uintptr_t ptr[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
@@ -606,19 +853,17 @@ int32_t odp_stash_get_ptr(odp_stash_t st, uintptr_t ptr[], int32_t num)
_ODP_ASSERT(stash->obj_size == sizeof(uintptr_t));
if (sizeof(uintptr_t) == sizeof(uint32_t))
- return ring_u32_deq_multi(&stash->ring_u32.hdr,
- stash->ring_mask,
- (uint32_t *)(uintptr_t)ptr, num);
- else if (sizeof(uintptr_t) == sizeof(uint64_t))
- return ring_u64_deq_multi(&stash->ring_u64.hdr,
- stash->ring_mask,
- (uint64_t *)(uintptr_t)ptr, num);
+ return stash->ring_fn.u32.deq_multi(stash, (uint32_t *)(uintptr_t)ptr, num);
+
+ if (sizeof(uintptr_t) == sizeof(uint64_t))
+ return stash->ring_fn.u64.deq_multi(stash, (uint64_t *)(uintptr_t)ptr, num);
+
return -1;
}
int32_t odp_stash_get_ptr_batch(odp_stash_t st, uintptr_t ptr[], int32_t num)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (odp_unlikely(st == ODP_STASH_INVALID))
return -1;
@@ -626,11 +871,11 @@ int32_t odp_stash_get_ptr_batch(odp_stash_t st, uintptr_t ptr[], int32_t num)
_ODP_ASSERT(stash->obj_size == sizeof(uintptr_t));
if (sizeof(uintptr_t) == sizeof(uint32_t))
- return ring_u32_deq_batch(&stash->ring_u32.hdr, stash->ring_mask,
- (uint32_t *)(uintptr_t)ptr, num);
- else if (sizeof(uintptr_t) == sizeof(uint64_t))
- return ring_u64_deq_batch(&stash->ring_u64.hdr, stash->ring_mask,
- (uint64_t *)(uintptr_t)ptr, num);
+ return stash->ring_fn.u32.deq_batch(stash, (uint32_t *)(uintptr_t)ptr, num);
+
+ if (sizeof(uintptr_t) == sizeof(uint64_t))
+ return stash->ring_fn.u64.deq_batch(stash, (uint64_t *)(uintptr_t)ptr, num);
+
return -1;
}
@@ -644,23 +889,17 @@ int odp_stash_flush_cache(odp_stash_t st)
static uint32_t stash_obj_count(stash_t *stash)
{
- ring_u32_t *ring_u32;
uint32_t obj_size = stash->obj_size;
- if (obj_size == sizeof(uint64_t)) {
- ring_u64_t *ring_u64 = &stash->ring_u64.hdr;
-
- return ring_u64_len(ring_u64);
- }
-
- ring_u32 = &stash->ring_u32.hdr;
+ if (obj_size == sizeof(uint64_t))
+ return stash->ring_fn.u64.len(stash);
- return ring_u32_len(ring_u32);
+ return stash->ring_fn.u32.len(stash);
}
void odp_stash_print(odp_stash_t st)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (st == ODP_STASH_INVALID) {
_ODP_ERR("Bad stash handle\n");
@@ -674,13 +913,13 @@ void odp_stash_print(odp_stash_t st)
_ODP_PRINT(" index %i\n", stash->index);
_ODP_PRINT(" obj size %u\n", stash->obj_size);
_ODP_PRINT(" obj count %u\n", stash_obj_count(stash));
- _ODP_PRINT(" ring size %u\n", stash->ring_mask + 1);
+ _ODP_PRINT(" ring size %u\n", stash->ring_size);
_ODP_PRINT("\n");
}
int odp_stash_stats(odp_stash_t st, odp_stash_stats_t *stats)
{
- stash_t *stash = (stash_t *)(uintptr_t)st;
+ stash_t *stash = stash_entry(st);
if (st == ODP_STASH_INVALID) {
_ODP_ERR("Bad stash handle\n");