aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-generic/include/odp_llqueue.h
blob: e9cf9945eff0977c99e6a5320a7aec02672cccb0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
/* Copyright (c) 2017, ARM Limited. All rights reserved.
 *
 * Copyright (c) 2017-2018, Linaro Limited
 * All rights reserved.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

#ifndef ODP_LLQUEUE_H_
#define ODP_LLQUEUE_H_

#include <odp/api/cpu.h>
#include <odp/api/hints.h>
#include <odp/api/spinlock.h>

#include <odp_config_internal.h>
#include <odp_debug_internal.h>
#include <odp_cpu.h>

#include <stdint.h>
#include <stdlib.h>

/******************************************************************************
 * Linked list queues
 *****************************************************************************/

struct llqueue;
struct llnode;

static struct llnode *llq_head(struct llqueue *llq);
static void llqueue_init(struct llqueue *llq);
static void llq_enqueue(struct llqueue *llq, struct llnode *node);
static struct llnode *llq_dequeue(struct llqueue *llq);
static odp_bool_t llq_dequeue_cond(struct llqueue *llq, struct llnode *exp);
static odp_bool_t llq_cond_rotate(struct llqueue *llq, struct llnode *node);
static odp_bool_t llq_on_queue(struct llnode *node);

/******************************************************************************
 * The implementation(s)
 *****************************************************************************/

#define SENTINEL ((void *)~(uintptr_t)0)

#ifdef CONFIG_LLDSCD
/* Implement queue operations using double-word LL/SC */

/* The scalar equivalent of a double pointer */
#if __SIZEOF_PTRDIFF_T__ == 4
typedef uint64_t dintptr_t;
#endif
#if __SIZEOF_PTRDIFF_T__ == 8
__extension__ typedef __int128 dintptr_t;
#endif

/* Queue node, embedded in the user's own structure.
 * 'next' is NULL when the node is off-queue, SENTINEL while an enqueuer
 * has published the node as tail but not yet linked its predecessor,
 * and otherwise points to the successor node.
 */
struct llnode {
	struct llnode *next;
};

/* Head/tail pointer pair overlaid with a double-width scalar so that
 * both pointers can be read and updated atomically as one unit by the
 * double-word LL/SC primitives (lld/scd).
 */
union llht {
	struct {
		struct llnode *head, *tail;
	} st;
	dintptr_t ui;
};

/* Lock-less linked-list queue: just the atomically-updated pair */
struct llqueue {
	union llht u;
};

/* Return the current head of the queue (NULL if empty).
 * Plain relaxed load: a snapshot only, no ordering implied.
 */
static inline struct llnode *llq_head(struct llqueue *llq)
{
	struct llnode *head;

	head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED);
	return head;
}

/* Initialise a queue to the empty state (head == tail == NULL) */
static inline void llqueue_init(struct llqueue *llq)
{
	llq->u.st.tail = NULL;
	llq->u.st.head = NULL;
}

/* Append 'node' to the tail of the queue.
 * The node must not currently be on any queue (node->next == NULL).
 * Two-phase publish: the LL/SC loop atomically installs 'node' as the
 * new tail, then a plain store links the old tail to it. Between those
 * two steps the old tail's next pointer still reads SENTINEL, which is
 * what the dequeue side spins on.
 */
static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
{
	union llht old, neu;

	ODP_ASSERT(node->next == NULL);
	/* Mark as on-queue-but-not-yet-linked; dequeuers wait until the
	 * real successor pointer replaces SENTINEL below.
	 */
	node->next = SENTINEL;
	do {
		/* Load-linked the head/tail pair; retry the whole update
		 * when the store-conditional fails.
		 */
		old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
		/* Empty queue: node becomes head too */
		neu.st.head = old.st.head == NULL ? node : old.st.head;
		neu.st.tail = node;
	} while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE)));
	if (old.st.tail != NULL) {
		/* List was not empty */
		ODP_ASSERT(old.st.tail->next == SENTINEL);
		/* Phase two: link predecessor to the new tail */
		old.st.tail->next = node;
	}
}

/* Remove and return the head of the queue, or NULL if the queue is
 * empty. The returned node has its next pointer reset to NULL so it can
 * be re-enqueued. May spin briefly (SENTINEL wait) while a concurrent
 * enqueuer completes its second-phase link.
 */
static inline struct llnode *llq_dequeue(struct llqueue *llq)
{
	struct llnode *head;
	union llht old, neu;

	/* llq_dequeue() may be used in a busy-waiting fashion
	 * Read head using plain load to avoid disturbing remote LL/SC
	 */
	head = __atomic_load_n(&llq->u.st.head, __ATOMIC_ACQUIRE);
	if (head == NULL)
		return NULL;
	/* Read head->next before LL to minimize cache miss latency
	 * in LL/SC below
	 */
	(void)__atomic_load_n(&head->next, __ATOMIC_RELAXED);

	do {
		/* Atomically snapshot head and tail together */
		old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
		if (odp_unlikely(old.st.head == NULL)) {
			/* Empty list */
			return NULL;
		} else if (odp_unlikely(old.st.head == old.st.tail)) {
			/* Single-element in list */
			neu.st.head = NULL;
			neu.st.tail = NULL;
		} else {
			/* Multi-element list, dequeue head */
			struct llnode *next;
			/* Wait until llq_enqueue() has written true next
			 * pointer
			 */
			while ((next = __atomic_load_n(&old.st.head->next,
						       __ATOMIC_RELAXED)) ==
				SENTINEL)
				odp_cpu_pause();
			neu.st.head = next;
			neu.st.tail = old.st.tail;
		}
	} while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
	/* Ownership gained: make the node re-enqueueable */
	old.st.head->next = NULL;
	return old.st.head;
}

/* Dequeue the head only if it equals 'exp'; return true on success.
 * Returns false when the queue is empty or a different node is at the
 * head. On success the removed node's next pointer is reset to NULL.
 */
static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
					  struct llnode *exp)
{
	union llht old, neu;

	do {
		/* Atomically snapshot head and tail together */
		old.ui = lld(&llq->u.ui, __ATOMIC_ACQUIRE);
		if (odp_unlikely(old.st.head == NULL || old.st.head != exp)) {
			/* Empty list or wrong head */
			return false;
		} else if (odp_unlikely(old.st.head == old.st.tail)) {
			/* Single-element in list */
			neu.st.head = NULL;
			neu.st.tail = NULL;
		} else {
			/* Multi-element list, dequeue head */
			struct llnode *next;

			/* Wait until llq_enqueue() has written true next
			 * pointer */
			while ((next = __atomic_load_n(&old.st.head->next,
						       __ATOMIC_RELAXED)) ==
				SENTINEL)
				odp_cpu_pause();

			neu.st.head = next;
			neu.st.tail = old.st.tail;
		}
	} while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
	/* Ownership gained: make the node re-enqueueable */
	old.st.head->next = NULL;
	return true;
}

/* Move 'node' from head to tail, but only if it currently is the head.
 * Returns true when the rotation happened, false otherwise.
 * A single atomic rotation is hard to express with double-word LL/SC,
 * so it is composed from the existing conditional-dequeue + enqueue.
 */
static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
					 struct llnode *node)
{
	if (odp_unlikely(!llq_dequeue_cond(llq, node)))
		return false;

	llq_enqueue(llq, node);
	return true;
}

static inline odp_bool_t llq_on_queue(struct llnode *node)
{
	return node->next != NULL;
}

#else
/* Implement queue operations protected by a spin lock */

/* Queue node, embedded in the user's own structure.
 * 'next' is NULL when the node is off-queue, SENTINEL when the node is
 * the current tail, otherwise the successor node.
 */
struct llnode {
	struct llnode *next;
};

/* Linked-list queue protected by a spin lock (fallback when double-word
 * LL/SC is not available)
 */
struct llqueue {
	struct llnode *head, *tail;
	odp_spinlock_t lock;
};

/* Return the current head of the queue (NULL if empty).
 * Lock-free snapshot via a relaxed atomic load.
 */
static inline struct llnode *llq_head(struct llqueue *llq)
{
	struct llnode *head;

	head = __atomic_load_n(&llq->head, __ATOMIC_RELAXED);
	return head;
}

/* Initialise a queue: empty list plus an unlocked spin lock */
static inline void llqueue_init(struct llqueue *llq)
{
	odp_spinlock_init(&llq->lock);
	llq->tail = NULL;
	llq->head = NULL;
}

/* Append 'node' to the tail of the queue.
 * The node must not currently be on any queue (node->next == NULL).
 */
static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
{
	ODP_ASSERT(node->next == NULL);
	/* Tail nodes are marked with SENTINEL rather than NULL */
	node->next = SENTINEL;

	odp_spinlock_lock(&llq->lock);
	if (llq->head != NULL) {
		/* Non-empty queue: link after the current tail */
		llq->tail->next = node;
		llq->tail = node;
	} else {
		/* Empty queue: node becomes both head and tail */
		llq->head = node;
		llq->tail = node;
	}
	odp_spinlock_unlock(&llq->lock);
}

/* Remove and return the head of the queue, or NULL if the queue is
 * empty. The returned node has its next pointer reset to NULL so it
 * can be re-enqueued.
 */
static inline struct llnode *llq_dequeue(struct llqueue *llq)
{
	struct llnode *node = NULL;

	/* Cheap unlocked peek: avoid taking the lock at all when the
	 * queue appears empty (useful for busy-wait callers).
	 */
	if (__atomic_load_n(&llq->head, __ATOMIC_RELAXED) == NULL)
		return NULL;

	odp_spinlock_lock(&llq->lock);
	node = llq->head;
	if (node != NULL) {
		if (node == llq->tail) {
			/* Last element: queue becomes empty */
			ODP_ASSERT(node->next == SENTINEL);
			llq->head = NULL;
			llq->tail = NULL;
		} else {
			/* More elements remain: advance head */
			ODP_ASSERT(node->next != SENTINEL);
			llq->head = node->next;
		}
		node->next = NULL;
	}
	odp_spinlock_unlock(&llq->lock);
	return node;
}

/* Dequeue the head only if it equals 'node'; return true on success.
 * Returns false when the queue is empty or a different node is at the
 * head. On success the removed node's next pointer is reset to NULL.
 */
static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
					  struct llnode *node)
{
	odp_bool_t removed = false;

	odp_spinlock_lock(&llq->lock);
	if (odp_likely(llq->head != NULL && llq->head == node)) {
		removed = true;
		if (llq->tail == node) {
			/* Only element: queue becomes empty */
			ODP_ASSERT(node->next == SENTINEL);
			llq->tail = NULL;
			llq->head = NULL;
		} else {
			/* Advance head past the removed node */
			ODP_ASSERT(node->next != SENTINEL);
			llq->head = node->next;
		}
		node->next = NULL;
	}
	odp_spinlock_unlock(&llq->lock);
	return removed;
}

/* Move 'node' from head to tail, but only if it currently is the head.
 * Returns true when the rotation happened (including the trivial case
 * where 'node' is the sole element), false otherwise.
 */
static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
					 struct llnode *node)
{
	odp_bool_t rotated = false;

	odp_spinlock_lock(&llq->lock);
	if (odp_likely(llq->head == node)) {
		rotated = true;
		if (llq->tail != node) {
			/* Unlink from the front, re-link after the tail */
			ODP_ASSERT(node->next != SENTINEL);
			llq->head = node->next;
			node->next = SENTINEL;
			llq->tail->next = node;
			llq->tail = node;
		}
		/* Else 'node' is the only element => already "rotated" */
	}
	odp_spinlock_unlock(&llq->lock);
	return rotated;
}

static inline odp_bool_t llq_on_queue(struct llnode *node)
{
	return node->next != NULL;
}

#endif

#endif