diff options
author | Matias Elo <matias.elo@nokia.com> | 2018-07-11 15:30:07 +0300 |
---|---|---|
committer | Matias Elo <matias.elo@nokia.com> | 2018-07-13 10:30:04 +0300 |
commit | 66fd19ddff3e3bfcfe6369714297f3fea8417318 (patch) | |
tree | c369f591cb8c9f46625de8724264d6e9bf08efe5 /platform/linux-dpdk/odp_queue_spsc.c | |
parent | 32b033ccb0824ce20989bf312b40c9533ce7c664 (diff) |
Port eb021ca3 "linux-gen: queue_spsc: single-producer, single-consumer queue"
Includes also:
3c335400 "linux-gen: queue: fix queue empty check"
Signed-off-by: Matias Elo <matias.elo@nokia.com>
Diffstat (limited to 'platform/linux-dpdk/odp_queue_spsc.c')
-rw-r--r-- | platform/linux-dpdk/odp_queue_spsc.c | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/platform/linux-dpdk/odp_queue_spsc.c b/platform/linux-dpdk/odp_queue_spsc.c new file mode 100644 index 000000000..9ee6f5b03 --- /dev/null +++ b/platform/linux-dpdk/odp_queue_spsc.c @@ -0,0 +1,93 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ +#include <odp/api/hints.h> +#include <odp_queue_internal.h> + +#include "config.h" +#include <odp_debug_internal.h> + +static inline int spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + queue_entry_t *queue; + ring_spsc_t ring_spsc; + + queue = q_int; + ring_spsc = queue->s.ring_spsc; + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + ODP_ERR("Bad queue status\n"); + return -1; + } + + return ring_spsc_enq_multi(ring_spsc, (void **)buf_hdr, num); +} + +static inline int spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + queue_entry_t *queue; + ring_spsc_t ring_spsc; + + queue = q_int; + ring_spsc = queue->s.ring_spsc; + + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + /* Bad queue, or queue has been destroyed. */ + return -1; + } + + return ring_spsc_deq_multi(ring_spsc, (void **)buf_hdr, num); +} + +static int queue_spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + return spsc_enq_multi(q_int, buf_hdr, num); +} + +static int queue_spsc_enq(void *q_int, odp_buffer_hdr_t *buf_hdr) +{ + int ret; + + ret = spsc_enq_multi(q_int, &buf_hdr, 1); + + if (ret == 1) + return 0; + else + return -1; +} + +static int queue_spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + return spsc_deq_multi(q_int, buf_hdr, num); +} + +static odp_buffer_hdr_t *queue_spsc_deq(void *q_int) +{ + odp_buffer_hdr_t *buf_hdr = NULL; + int ret; + + ret = spsc_deq_multi(q_int, &buf_hdr, 1); + + if (ret == 1) + return buf_hdr; + else + return NULL; +} + +void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size) +{ + queue->s.enqueue = queue_spsc_enq; + queue->s.dequeue = queue_spsc_deq; + queue->s.enqueue_multi = queue_spsc_enq_multi; + queue->s.dequeue_multi = queue_spsc_deq_multi; + + queue->s.ring_spsc = ring_spsc_create(queue->s.name, queue_size); + if (queue->s.ring_spsc == NULL) + ODP_ABORT("Creating SPSC ring failed\n"); +} |