aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-dpdk/odp_queue_spsc.c
blob: d074510427fa820d67c8ea399160be43637e4380 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/* Copyright (c) 2018, Linaro Limited
 * All rights reserved.
 *
 * SPDX-License-Identifier:     BSD-3-Clause
 */
#include <odp/api/hints.h>
#include <odp_queue_basic_internal.h>

#include <odp_debug_internal.h>

/*
 * Enqueue up to 'num' buffer headers to the SPSC ring behind 'handle'.
 *
 * Returns -1 (after logging an error) when the queue status is below
 * QUEUE_STATUS_READY. Otherwise the result of ring_spsc_enq_multi() is
 * returned unchanged (presumably the number of entries stored; confirm
 * against the ring implementation).
 */
static inline int spsc_enq_multi(odp_queue_t handle,
				 odp_buffer_hdr_t *buf_hdr[], int num)
{
	queue_entry_t *qentry = qentry_from_handle(handle);
	ring_spsc_t ring = qentry->s.ring_spsc;

	if (odp_unlikely(qentry->s.status < QUEUE_STATUS_READY)) {
		ODP_ERR("Bad queue status\n");
		return -1;
	}

	/* The ring API takes an array of untyped pointers, hence the cast. */
	return ring_spsc_enq_multi(ring, (void **)buf_hdr, num);
}

/*
 * Dequeue up to 'num' buffer headers from the SPSC ring behind 'handle'
 * into 'buf_hdr'.
 *
 * Returns -1 without logging when the queue status is below
 * QUEUE_STATUS_READY (bad queue, or the queue has been destroyed).
 * Otherwise the result of ring_spsc_deq_multi() is returned unchanged
 * (presumably the number of entries fetched; confirm against the ring
 * implementation).
 */
static inline int spsc_deq_multi(odp_queue_t handle,
				 odp_buffer_hdr_t *buf_hdr[], int num)
{
	queue_entry_t *qentry = qentry_from_handle(handle);
	ring_spsc_t ring = qentry->s.ring_spsc;

	if (odp_unlikely(qentry->s.status < QUEUE_STATUS_READY))
		return -1;

	/* The ring API takes an array of untyped pointers, hence the cast. */
	return ring_spsc_deq_multi(ring, (void **)buf_hdr, num);
}

/*
 * Multi-event enqueue entry point for SPSC queues.
 *
 * Non-inline wrapper around spsc_enq_multi() so that its address can be
 * stored in a queue entry's function pointer; the result is passed through
 * unchanged.
 */
static int queue_spsc_enq_multi(odp_queue_t handle,
				odp_buffer_hdr_t *buf_hdr[], int num)
{
	return spsc_enq_multi(handle, buf_hdr, num);
}

/*
 * Single-event enqueue entry point for SPSC queues.
 *
 * Returns 0 when spsc_enq_multi() reports exactly one buffer enqueued,
 * and -1 for any other result (including a bad queue status).
 */
static int queue_spsc_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
{
	/* Treat the single header as a one-element array. */
	return (spsc_enq_multi(handle, &buf_hdr, 1) == 1) ? 0 : -1;
}

/*
 * Multi-event dequeue entry point for SPSC queues.
 *
 * Non-inline wrapper around spsc_deq_multi() so that its address can be
 * stored in a queue entry's function pointer; the result is passed through
 * unchanged.
 */
static int queue_spsc_deq_multi(odp_queue_t handle,
				odp_buffer_hdr_t *buf_hdr[], int num)
{
	return spsc_deq_multi(handle, buf_hdr, num);
}

/*
 * Single-event dequeue entry point for SPSC queues.
 *
 * Returns the dequeued buffer header when spsc_deq_multi() reports exactly
 * one entry, and NULL for any other result (including a bad queue status).
 */
static odp_buffer_hdr_t *queue_spsc_deq(odp_queue_t handle)
{
	odp_buffer_hdr_t *hdr = NULL;

	/* Use the local pointer as a one-element output array. */
	if (spsc_deq_multi(handle, &hdr, 1) != 1)
		return NULL;

	return hdr;
}

/*
 * Set up 'queue' as an SPSC queue.
 *
 * Installs the SPSC enqueue/dequeue handlers into the queue entry and then
 * creates the backing ring, named after the queue, with 'queue_size' passed
 * to ring_spsc_create(). Calls ODP_ABORT() if ring creation returns NULL.
 */
void _odp_queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
{
	/* Multi-event handlers; orig_dequeue_multi gets the same dequeue. */
	queue->s.enqueue_multi = queue_spsc_enq_multi;
	queue->s.dequeue_multi = queue_spsc_deq_multi;
	queue->s.orig_dequeue_multi = queue_spsc_deq_multi;

	/* Single-event handlers. */
	queue->s.enqueue = queue_spsc_enq;
	queue->s.dequeue = queue_spsc_deq;

	queue->s.ring_spsc = ring_spsc_create(queue->s.name, queue_size);
	if (!queue->s.ring_spsc)
		ODP_ABORT("Creating SPSC ring failed\n");
}