diff options
-rw-r--r-- | platform/linux-generic/include/odp_buffer_internal.h | 7 | ||||
-rw-r--r-- | platform/linux-generic/odp_queue.c | 124 | ||||
-rw-r--r-- | platform/linux-generic/odp_schedule_ordered.c | 19 |
3 files changed, 113 insertions, 37 deletions
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index 7b0ef8b0..69daf94b 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -103,6 +103,8 @@ typedef union odp_buffer_bits_t { }; } odp_buffer_bits_t; +#define BUFFER_BURST_SIZE 8 + /* Common buffer header */ struct odp_buffer_hdr_t { struct odp_buffer_hdr_t *next; /* next buf in a list--keep 1st */ @@ -111,6 +113,11 @@ struct odp_buffer_hdr_t { struct odp_buffer_hdr_t *link; }; odp_buffer_bits_t handle; /* handle */ + + int burst_num; + int burst_first; + struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE]; + union { uint32_t all; struct { diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 80d99e88..5b962e9f 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -388,20 +388,48 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], { int sched = 0; int i, ret; - odp_buffer_hdr_t *tail; - - /* Chain input buffers together */ - for (i = 0; i < num - 1; i++) - buf_hdr[i]->next = buf_hdr[i + 1]; - - tail = buf_hdr[num - 1]; - buf_hdr[num - 1]->next = NULL; + odp_buffer_hdr_t *hdr, *tail, *next_hdr; + /* Ordered queues do not use bursts */ if (sched_fn->ord_enq_multi(queue->s.index, (void **)buf_hdr, num, sustain, &ret)) return ret; - /* Handle unordered enqueues */ + /* Optimize the common case of single enqueue */ + if (num == 1) { + tail = buf_hdr[0]; + hdr = tail; + hdr->burst_num = 0; + } else { + int next; + + /* Start from the last buffer header */ + tail = buf_hdr[num - 1]; + hdr = tail; + next = num - 2; + + while (1) { + /* Build a burst. The buffer header carrying + * a burst is the last buffer of the burst. */ + for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE; + i++, next--) + hdr->burst[BUFFER_BURST_SIZE - 1 - i] = + buf_hdr[next]; + + hdr->burst_num = i; + hdr->burst_first = BUFFER_BURST_SIZE - i; + + if (odp_likely(next < 0)) + break; + + /* Get another header and link it */ + next_hdr = hdr; + hdr = buf_hdr[next]; + hdr->next = next_hdr; + next--; + } + } + LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { UNLOCK(&queue->s.lock); @@ -411,9 +439,9 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], /* Empty queue */ if (queue->s.head == NULL) - queue->s.head = buf_hdr[0]; + queue->s.head = hdr; else - queue->s.tail->next = buf_hdr[0]; + queue->s.tail->next = hdr; queue->s.tail = tail; @@ -483,9 +511,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { - odp_buffer_hdr_t *hdr; - int i; - uint32_t j; + odp_buffer_hdr_t *hdr, *next; + int i, j; + int updated = 0; LOCK(&queue->s.lock); if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { @@ -506,33 +534,65 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], return 0; } - for (i = 0; i < num && hdr; i++) { - buf_hdr[i] = hdr; - hdr = hdr->next; - buf_hdr[i]->next = NULL; - if (queue_is_ordered(queue)) { - buf_hdr[i]->origin_qe = queue; - buf_hdr[i]->order = queue->s.order_in++; - for (j = 0; j < queue->s.param.sched.lock_count; j++) { - buf_hdr[i]->sync[j] = + for (i = 0; i < num && hdr; ) { + int burst_num = hdr->burst_num; + int first = hdr->burst_first; + + /* First, get bursted buffers */ + for (j = 0; j < burst_num && i < num; j++, i++) { + buf_hdr[i] = hdr->burst[first + j]; + odp_prefetch(buf_hdr[i]); + } + + if (burst_num) { + hdr->burst_num = burst_num - j; + hdr->burst_first = first + j; + } + + if (i == num) + break; + + /* When burst is empty, consume the current buffer header and + * move to the next header */ + buf_hdr[i] = hdr; + next = hdr->next; + hdr->next = NULL; + hdr = next; + updated++; + i++; + } + + /* Ordered queue book keeping inside the lock */ + if (queue_is_ordered(queue)) { + for (j = 0; j < i; j++) { + uint32_t k; + + buf_hdr[j]->origin_qe = queue; + buf_hdr[j]->order = queue->s.order_in++; + for (k = 0; k < queue->s.param.sched.lock_count; k++) { + buf_hdr[j]->sync[k] = odp_atomic_fetch_inc_u64 - (&queue->s.sync_in[j]); + (&queue->s.sync_in[k]); } - buf_hdr[i]->flags.sustain = SUSTAIN_ORDER; - } else { - buf_hdr[i]->origin_qe = NULL; + buf_hdr[j]->flags.sustain = SUSTAIN_ORDER; } } - queue->s.head = hdr; + /* Write head only if updated */ + if (updated) + queue->s.head = hdr; - if (hdr == NULL) { - /* Queue is now empty */ + /* Queue is empty */ + if (hdr == NULL) queue->s.tail = NULL; - } UNLOCK(&queue->s.lock); + /* Init origin_qe for non-ordered queues */ + if (!queue_is_ordered(queue)) + for (j = 0; j < i; j++) + buf_hdr[j]->origin_qe = NULL; + return i; } @@ -543,7 +603,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) { - odp_buffer_hdr_t *buf_hdr; + odp_buffer_hdr_t *buf_hdr = NULL; int ret; ret = deq_multi(queue, &buf_hdr, 1); diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c index 92a1cc85..84121838 100644 --- a/platform/linux-generic/odp_schedule_ordered.c +++ b/platform/linux-generic/odp_schedule_ordered.c @@ -457,15 +457,24 @@ int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[], { queue_entry_t *origin_qe; uint64_t order; - int rc; + int i, rc; queue_entry_t *qe = get_qentry(queue_index); - odp_buffer_hdr_t *buf_hdr = p_buf_hdr[0]; + odp_buffer_hdr_t *first_hdr = p_buf_hdr[0]; + odp_buffer_hdr_t **buf_hdr = (odp_buffer_hdr_t **)p_buf_hdr; + + /* Chain input buffers together */ + for (i = 0; i < num - 1; i++) { + buf_hdr[i]->next = buf_hdr[i + 1]; + buf_hdr[i]->burst_num = 0; + } + + buf_hdr[num - 1]->next = NULL; /* Handle ordered enqueues commonly via links */ - get_queue_order(&origin_qe, &order, buf_hdr); + get_queue_order(&origin_qe, &order, first_hdr); if (origin_qe) { - buf_hdr->link = buf_hdr->next; - rc = ordered_queue_enq(qe, buf_hdr, sustain, + first_hdr->link = first_hdr->next; + rc = ordered_queue_enq(qe, first_hdr, sustain, origin_qe, order); *ret = rc == 0 ? num : rc; return 1; |