author    Petri Savolainen <petri.savolainen@linaro.org>  2018-10-11 15:03:01 +0300
committer Maxim Uvarov <maxim.uvarov@linaro.org>          2018-10-15 16:59:33 +0300
commit    08620451ad0d82b092ac7516673644ab20ebc9bc (patch)
tree      e84e447db4abf59d36e77a06a444cf7dbd87703a /platform
parent    83478c2c04ee939b69e09867f97be88ae5c9e684 (diff)
linux-gen: ring: add reader tail check
Reader tail index is needed to detect if a reader is so much behind that
a writer is going to overwrite the data it is reading.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
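The check rests on free-running 32-bit counters: ring indexes come from masking the counters, and counter distances stay valid across wrap-around. A minimal self-contained sketch of that arithmetic (the concrete values are made up for illustration, not taken from the patch):

#include <assert.h>
#include <stdint.h>

int main(void)
{
	uint32_t size = 8;                /* ring size, power of two */
	uint32_t mask = size - 1;         /* 0b111 */

	/* Indexes are formed by masking the free-running counter:
	 * counter & mask == counter % size, without a division. */
	assert((13u & mask) == 13u % size);

	/* Unsigned wrap-around keeps the writer's distance check valid
	 * even when the 32-bit counters overflow. */
	uint32_t r_tail = UINT32_MAX - 1; /* slowest reader, pre-wrap */
	uint32_t new_head = r_tail + 5;   /* writer head, wrapped past 0 */
	assert(new_head - r_tail == 5);   /* true distance, despite wrap */
	assert(new_head - r_tail < size); /* 5 < 8: safe to write */
	return 0;
}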
Diffstat (limited to 'platform')
-rw-r--r--  platform/linux-generic/include/odp_ring_internal.h | 52
1 file changed, 42 insertions(+), 10 deletions(-)
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index 9a637afb3..ad2f37ef2 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -23,8 +23,9 @@ extern "C" {
* Ring stores head and tail counters. Ring indexes are formed from these
* counters with a mask (mask = ring_size - 1), which requires that ring size
* must be a power of two. Also ring size must be larger than the maximum
- * number of data items that will be stored on it (there's no check against
- * overwriting). */
+ * number of data items that will be stored on it as write operations are
+ * assumed to succeed eventually (after readers complete their current
+ * operations). */
typedef struct ODP_ALIGNED_CACHE {
/* Writer head and tail */
odp_atomic_u32_t w_head;
@@ -33,6 +34,7 @@ typedef struct ODP_ALIGNED_CACHE {
/* Reader head and tail */
odp_atomic_u32_t r_head;
+ odp_atomic_u32_t r_tail;
uint32_t data[0];
} ring_t;
@@ -53,6 +55,7 @@ static inline void ring_init(ring_t *ring)
odp_atomic_init_u32(&ring->w_head, 0);
odp_atomic_init_u32(&ring->w_tail, 0);
odp_atomic_init_u32(&ring->r_head, 0);
+ odp_atomic_init_u32(&ring->r_tail, 0);
}
/* Dequeue data from the ring head */
@@ -75,12 +78,19 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data)
new_head = head + 1;
} while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
- __ATOMIC_ACQ_REL,
+ __ATOMIC_ACQUIRE,
__ATOMIC_ACQUIRE) == 0));
- /* Read data. CAS acquire-release ensures that data read
- * does not move above from here. */
+ /* Read data. */
*data = ring->data[new_head & mask];
+
+ /* Wait until other readers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head))
+ odp_cpu_pause();
+
+ /* Update the tail. Writers acquire it. */
+ odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
return 1;
}
@@ -110,14 +120,20 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
new_head = head + num;
} while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head,
- __ATOMIC_ACQ_REL,
+ __ATOMIC_ACQUIRE,
__ATOMIC_ACQUIRE) == 0));
- /* Read data. CAS acquire-release ensures that data read
- * does not move above from here. */
+ /* Read data. */
for (i = 0; i < num; i++)
data[i] = ring->data[(head + 1 + i) & mask];
+ /* Wait until other readers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head))
+ odp_cpu_pause();
+
+ /* Update the tail. Writers acquire it. */
+ odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
return num;
}
@@ -125,16 +141,24 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
{
uint32_t old_head, new_head;
+ uint32_t size = mask + 1;
/* Reserve a slot in the ring for writing */
old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
new_head = old_head + 1;
+ /* Wait for the last reader to finish. This prevents overwrite when
+ * a reader has been left behind (e.g. due to an interrupt) and is
+ * still reading the same slot. */
+ while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail)
+ >= size))
+ odp_cpu_pause();
+
/* Write data */
ring->data[new_head & mask] = data;
/* Wait until other writers have updated the tail */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head))
odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */
@@ -146,17 +170,25 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
uint32_t num)
{
uint32_t old_head, new_head, i;
+ uint32_t size = mask + 1;
/* Reserve a slot in the ring for writing */
old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
new_head = old_head + 1;
+ /* Wait for the last reader to finish. This prevents overwrite when
+ * a reader has been left behind (e.g. due to an interrupt) and is
+ * still reading these slots. */
+ while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail)
+ >= size))
+ odp_cpu_pause();
+
/* Write data */
for (i = 0; i < num; i++)
ring->data[(new_head + i) & mask] = data[i];
/* Wait until other writers have updated the tail */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+ while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head))
odp_cpu_pause();
/* Release the new writer tail, readers acquire it. */
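Taken together, the patched protocol is: writers reserve slots with w_head, wait on r_tail before writing, and publish with w_tail; readers claim slots with a CAS on r_head, read the data, and publish with r_tail. The following is a condensed single-file sketch of that scheme, using C11 atomics in place of the odp_atomic_* wrappers and bare spin loops in place of odp_cpu_pause(); sizes and names are illustrative, and this is a sketch of the idea, not the library code:

#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 256u              /* must be a power of two */
#define RING_MASK (RING_SIZE - 1u)

typedef struct {
	atomic_uint w_head;   /* next slot writers will reserve */
	atomic_uint w_tail;   /* last slot published to readers */
	atomic_uint r_head;   /* next slot readers will claim */
	atomic_uint r_tail;   /* last slot released back to writers */
	uint32_t data[RING_SIZE];
} ring_t;

static void ring_init(ring_t *ring)
{
	atomic_init(&ring->w_head, 0);
	atomic_init(&ring->w_tail, 0);
	atomic_init(&ring->r_head, 0);
	atomic_init(&ring->r_tail, 0);
}

/* Enqueue one item. Callers must keep the number of stored items below
 * the ring size; the waits below then always terminate. */
static void ring_enq(ring_t *ring, uint32_t value)
{
	uint32_t old_head, new_head;

	/* Reserve a slot */
	old_head = atomic_fetch_add_explicit(&ring->w_head, 1,
					     memory_order_relaxed);
	new_head = old_head + 1;

	/* The new check: wait until the slowest reader (r_tail) is within
	 * one ring size, so a straggling reader is never overwritten. */
	while (new_head - atomic_load_explicit(&ring->r_tail,
					       memory_order_acquire)
	       >= RING_SIZE)
		; /* odp_cpu_pause() in ODP */

	ring->data[new_head & RING_MASK] = value;

	/* Wait for earlier writers, then publish the slot to readers */
	while (atomic_load_explicit(&ring->w_tail,
				    memory_order_relaxed) != old_head)
		;
	atomic_store_explicit(&ring->w_tail, new_head,
			      memory_order_release);
}

/* Dequeue one item; returns 0 when the ring is empty. */
static int ring_deq(ring_t *ring, uint32_t *value)
{
	uint32_t head, tail, new_head;

	head = atomic_load_explicit(&ring->r_head, memory_order_acquire);

	/* Claim the slot after head, re-checking for emptiness */
	do {
		tail = atomic_load_explicit(&ring->w_tail,
					    memory_order_acquire);
		if (head == tail)
			return 0;
		new_head = head + 1;
	} while (!atomic_compare_exchange_weak_explicit(
			 &ring->r_head, &head, new_head,
			 memory_order_acquire, memory_order_acquire));

	*value = ring->data[new_head & RING_MASK];

	/* Wait for earlier readers, then release the slot to writers */
	while (atomic_load_explicit(&ring->r_tail,
				    memory_order_relaxed) != head)
		;
	atomic_store_explicit(&ring->r_tail, new_head,
			      memory_order_release);
	return 1;
}

The essential pairing is the reader's release store to r_tail against the writer's acquire load of it: each reader's data read is ordered before the point where a writer can observe the slot as reusable, which appears to be what lets the patch relax the r_head CAS from acquire-release to acquire.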