diff options
author | Petri Savolainen <petri.savolainen@linaro.org> | 2018-10-11 15:03:01 +0300 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2018-10-15 16:59:33 +0300 |
commit | 08620451ad0d82b092ac7516673644ab20ebc9bc (patch) | |
tree | e84e447db4abf59d36e77a06a444cf7dbd87703a /platform | |
parent | 83478c2c04ee939b69e09867f97be88ae5c9e684 (diff) |
linux-gen: ring: add reader tail check
Reader tail index is needed to detect if a reader is so much
behind that a writer is going to overwrite the data it is
reading.
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Reviewed-by: Bill Fischofer <bill.fischofer@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Diffstat (limited to 'platform')
-rw-r--r-- | platform/linux-generic/include/odp_ring_internal.h | 52 |
1 files changed, 42 insertions, 10 deletions
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h index 9a637afb3..ad2f37ef2 100644 --- a/platform/linux-generic/include/odp_ring_internal.h +++ b/platform/linux-generic/include/odp_ring_internal.h @@ -23,8 +23,9 @@ extern "C" { * Ring stores head and tail counters. Ring indexes are formed from these * counters with a mask (mask = ring_size - 1), which requires that ring size * must be a power of two. Also ring size must be larger than the maximum - * number of data items that will be stored on it (there's no check against - * overwriting). */ + * number of data items that will be stored on it as write operations are + * assumed to succeed eventually (after readers complete their current + * operations). */ typedef struct ODP_ALIGNED_CACHE { /* Writer head and tail */ odp_atomic_u32_t w_head; @@ -33,6 +34,7 @@ typedef struct ODP_ALIGNED_CACHE { /* Reader head and tail */ odp_atomic_u32_t r_head; + odp_atomic_u32_t r_tail; uint32_t data[0]; } ring_t; @@ -53,6 +55,7 @@ static inline void ring_init(ring_t *ring) odp_atomic_init_u32(&ring->w_head, 0); odp_atomic_init_u32(&ring->w_tail, 0); odp_atomic_init_u32(&ring->r_head, 0); + odp_atomic_init_u32(&ring->r_tail, 0); } /* Dequeue data from the ring head */ @@ -75,12 +78,19 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask, uint32_t *data) new_head = head + 1; } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, - __ATOMIC_ACQ_REL, + __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0)); - /* Read data. CAS acquire-release ensures that data read - * does not move above from here. */ + /* Read data. */ *data = ring->data[new_head & mask]; + + /* Wait until other readers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head)) + odp_cpu_pause(); + + /* Update the tail. Writers acquire it. */ + odp_atomic_store_rel_u32(&ring->r_tail, new_head); + return 1; } @@ -110,14 +120,20 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, new_head = head + num; } while (odp_unlikely(cas_mo_u32(&ring->r_head, &head, new_head, - __ATOMIC_ACQ_REL, + __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0)); - /* Read data. CAS acquire-release ensures that data read - * does not move above from here. */ + /* Read data. */ for (i = 0; i < num; i++) data[i] = ring->data[(head + 1 + i) & mask]; + /* Wait until other readers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != head)) + odp_cpu_pause(); + + /* Update the tail. Writers acquire it. */ + odp_atomic_store_rel_u32(&ring->r_tail, new_head); + return num; } @@ -125,16 +141,24 @@ static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask, static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data) { uint32_t old_head, new_head; + uint32_t size = mask + 1; /* Reserve a slot in the ring for writing */ old_head = odp_atomic_fetch_inc_u32(&ring->w_head); new_head = old_head + 1; + /* Wait for the last reader to finish. This prevents overwrite when + * a reader has been left behind (e.g. due to an interrupt) and is + * still reading the same slot. */ + while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail) + >= size)) + odp_cpu_pause(); + /* Write data */ ring->data[new_head & mask] = data; /* Wait until other writers have updated the tail */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) odp_cpu_pause(); /* Release the new writer tail, readers acquire it. */ @@ -146,17 +170,25 @@ static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[], uint32_t num) { uint32_t old_head, new_head, i; + uint32_t size = mask + 1; /* Reserve a slot in the ring for writing */ old_head = odp_atomic_fetch_add_u32(&ring->w_head, num); new_head = old_head + 1; + /* Wait for the last reader to finish. This prevents overwrite when + * a reader has been left behind (e.g. due to an interrupt) and is + * still reading these slots. */ + while (odp_unlikely(new_head - odp_atomic_load_acq_u32(&ring->r_tail) + >= size)) + odp_cpu_pause(); + /* Write data */ for (i = 0; i < num; i++) ring->data[(new_head + i) & mask] = data[i]; /* Wait until other writers have updated the tail */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head)) + while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) odp_cpu_pause(); /* Release the new writer tail, readers acquire it. */ |