aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-dpdk/odp_queue_spsc.c
diff options
context:
space:
mode:
authorMatias Elo <matias.elo@nokia.com>2018-07-11 15:30:07 +0300
committerMatias Elo <matias.elo@nokia.com>2018-07-13 10:30:04 +0300
commit66fd19ddff3e3bfcfe6369714297f3fea8417318 (patch)
treec369f591cb8c9f46625de8724264d6e9bf08efe5 /platform/linux-dpdk/odp_queue_spsc.c
parent32b033ccb0824ce20989bf312b40c9533ce7c664 (diff)
Port eb021ca3 "linux-gen: queue_spsc: single-producer, single-consumer queue"
Includes also: 3c335400 "linux-gen: queue: fix queue empty check" Signed-off-by: Matias Elo <matias.elo@nokia.com>
Diffstat (limited to 'platform/linux-dpdk/odp_queue_spsc.c')
-rw-r--r--platform/linux-dpdk/odp_queue_spsc.c93
1 files changed, 93 insertions, 0 deletions
diff --git a/platform/linux-dpdk/odp_queue_spsc.c b/platform/linux-dpdk/odp_queue_spsc.c
new file mode 100644
index 000000000..9ee6f5b03
--- /dev/null
+++ b/platform/linux-dpdk/odp_queue_spsc.c
@@ -0,0 +1,93 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+#include <odp/api/hints.h>
+#include <odp_queue_internal.h>
+
+#include "config.h"
+#include <odp_debug_internal.h>
+
+static inline int spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ queue_entry_t *queue;
+ ring_spsc_t ring_spsc;
+
+ queue = q_int;
+ ring_spsc = queue->s.ring_spsc;
+
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ ODP_ERR("Bad queue status\n");
+ return -1;
+ }
+
+ return ring_spsc_enq_multi(ring_spsc, (void **)buf_hdr, num);
+}
+
+static inline int spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ queue_entry_t *queue;
+ ring_spsc_t ring_spsc;
+
+ queue = q_int;
+ ring_spsc = queue->s.ring_spsc;
+
+ if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+ /* Bad queue, or queue has been destroyed. */
+ return -1;
+ }
+
+ return ring_spsc_deq_multi(ring_spsc, (void **)buf_hdr, num);
+}
+
+static int queue_spsc_enq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ return spsc_enq_multi(q_int, buf_hdr, num);
+}
+
+static int queue_spsc_enq(void *q_int, odp_buffer_hdr_t *buf_hdr)
+{
+ int ret;
+
+ ret = spsc_enq_multi(q_int, &buf_hdr, 1);
+
+ if (ret == 1)
+ return 0;
+ else
+ return -1;
+}
+
+static int queue_spsc_deq_multi(void *q_int, odp_buffer_hdr_t *buf_hdr[],
+ int num)
+{
+ return spsc_deq_multi(q_int, buf_hdr, num);
+}
+
+static odp_buffer_hdr_t *queue_spsc_deq(void *q_int)
+{
+ odp_buffer_hdr_t *buf_hdr = NULL;
+ int ret;
+
+ ret = spsc_deq_multi(q_int, &buf_hdr, 1);
+
+ if (ret == 1)
+ return buf_hdr;
+ else
+ return NULL;
+}
+
+void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
+{
+ queue->s.enqueue = queue_spsc_enq;
+ queue->s.dequeue = queue_spsc_deq;
+ queue->s.enqueue_multi = queue_spsc_enq_multi;
+ queue->s.dequeue_multi = queue_spsc_deq_multi;
+
+ queue->s.ring_spsc = ring_spsc_create(queue->s.name, queue_size);
+ if (queue->s.ring_spsc == NULL)
+ ODP_ABORT("Creating SPSC ring failed\n");
+}