author    Petri Savolainen <petri.savolainen@linaro.org>  2018-06-20 13:03:18 +0300
committer Maxim Uvarov <maxim.uvarov@linaro.org>  2018-06-25 23:30:12 +0300
commit    3d79b95543b6043a9df9f523f81416a30052a9ce (patch)
tree      8cc764a3f6f91996ea2bde55786305e546aa7f3b /platform/linux-generic/odp_queue_lf.c
parent    3c335400833959b9a96674984c797e8ec4734566 (diff)
linux-gen: queue_lf: fix event ordering issue
New enqueues may happen while a dequeue operation is searching for the
lowest counter value. If the first enqueue (with the lower counter value)
added a node before the current dequeue search index and the second
enqueue added one after it, dequeue returned these events in the wrong
order. After finding the lowest counter value, dequeue needs to search
the nodes before that node again.

Also change the CAS operation memory model to acq-rel, so that the first
enqueue from a thread is always visible before the second enqueue (from
the same thread).

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Diffstat (limited to 'platform/linux-generic/odp_queue_lf.c')
-rw-r--r--  platform/linux-generic/odp_queue_lf.c | 133
1 file changed, 62 insertions(+), 71 deletions(-)
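For orientation, below is a minimal standalone sketch of the two-pass head
search that this fix introduces in queue_lf_deq(). The types and ring size
are simplified placeholders, and the atomic 128-bit loads and the removal
CAS of the real code are omitted; the sketch only shows the search order.

/* Simplified two-pass head search, mirroring the dequeue fix in this
 * patch. Hypothetical types; the real code reads each slot with
 * atomic_load_u128() and removes the head with a 128-bit CAS. */
#include <stdint.h>

#define RING_SIZE 32 /* placeholder; the real size is RING_LF_SIZE */

typedef struct {
	uint64_t counter; /* 0: empty, lowest non-zero value: head */
	uint64_t ptr;     /* data pointer */
} node_t;

/* Return the head index, or -1 when the ring is empty. */
static int find_head(const node_t ring[RING_SIZE])
{
	uint64_t lowest = UINT64_MAX;
	int i_lowest = -1;
	int i;

	/* Pass 1: locate the slot with the lowest non-zero counter. */
	for (i = 0; i < RING_SIZE; i++) {
		if (ring[i].counter && ring[i].counter < lowest) {
			lowest = ring[i].counter;
			i_lowest = i;
		}
	}

	if (i_lowest < 0)
		return -1;

	/* Pass 2: a concurrent enqueue may have written a lower counter
	 * into a slot that was already passed, so re-check the slots in
	 * front of i_lowest. */
	for (i = 0; i < i_lowest; i++) {
		if (ring[i].counter && ring[i].counter < lowest) {
			lowest = ring[i].counter;
			i_lowest = i;
		}
	}

	return i_lowest;
}

The diff itself follows.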
diff --git a/platform/linux-generic/odp_queue_lf.c b/platform/linux-generic/odp_queue_lf.c
index e4902a3f5..3ec32524b 100644
--- a/platform/linux-generic/odp_queue_lf.c
+++ b/platform/linux-generic/odp_queue_lf.c
@@ -36,25 +36,14 @@ static inline void atomic_zero_u128(u128_t *atomic)
*/
#include <odp_cpu.h>
-static inline int atomic_cas_rel_u128(u128_t *atomic, u128_t old_val,
- u128_t new_val)
+static inline int atomic_cas_acq_rel_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
{
return __lockfree_compare_exchange_16((__int128 *)atomic,
(__int128 *)&old_val,
new_val,
0,
- __ATOMIC_RELEASE,
- __ATOMIC_RELAXED);
-}
-
-static inline int atomic_cas_acq_u128(u128_t *atomic, u128_t old_val,
- u128_t new_val)
-{
- return __lockfree_compare_exchange_16((__int128 *)atomic,
- (__int128 *)&old_val,
- new_val,
- 0,
- __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQ_REL,
__ATOMIC_RELAXED);
}
@@ -75,21 +64,12 @@ static inline u128_t atomic_load_u128(u128_t *atomic)
return __atomic_load_n(atomic, __ATOMIC_RELAXED);
}
-static inline int atomic_cas_rel_u128(u128_t *atomic, u128_t old_val,
- u128_t new_val)
+static inline int atomic_cas_acq_rel_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
{
return __atomic_compare_exchange_n(atomic, &old_val, new_val,
0 /* strong */,
- __ATOMIC_RELEASE,
- __ATOMIC_RELAXED);
-}
-
-static inline int atomic_cas_acq_u128(u128_t *atomic, u128_t old_val,
- u128_t new_val)
-{
- return __atomic_compare_exchange_n(atomic, &old_val, new_val,
- 0 /* strong */,
- __ATOMIC_ACQUIRE,
+ __ATOMIC_ACQ_REL,
__ATOMIC_RELAXED);
}
@@ -120,18 +100,17 @@ static inline void atomic_zero_u128(u128_t *atomic)
atomic->u64[1] = 0;
}
-static inline int atomic_cas_rel_u128(u128_t *atomic, u128_t old_val,
- u128_t new_val)
+static inline int atomic_cas_acq_rel_u128(u128_t *atomic, u128_t old_val,
+ u128_t new_val)
{
- (void)old_val;
- *atomic = new_val;
- return 1;
-}
+ if (atomic->u64[0] == old_val.u64[0] &&
+ atomic->u64[1] == old_val.u64[1]) {
+ atomic->u64[0] = new_val.u64[0];
+ atomic->u64[1] = new_val.u64[1];
+ return 1;
+ }
-static inline int atomic_cas_acq_u128(u128_t *atomic, u128_t old_val,
- u128_t new_val)
-{
- return atomic_cas_rel_u128(atomic, old_val, new_val);
+ return 0;
}
static inline int atomic_is_lockfree_u128(void)
@@ -146,15 +125,12 @@ typedef union {
u128_t u128;
struct {
- /* 0: empty, 1: data */
- uint64_t mark: 1;
+ /* Data with lowest counter value is the head. Empty node has
+ * counter value 0. */
+ uint64_t counter;
- /* A cache aligned pointer fits into 63 bits, since the least
- * significant bits are zero. */
- uint64_t ptr: 63;
-
- /* Data with lowest counter value is the head. */
- uint64_t count;
+ /* Data pointer */
+ uint64_t ptr;
} s;
} ring_lf_node_t;
@@ -190,30 +166,31 @@ static int queue_lf_enq(void *q_int, odp_buffer_hdr_t *buf_hdr)
{
queue_entry_t *queue;
queue_lf_t *queue_lf;
- int i, j, i_node;
+ int i, j, idx;
int found;
ring_lf_node_t node_val;
ring_lf_node_t new_val;
ring_lf_node_t *node;
- uint64_t counter;
queue = q_int;
queue_lf = queue->s.queue_lf;
- i_node = 0;
+ new_val.s.ptr = (uintptr_t)buf_hdr;
+ new_val.s.counter = odp_atomic_fetch_inc_u64(&queue_lf->enq_counter);
- counter = odp_atomic_fetch_inc_u64(&queue_lf->enq_counter);
+ idx = 0;
for (j = 0; j < ENQ_RETRIES; j++) {
found = 0;
/* Find empty node */
for (i = 0; i < RING_LF_SIZE; i++) {
- i_node = next_idx(i_node);
- node = &queue_lf->node[i_node];
+ node = &queue_lf->node[idx];
+ idx = next_idx(idx);
+
node_val.u128 = atomic_load_u128(&node->u128);
- if (node_val.s.mark == 0) {
+ if (node_val.s.counter == 0) {
found = 1;
break;
}
@@ -224,12 +201,8 @@ static int queue_lf_enq(void *q_int, odp_buffer_hdr_t *buf_hdr)
return -1;
/* Try to insert data */
- new_val.s.mark = 1;
- new_val.s.count = counter;
- new_val.s.ptr = ((uintptr_t)buf_hdr) >> 1;
-
- if (atomic_cas_rel_u128(&node->u128, node_val.u128,
- new_val.u128))
+ if (atomic_cas_acq_rel_u128(&node->u128, node_val.u128,
+ new_val.u128))
return 0;
}
@@ -251,15 +224,18 @@ static odp_buffer_hdr_t *queue_lf_deq(void *q_int)
{
queue_entry_t *queue;
queue_lf_t *queue_lf;
- int i, j, i_node;
+ int i, j, i_lowest;
int found;
ring_lf_node_t node_val, old_val, new_val;
ring_lf_node_t *node, *old;
- uint64_t lowest;
+ uint64_t lowest, counter;
+ odp_buffer_hdr_t *buf_hdr;
queue = q_int;
queue_lf = queue->s.queue_lf;
- i_node = 0;
+ new_val.s.counter = 0;
+ new_val.s.ptr = 0;
+ old = NULL;
for (j = 0; j < DEQ_RETRIES; j++) {
found = 0;
@@ -268,14 +244,15 @@ static odp_buffer_hdr_t *queue_lf_deq(void *q_int)
/* Find the head node. The one with data and
* the lowest counter. */
for (i = 0; i < RING_LF_SIZE; i++) {
- i_node = next_idx(i_node);
- node = &queue_lf->node[i_node];
+ node = &queue_lf->node[i];
node_val.u128 = atomic_load_u128(&node->u128);
+ counter = node_val.s.counter;
- if (node_val.s.mark == 1 && node_val.s.count < lowest) {
+ if (counter && counter < lowest) {
old = node;
old_val.u128 = node_val.u128;
- lowest = node_val.s.count;
+ lowest = counter;
+ i_lowest = i;
found = 1;
}
}
@@ -284,13 +261,27 @@ static odp_buffer_hdr_t *queue_lf_deq(void *q_int)
if (found == 0)
return NULL;
- /* Try to remove data */
- new_val.u128 = old_val.u128;
- new_val.s.mark = 0;
+ /* New data may have been written to the area we searched before
+ * finding the current lowest. Check that there are no lower
+ * values. */
+ for (i = 0; i < i_lowest; i++) {
+ node = &queue_lf->node[i];
+ node_val.u128 = atomic_load_u128(&node->u128);
+ counter = node_val.s.counter;
+
+ if (counter && counter < lowest) {
+ old = node;
+ old_val.u128 = node_val.u128;
+ lowest = counter;
+ }
+ }
- if (atomic_cas_acq_u128(&old->u128, old_val.u128,
- new_val.u128))
- return (void *)(((uintptr_t)old_val.s.ptr) << 1);
+ buf_hdr = (void *)(uintptr_t)old_val.s.ptr;
+
+ /* Try to remove data */
+ if (atomic_cas_acq_rel_u128(&old->u128, old_val.u128,
+ new_val.u128))
+ return buf_hdr;
}
return NULL;
@@ -363,7 +354,7 @@ static void init_queue(queue_lf_t *queue_lf)
{
int i;
- odp_atomic_init_u64(&queue_lf->enq_counter, 0);
+ odp_atomic_init_u64(&queue_lf->enq_counter, 1);
for (i = 0; i < RING_LF_SIZE; i++)
atomic_zero_u128(&queue_lf->node[i].u128);
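
A related detail: the enqueue counter now starts from 1 because the new
node layout uses counter value 0 to mark an empty slot, so a live event
must never carry counter 0. The acq-rel CAS added by the patch can be
sketched as below, assuming a compiler that provides lock-free 16-byte
__atomic builtins (e.g. GCC or Clang with -mcx16 on x86-64); this is an
illustration, not the exact ODP wrapper.

#include <stdint.h>

typedef union {
	unsigned __int128 u128;
	uint64_t u64[2];
} u128_t;

/* Strong 128-bit CAS with acquire-release ordering on success. Release
 * makes a thread's first enqueue visible before its second one; acquire
 * pairs with that release on the dequeue side. The failure order stays
 * relaxed, since the caller simply retries with a fresh load. */
static inline int cas_acq_rel_u128(u128_t *atomic, u128_t old_val,
				   u128_t new_val)
{
	return __atomic_compare_exchange_n(&atomic->u128, &old_val.u128,
					   new_val.u128, 0 /* strong */,
					   __ATOMIC_ACQ_REL,
					   __ATOMIC_RELAXED);
}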