diff options
author | Bill Fischofer <bill.fischofer@linaro.org> | 2015-09-03 10:16:38 -0500 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2015-09-03 19:29:30 +0300 |
commit | 9c3fcc0e5c11cf8117672f63a346b80a66d72103 (patch) | |
tree | 368b4936f9c0036b1342916d0db9107c09ea5048 /platform/linux-generic | |
parent | bcf72306bf7cc61841bceedf9deedca5c4b79528 (diff) |
linux-generic: queue: correct handling of reorder completion
Fix a race condition that could cause events to be stuck in an ordered
queue's reorder queue.
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
Reviewed-and-Tested-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Diffstat (limited to 'platform/linux-generic')
-rw-r--r-- | platform/linux-generic/include/odp_queue_internal.h | 36 | ||||
-rw-r--r-- | platform/linux-generic/odp_queue.c | 48 |
2 files changed, 48 insertions, 36 deletions
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index f285ea3..48576bc 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -284,18 +284,38 @@ static inline int reorder_deq(queue_entry_t *queue, return deq_count; } -static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf) +static inline void reorder_complete(queue_entry_t *origin_qe, + odp_buffer_hdr_t **reorder_buf_return, + odp_buffer_hdr_t **placeholder_buf, + int placeholder_append) { - odp_buffer_hdr_t *next_buf = reorder_buf->next; - uint64_t order = reorder_buf->order; + odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head; + odp_buffer_hdr_t *next_buf; + + *reorder_buf_return = NULL; + if (!placeholder_append) + *placeholder_buf = NULL; - while (reorder_buf->flags.sustain && - next_buf && next_buf->order == order) { - reorder_buf = next_buf; + while (reorder_buf && + reorder_buf->order <= origin_qe->s.order_out) { next_buf = reorder_buf->next; - } - return !reorder_buf->flags.sustain; + if (!reorder_buf->target_qe) { + origin_qe->s.reorder_head = next_buf; + reorder_buf->next = *placeholder_buf; + *placeholder_buf = reorder_buf; + + reorder_buf = next_buf; + order_release(origin_qe, 1); + } else if (reorder_buf->flags.sustain) { + reorder_buf = next_buf; + } else { + *reorder_buf_return = origin_qe->s.reorder_head; + origin_qe->s.reorder_head = + origin_qe->s.reorder_head->next; + break; + } + } } static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order, diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 87483b9..15abd93 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -458,18 +458,10 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) order_release(origin_qe, release_count + placeholder_count); /* Now handle any unblocked complete buffers destined for - * other queues. Note that these must be complete because - * otherwise another thread is working on it and is - * responsible for resolving order when it is complete. + * other queues, appending placeholder bufs as needed. */ UNLOCK(&queue->s.lock); - - if (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out && - reorder_complete(reorder_buf)) - origin_qe->s.reorder_head = reorder_buf->next; - else - reorder_buf = NULL; + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1); UNLOCK(&origin_qe->s.lock); if (reorder_buf) @@ -827,12 +819,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, order_release(origin_qe, release_count + placeholder_count); /* Now handle sends to other queues that are ready to go */ - if (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out && - reorder_complete(reorder_buf)) - origin_qe->s.reorder_head = reorder_buf->next; - else - reorder_buf = NULL; + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1); /* We're fully done with the origin_qe at last */ UNLOCK(&origin_qe->s.lock); @@ -910,23 +897,28 @@ int release_order(queue_entry_t *origin_qe, uint64_t order, order_release(origin_qe, 1); /* Check if this release allows us to unblock waiters. - * Note that we can only process complete waiters since - * if the sustain bit is set for a buffer this means that - * some other thread is working on it and will be - * responsible for resolving order when it is complete. + * At the point of this call, the reorder list may contain + * zero or more placeholders that need to be freed, followed + * by zero or one complete reorder buffer chain. */ - reorder_buf = origin_qe->s.reorder_head; - - if (reorder_buf && - reorder_buf->order <= origin_qe->s.order_out && - reorder_complete(reorder_buf)) - origin_qe->s.reorder_head = reorder_buf->next; - else - reorder_buf = NULL; + reorder_complete(origin_qe, &reorder_buf, + &placeholder_buf_hdr, 0); + /* Now safe to unlock */ UNLOCK(&origin_qe->s.lock); + + /* If reorder_buf has a target, do the enq now */ if (reorder_buf) queue_enq_internal(reorder_buf); + + while (placeholder_buf_hdr) { + odp_buffer_hdr_t *placeholder_next = + placeholder_buf_hdr->next; + + odp_buffer_free(placeholder_buf_hdr->handle.handle); + placeholder_buf_hdr = placeholder_next; + } + return 0; } |