1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
/* Copyright (c) 2018, Linaro Limited
* All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*/
#include <odp/api/hints.h>
#include <odp_queue_basic_internal.h>
#include <odp_debug_internal.h>
/*
 * Push up to 'num' buffer headers onto the SPSC ring that belongs to the
 * queue referenced by 'handle'.
 *
 * Returns -1 (after logging an error) when the queue status is below
 * QUEUE_STATUS_READY. Otherwise the result of ring_spsc_enq_multi() is
 * passed through unchanged (presumably the number of items stored --
 * confirm against the ring implementation).
 */
static inline int spsc_enq_multi(odp_queue_t handle,
				 odp_buffer_hdr_t *buf_hdr[], int num)
{
	queue_entry_t *qentry = qentry_from_handle(handle);
	ring_spsc_t ring = qentry->s.ring_spsc;

	/* Refuse to touch the ring of a queue that is not ready. */
	if (odp_unlikely(qentry->s.status < QUEUE_STATUS_READY)) {
		ODP_ERR("Bad queue status\n");
		return -1;
	}

	return ring_spsc_enq_multi(ring, (void **)buf_hdr, num);
}
/*
 * Pull up to 'num' buffer headers from the SPSC ring that belongs to the
 * queue referenced by 'handle' into 'buf_hdr'.
 *
 * Returns -1 when the queue status is below QUEUE_STATUS_READY; unlike the
 * enqueue path, nothing is logged in that case. Otherwise the result of
 * ring_spsc_deq_multi() is passed through unchanged (presumably the number
 * of items fetched -- confirm against the ring implementation).
 */
static inline int spsc_deq_multi(odp_queue_t handle,
				 odp_buffer_hdr_t *buf_hdr[], int num)
{
	queue_entry_t *qentry = qentry_from_handle(handle);
	ring_spsc_t ring = qentry->s.ring_spsc;

	/* Bad queue, or queue has been destroyed. */
	if (odp_unlikely(qentry->s.status < QUEUE_STATUS_READY))
		return -1;

	return ring_spsc_deq_multi(ring, (void **)buf_hdr, num);
}
/*
 * Multi-item enqueue entry point for SPSC queues.
 *
 * Non-inline wrapper around spsc_enq_multi() so that its address can be
 * stored in a queue's enqueue_multi function pointer. Returns whatever
 * spsc_enq_multi() returns.
 */
static int queue_spsc_enq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
				int num)
{
	int result = spsc_enq_multi(handle, buf_hdr, num);

	return result;
}
/*
 * Single-item enqueue entry point for SPSC queues.
 *
 * Forwards the one buffer header to spsc_enq_multi(). Returns 0 when
 * exactly one item was reported as enqueued, and -1 for any other result
 * (including the bad-queue-status error).
 */
static int queue_spsc_enq(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr)
{
	/* Treat the lone header as a one-element array. */
	return (spsc_enq_multi(handle, &buf_hdr, 1) == 1) ? 0 : -1;
}
/*
 * Multi-item dequeue entry point for SPSC queues.
 *
 * Non-inline wrapper around spsc_deq_multi() so that its address can be
 * stored in a queue's dequeue_multi function pointers. Returns whatever
 * spsc_deq_multi() returns.
 */
static int queue_spsc_deq_multi(odp_queue_t handle, odp_buffer_hdr_t *buf_hdr[],
				int num)
{
	int result = spsc_deq_multi(handle, buf_hdr, num);

	return result;
}
/*
 * Single-item dequeue entry point for SPSC queues.
 *
 * Requests one buffer header via spsc_deq_multi(). Returns the header when
 * exactly one item was reported as dequeued, and NULL for any other result
 * (including the bad-queue-status error).
 */
static odp_buffer_hdr_t *queue_spsc_deq(odp_queue_t handle)
{
	odp_buffer_hdr_t *hdr = NULL;

	if (spsc_deq_multi(handle, &hdr, 1) != 1)
		return NULL;

	return hdr;
}
/*
 * Set up a queue entry to use the SPSC implementation.
 *
 * Installs the SPSC enqueue/dequeue functions into the entry's function
 * pointers and creates the backing ring with ring_spsc_create(), passing
 * the queue's name and 'queue_size'. Calls ODP_ABORT() if ring creation
 * yields NULL.
 */
void _odp_queue_spsc_init(queue_entry_t *queue, uint32_t queue_size)
{
	/* Single-item operations. */
	queue->s.enqueue = queue_spsc_enq;
	queue->s.dequeue = queue_spsc_deq;

	/* Multi-item operations; orig_dequeue_multi gets the same dequeue. */
	queue->s.enqueue_multi = queue_spsc_enq_multi;
	queue->s.dequeue_multi = queue_spsc_deq_multi;
	queue->s.orig_dequeue_multi = queue_spsc_deq_multi;

	/* Backing storage for the queue. */
	queue->s.ring_spsc = ring_spsc_create(queue->s.name, queue_size);

	if (queue->s.ring_spsc == NULL) {
		ODP_ABORT("Creating SPSC ring failed\n");
	}
}
|