aboutsummaryrefslogtreecommitdiff
path: root/platform/linux-generic
diff options
context:
space:
mode:
authorBill Fischofer <bill.fischofer@linaro.org>2015-09-03 10:16:38 -0500
committerMaxim Uvarov <maxim.uvarov@linaro.org>2015-09-03 19:29:30 +0300
commit9c3fcc0e5c11cf8117672f63a346b80a66d72103 (patch)
tree368b4936f9c0036b1342916d0db9107c09ea5048 /platform/linux-generic
parentbcf72306bf7cc61841bceedf9deedca5c4b79528 (diff)
linux-generic: queue: correct handling of reorder completion
Fix a race condition that could cause events to be stuck in an ordered queue's reorder queue. Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org> Reviewed-and-Tested-by: Maxim Uvarov <maxim.uvarov@linaro.org> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Diffstat (limited to 'platform/linux-generic')
-rw-r--r--platform/linux-generic/include/odp_queue_internal.h36
-rw-r--r--platform/linux-generic/odp_queue.c48
2 files changed, 48 insertions, 36 deletions
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index f285ea3..48576bc 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -284,18 +284,38 @@ static inline int reorder_deq(queue_entry_t *queue,
return deq_count;
}
-static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+static inline void reorder_complete(queue_entry_t *origin_qe,
+ odp_buffer_hdr_t **reorder_buf_return,
+ odp_buffer_hdr_t **placeholder_buf,
+ int placeholder_append)
{
- odp_buffer_hdr_t *next_buf = reorder_buf->next;
- uint64_t order = reorder_buf->order;
+ odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+ odp_buffer_hdr_t *next_buf;
+
+ *reorder_buf_return = NULL;
+ if (!placeholder_append)
+ *placeholder_buf = NULL;
- while (reorder_buf->flags.sustain &&
- next_buf && next_buf->order == order) {
- reorder_buf = next_buf;
+ while (reorder_buf &&
+ reorder_buf->order <= origin_qe->s.order_out) {
next_buf = reorder_buf->next;
- }
- return !reorder_buf->flags.sustain;
+ if (!reorder_buf->target_qe) {
+ origin_qe->s.reorder_head = next_buf;
+ reorder_buf->next = *placeholder_buf;
+ *placeholder_buf = reorder_buf;
+
+ reorder_buf = next_buf;
+ order_release(origin_qe, 1);
+ } else if (reorder_buf->flags.sustain) {
+ reorder_buf = next_buf;
+ } else {
+ *reorder_buf_return = origin_qe->s.reorder_head;
+ origin_qe->s.reorder_head =
+ origin_qe->s.reorder_head->next;
+ break;
+ }
+ }
}
static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 87483b9..15abd93 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -458,18 +458,10 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
order_release(origin_qe, release_count + placeholder_count);
/* Now handle any unblocked complete buffers destined for
- * other queues. Note that these must be complete because
- * otherwise another thread is working on it and is
- * responsible for resolving order when it is complete.
+ * other queues, appending placeholder bufs as needed.
*/
UNLOCK(&queue->s.lock);
-
- if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out &&
- reorder_complete(reorder_buf))
- origin_qe->s.reorder_head = reorder_buf->next;
- else
- reorder_buf = NULL;
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
UNLOCK(&origin_qe->s.lock);
if (reorder_buf)
@@ -827,12 +819,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
order_release(origin_qe, release_count + placeholder_count);
/* Now handle sends to other queues that are ready to go */
- if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out &&
- reorder_complete(reorder_buf))
- origin_qe->s.reorder_head = reorder_buf->next;
- else
- reorder_buf = NULL;
+ reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
/* We're fully done with the origin_qe at last */
UNLOCK(&origin_qe->s.lock);
@@ -910,23 +897,28 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
order_release(origin_qe, 1);
/* Check if this release allows us to unblock waiters.
- * Note that we can only process complete waiters since
- * if the sustain bit is set for a buffer this means that
- * some other thread is working on it and will be
- * responsible for resolving order when it is complete.
+ * At the point of this call, the reorder list may contain
+ * zero or more placeholders that need to be freed, followed
+ * by zero or one complete reorder buffer chain.
*/
- reorder_buf = origin_qe->s.reorder_head;
-
- if (reorder_buf &&
- reorder_buf->order <= origin_qe->s.order_out &&
- reorder_complete(reorder_buf))
- origin_qe->s.reorder_head = reorder_buf->next;
- else
- reorder_buf = NULL;
+ reorder_complete(origin_qe, &reorder_buf,
+ &placeholder_buf_hdr, 0);
+ /* Now safe to unlock */
UNLOCK(&origin_qe->s.lock);
+
+ /* If reorder_buf has a target, do the enq now */
if (reorder_buf)
queue_enq_internal(reorder_buf);
+
+ while (placeholder_buf_hdr) {
+ odp_buffer_hdr_t *placeholder_next =
+ placeholder_buf_hdr->next;
+
+ odp_buffer_free(placeholder_buf_hdr->handle.handle);
+ placeholder_buf_hdr = placeholder_next;
+ }
+
return 0;
}