diff options
author | Petri Savolainen <petri.savolainen@linaro.org> | 2014-02-14 15:56:45 +0200 |
---|---|---|
committer | Maxim Uvarov <maxim.uvarov@linaro.org> | 2014-02-17 13:21:04 +0400 |
commit | 87c1485d91c4b2b23b2e94547bed17af930e3f07 (patch) | |
tree | be5d7e7ff9da4431ddf78a01e19f60312b9d14e5 /platform | |
parent | 1a3bba72dd9ce0751139cab6fd231f1d2ec20efd (diff) |
Multi-enq and deq operations
- Added multi-buffer enqueue and dequeue operations for queues
- And scheduler/packet IO support for those
- Added/renamed schedule functions
- _poll removed from functions names (default operation)
- former odp_schedule() is now odp_schedule_once()
- Enabled -O3 optimization (again)
Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
Diffstat (limited to 'platform')
-rw-r--r-- | platform/linux-generic/include/odp_buffer_internal.h | 3 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_buffer_pool_internal.h | 116 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_internal.h | 1 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_packet_internal.h | 1 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_packet_io_queue.h | 13 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_queue_internal.h | 21 | ||||
-rw-r--r-- | platform/linux-generic/include/odp_schedule_internal.h | 1 | ||||
-rw-r--r-- | platform/linux-generic/source/odp_buffer.c | 1 | ||||
-rw-r--r-- | platform/linux-generic/source/odp_buffer_pool.c | 85 | ||||
-rw-r--r-- | platform/linux-generic/source/odp_init.c | 5 | ||||
-rw-r--r-- | platform/linux-generic/source/odp_packet_io.c | 82 | ||||
-rw-r--r-- | platform/linux-generic/source/odp_queue.c | 121 | ||||
-rw-r--r-- | platform/linux-generic/source/odp_schedule.c | 201 |
13 files changed, 534 insertions, 117 deletions
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h index f14738409..c4e5b3322 100644 --- a/platform/linux-generic/include/odp_buffer_internal.h +++ b/platform/linux-generic/include/odp_buffer_internal.h @@ -105,9 +105,6 @@ typedef struct odp_buffer_chunk_hdr_t { } odp_buffer_chunk_hdr_t; - -odp_buffer_hdr_t *odp_buf_to_hdr(odp_buffer_t buf); - int odp_buffer_snprint(char *str, size_t n, odp_buffer_t buf); diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h b/platform/linux-generic/include/odp_buffer_pool_internal.h new file mode 100644 index 000000000..380de3692 --- /dev/null +++ b/platform/linux-generic/include/odp_buffer_pool_internal.h @@ -0,0 +1,116 @@ +/* Copyright (c) 2013, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + + +/** + * @file + * + * ODP buffer pool - internal header + */ + +#ifndef ODP_BUFFER_POOL_INTERNAL_H_ +#define ODP_BUFFER_POOL_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp_std_types.h> +#include <odp_buffer_pool.h> +#include <odp_buffer_internal.h> +#include <odp_align.h> +#include <odp_hints.h> +#include <odp_config.h> +#include <odp_debug.h> + +/* Use ticketlock instead of spinlock */ +#define POOL_USE_TICKETLOCK + +/* Extra error checks */ +/* #define POOL_ERROR_CHECK */ + + +#ifdef POOL_USE_TICKETLOCK +#include <odp_ticketlock.h> +#else +#include <odp_spinlock.h> +#endif + + +struct pool_entry_s { +#ifdef POOL_USE_TICKETLOCK + odp_ticketlock_t lock ODP_ALIGNED_CACHE; +#else + odp_spinlock_t lock ODP_ALIGNED_CACHE; +#endif + + odp_buffer_chunk_hdr_t *head; + uint64_t free_bufs; + char name[ODP_BUFFER_POOL_NAME_LEN]; + + + odp_buffer_pool_t pool ODP_ALIGNED_CACHE; + uintptr_t buf_base; + size_t buf_size; + size_t buf_offset; + uint64_t num_bufs; + void *pool_base_addr; + uint64_t pool_size; + size_t payload_size; + size_t payload_align; + int buf_type; + size_t hdr_size; +}; + + +extern void *pool_entry_ptr[]; + + +static inline void *get_pool_entry(odp_buffer_pool_t pool_id) +{ + return pool_entry_ptr[pool_id]; +} + + +static inline odp_buffer_hdr_t *odp_buf_to_hdr(odp_buffer_t buf) +{ + odp_buffer_bits_t handle; + uint32_t pool_id; + uint32_t index; + struct pool_entry_s *pool; + odp_buffer_hdr_t *hdr; + + handle.u32 = buf; + pool_id = handle.pool; + index = handle.index; + +#ifdef POOL_ERROR_CHECK + if (odp_unlikely(pool_id > ODP_CONFIG_BUFFER_POOLS)) { + ODP_ERR("odp_buf_to_hdr: Bad pool id\n"); + return NULL; + } +#endif + + pool = get_pool_entry(pool_id); + +#ifdef POOL_ERROR_CHECK + if (odp_unlikely(index > pool->num_bufs - 1)) { + ODP_ERR("odp_buf_to_hdr: Bad buffer index\n"); + return NULL; + } +#endif + + hdr = (odp_buffer_hdr_t *)(pool->buf_base + index * pool->buf_size); + + return hdr; +} + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/include/odp_internal.h b/platform/linux-generic/include/odp_internal.h index cfd19c948..1f42d2abb 100644 --- a/platform/linux-generic/include/odp_internal.h +++ b/platform/linux-generic/include/odp_internal.h @@ -40,6 +40,7 @@ int odp_queue_init_global(void); int odp_schedule_init_global(void); +int odp_schedule_init_local(void); #ifdef __cplusplus diff --git a/platform/linux-generic/include/odp_packet_internal.h b/platform/linux-generic/include/odp_packet_internal.h index 034162fb2..afef97630 100644 --- a/platform/linux-generic/include/odp_packet_internal.h +++ b/platform/linux-generic/include/odp_packet_internal.h @@ -21,6 +21,7 @@ extern "C" { #include <odp_align.h> #include <odp_debug.h> #include <odp_buffer_internal.h> +#include <odp_buffer_pool_internal.h> #include <odp_packet.h> #include <odp_packet_io.h> diff --git a/platform/linux-generic/include/odp_packet_io_queue.h b/platform/linux-generic/include/odp_packet_io_queue.h index 58d69611e..18e55f604 100644 --- a/platform/linux-generic/include/odp_packet_io_queue.h +++ b/platform/linux-generic/include/odp_packet_io_queue.h @@ -21,14 +21,27 @@ extern "C" { #include <odp_queue_internal.h> #include <odp_buffer_internal.h> +/** Max nbr of pkts to receive in one burst (keep same as QUEUE_MULTI_MAX) */ #define ODP_PKTIN_QUEUE_MAX_BURST 16 +/* pktin_deq_multi() depends on the condition: */ +ODP_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX, + ODP_PKTIN_DEQ_MULTI_MAX_ERROR); int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue); +int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); +int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); + + int pktout_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue); +int pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], + int num); +int pktout_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], + int num); + #ifdef __cplusplus } #endif diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 6d6700a60..fede56a0c 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -32,6 +32,8 @@ extern "C" { #include <odp_spinlock.h> #endif +#define QUEUE_MULTI_MAX 8 + #define QUEUE_STATUS_FREE 0 #define QUEUE_STATUS_READY 1 #define QUEUE_STATUS_NOTSCHED 2 @@ -40,8 +42,13 @@ extern "C" { /* forward declaration */ union queue_entry_u; -typedef int (*enqueue_func_t)(union queue_entry_u *, odp_buffer_hdr_t *); -typedef odp_buffer_hdr_t *(*dequeue_func_t)(union queue_entry_u *); +typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *); +typedef odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *); + +typedef int (*enq_multi_func_t)(union queue_entry_u *, + odp_buffer_hdr_t **, int); +typedef int (*deq_multi_func_t)(union queue_entry_u *, + odp_buffer_hdr_t **, int); struct queue_entry_s { #ifdef USE_TICKETLOCK @@ -54,8 +61,11 @@ struct queue_entry_s { odp_buffer_hdr_t *tail; int status; - enqueue_func_t enqueue ODP_ALIGNED_CACHE; - dequeue_func_t dequeue; + enq_func_t enqueue ODP_ALIGNED_CACHE; + deq_func_t dequeue; + enq_multi_func_t enqueue_multi; + deq_multi_func_t dequeue_multi; + odp_queue_t handle; odp_buffer_t sched_buf; odp_queue_type_t type; @@ -76,6 +86,9 @@ queue_entry_t *get_qentry(uint32_t queue_id); int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); +int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); + void queue_lock(queue_entry_t *queue); void queue_unlock(queue_entry_t *queue); diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h index bd0ec27bb..b0135c58b 100644 --- a/platform/linux-generic/include/odp_schedule_internal.h +++ b/platform/linux-generic/include/odp_schedule_internal.h @@ -17,6 +17,7 @@ extern "C" { #include <odp_buffer.h> #include <odp_queue.h> +void odp_schedule_mask_set(odp_queue_t queue, int prio); odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue); diff --git a/platform/linux-generic/source/odp_buffer.c b/platform/linux-generic/source/odp_buffer.c index 1451b862c..9d5997aa7 100644 --- a/platform/linux-generic/source/odp_buffer.c +++ b/platform/linux-generic/source/odp_buffer.c @@ -7,6 +7,7 @@ #include <odp_buffer.h> #include <odp_buffer_internal.h> +#include <odp_buffer_pool_internal.h> #include <string.h> #include <stdio.h> diff --git a/platform/linux-generic/source/odp_buffer_pool.c b/platform/linux-generic/source/odp_buffer_pool.c index ab9b07c76..fecdb1815 100644 --- a/platform/linux-generic/source/odp_buffer_pool.c +++ b/platform/linux-generic/source/odp_buffer_pool.c @@ -4,7 +4,9 @@ * SPDX-License-Identifier: BSD-3-Clause */ +#include <odp_std_types.h> #include <odp_buffer_pool.h> +#include <odp_buffer_pool_internal.h> #include <odp_buffer_internal.h> #include <odp_packet_internal.h> #include <odp_shared_memory.h> @@ -18,9 +20,7 @@ #include <stdlib.h> -#define USE_TICKETLOCK - -#ifdef USE_TICKETLOCK +#ifdef POOL_USE_TICKETLOCK #include <odp_ticketlock.h> #define LOCK(a) odp_ticketlock_lock(a) #define UNLOCK(a) odp_ticketlock_unlock(a) @@ -39,31 +39,6 @@ #define NULL_INDEX ((uint32_t)-1) -struct pool_entry_s { -#ifdef USE_TICKETLOCK - odp_ticketlock_t lock ODP_ALIGNED_CACHE; -#else - odp_spinlock_t lock ODP_ALIGNED_CACHE; -#endif - - odp_buffer_chunk_hdr_t *head; - uint64_t free_bufs; - char name[ODP_BUFFER_POOL_NAME_LEN]; - - - odp_buffer_pool_t pool ODP_ALIGNED_CACHE; - uintptr_t buf_base; - size_t buf_size; - size_t buf_offset; - uint64_t num_bufs; - void *pool_base_addr; - uint64_t pool_size; - size_t payload_size; - size_t payload_align; - int buf_type; - size_t hdr_size; -}; - typedef union pool_entry_u { struct pool_entry_s s; @@ -79,16 +54,14 @@ typedef struct pool_table_t { } pool_table_t; +/* The pool table */ static pool_table_t *pool_tbl; - -static __thread odp_buffer_chunk_hdr_t *local_chunk[ODP_CONFIG_BUFFER_POOLS]; +/* Pool entry pointers (for inlining) */ +void *pool_entry_ptr[ODP_CONFIG_BUFFER_POOLS]; -static inline pool_entry_t *get_pool(odp_buffer_pool_t pool_id) -{ - return &pool_tbl->pool[pool_id]; -} +static __thread odp_buffer_chunk_hdr_t *local_chunk[ODP_CONFIG_BUFFER_POOLS]; static inline void set_handle(odp_buffer_hdr_t *hdr, @@ -107,36 +80,6 @@ static inline void set_handle(odp_buffer_hdr_t *hdr, } -odp_buffer_hdr_t *odp_buf_to_hdr(odp_buffer_t buf) -{ - odp_buffer_bits_t handle; - uint32_t pool_id; - uint32_t index; - pool_entry_t *pool; - odp_buffer_hdr_t *hdr; - - handle.u32 = buf; - pool_id = handle.pool; - index = handle.index; - - if (odp_unlikely(pool_id > ODP_CONFIG_BUFFER_POOLS)) { - ODP_ERR("odp_buf_to_hdr: Bad pool id\n"); - return NULL; - } - - pool = get_pool(pool_id); - - if (odp_unlikely(index > pool->s.num_bufs - 1)) { - ODP_ERR("odp_buf_to_hdr: Bad buffer index\n"); - return NULL; - } - - hdr = (odp_buffer_hdr_t *)(pool->s.buf_base + index * pool->s.buf_size); - - return hdr; -} - - int odp_buffer_pool_init_global(void) { odp_buffer_pool_t i; @@ -153,9 +96,11 @@ int odp_buffer_pool_init_global(void) for (i = 0; i < ODP_CONFIG_BUFFER_POOLS; i++) { /* init locks */ - pool_entry_t *pool = get_pool(i); + pool_entry_t *pool = &pool_tbl->pool[i]; LOCK_INIT(&pool->s.lock); pool->s.pool = i; + + pool_entry_ptr[i] = pool; } ODP_DBG("\nBuffer pool init global\n"); @@ -397,7 +342,7 @@ odp_buffer_pool_t odp_buffer_pool_create(const char *name, odp_buffer_pool_t pool_id = ODP_BUFFER_POOL_INVALID; for (i = 0; i < ODP_CONFIG_BUFFER_POOLS; i++) { - pool = get_pool(i); + pool = get_pool_entry(i); LOCK(&pool->s.lock); @@ -434,7 +379,7 @@ odp_buffer_pool_t odp_buffer_pool_lookup(const char *name) pool_entry_t *pool; for (i = 0; i < ODP_CONFIG_BUFFER_POOLS; i++) { - pool = get_pool(i); + pool = get_pool_entry(i); LOCK(&pool->s.lock); @@ -458,7 +403,7 @@ odp_buffer_t odp_buffer_alloc(odp_buffer_pool_t pool_id) odp_buffer_chunk_hdr_t *chunk; odp_buffer_bits_t handle; - pool = get_pool(pool_id); + pool = get_pool_entry(pool_id); chunk = local_chunk[pool_id]; if (chunk == NULL) { @@ -502,7 +447,7 @@ void odp_buffer_free(odp_buffer_t buf) hdr = odp_buf_to_hdr(buf); pool_id = hdr->pool; - pool = get_pool(pool_id); + pool = get_pool_entry(pool_id); chunk_hdr = local_chunk[pool_id]; @@ -532,7 +477,7 @@ void odp_buffer_pool_print(odp_buffer_pool_t pool_id) odp_buffer_chunk_hdr_t *chunk_hdr; uint32_t i; - pool = get_pool(pool_id); + pool = get_pool_entry(pool_id); printf("Pool info\n"); printf("---------\n"); diff --git a/platform/linux-generic/source/odp_init.c b/platform/linux-generic/source/odp_init.c index f56bc2c00..1d9cccd90 100644 --- a/platform/linux-generic/source/odp_init.c +++ b/platform/linux-generic/source/odp_init.c @@ -53,5 +53,10 @@ int odp_init_local(int thr_id) return -1; } + if (odp_schedule_init_local()) { + ODP_ERR("ODP schedule local init failed.\n"); + return -1; + } + return 0; } diff --git a/platform/linux-generic/source/odp_packet_io.c b/platform/linux-generic/source/odp_packet_io.c index 08d3cbe38..92aed34bd 100644 --- a/platform/linux-generic/source/odp_packet_io.c +++ b/platform/linux-generic/source/odp_packet_io.c @@ -378,26 +378,50 @@ odp_queue_t odp_pktio_outq_getdef(odp_pktio_t id) return pktio_entry->s.outq_default; } -int pktout_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) +int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr) { odp_packet_t pkt = odp_packet_from_buffer(buf_hdr->handle.handle); int len = 1; int nbr; - nbr = odp_pktio_send(queue->s.pktout, &pkt, len); + nbr = odp_pktio_send(qentry->s.pktout, &pkt, len); return (nbr == len ? 0 : -1); } -odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue) +odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry) { - (void)queue; + (void)qentry; return NULL; } -int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) +int pktout_enq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + odp_packet_t pkt_tbl[QUEUE_MULTI_MAX]; + int nbr; + int i; + + for (i = 0; i < num; ++i) + pkt_tbl[i] = odp_packet_from_buffer(buf_hdr[i]->handle.handle); + + nbr = odp_pktio_send(qentry->s.pktout, pkt_tbl, num); + return (nbr == num ? 0 : -1); +} + +int pktout_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + (void)qentry; + (void)buf_hdr; + (void)num; + + return 0; +} + +int pktin_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr) { /* Use default action */ - return queue_enq(queue, buf_hdr); + return queue_enq(qentry, buf_hdr); } odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry) @@ -409,25 +433,57 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry) if (buf_hdr == NULL) { odp_packet_t pkt; odp_buffer_t buf; - odp_buffer_hdr_t *tmp_hdr; - odp_packet_t pkt_tbl[ODP_PKTIN_QUEUE_MAX_BURST]; - int pkts, i; + odp_packet_t pkt_tbl[QUEUE_MULTI_MAX]; + odp_buffer_hdr_t *tmp_hdr_tbl[QUEUE_MULTI_MAX]; + int pkts, i, j; pkts = odp_pktio_recv(qentry->s.pktin, pkt_tbl, - ODP_PKTIN_QUEUE_MAX_BURST); + QUEUE_MULTI_MAX); if (pkts > 0) { pkt = pkt_tbl[0]; buf = odp_buffer_from_packet(pkt); buf_hdr = odp_buf_to_hdr(buf); - for (i = 1; i < pkts; ++i) { + for (i = 1, j = 0; i < pkts; ++i) { buf = odp_buffer_from_packet(pkt_tbl[i]); - tmp_hdr = odp_buf_to_hdr(buf); - queue_enq(qentry, tmp_hdr); + tmp_hdr_tbl[j++] = odp_buf_to_hdr(buf); } + queue_enq_multi(qentry, tmp_hdr_tbl, j); } } return buf_hdr; } + +int pktin_enq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num) +{ + /* Use default action */ + return queue_enq_multi(qentry, buf_hdr, num); +} + +int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int nbr; + + nbr = queue_deq_multi(qentry, buf_hdr, num); + + if (nbr < num) { + odp_packet_t pkt_tbl[QUEUE_MULTI_MAX]; + odp_buffer_hdr_t *tmp_hdr_tbl[QUEUE_MULTI_MAX]; + odp_buffer_t buf; + int pkts, i; + + pkts = odp_pktio_recv(qentry->s.pktin, pkt_tbl, + QUEUE_MULTI_MAX); + if (pkts > 0) { + for (i = 0; i < pkts; ++i) { + buf = odp_buffer_from_packet(pkt_tbl[i]); + tmp_hdr_tbl[i] = odp_buf_to_hdr(buf); + } + queue_enq_multi(qentry, tmp_hdr_tbl, pkts); + } + } + + return nbr; +} diff --git a/platform/linux-generic/source/odp_queue.c b/platform/linux-generic/source/odp_queue.c index d770918dc..648c18e80 100644 --- a/platform/linux-generic/source/odp_queue.c +++ b/platform/linux-generic/source/odp_queue.c @@ -10,6 +10,7 @@ #include <odp_align.h> #include <odp_buffer.h> #include <odp_buffer_internal.h> +#include <odp_buffer_pool_internal.h> #include <odp_internal.h> #include <odp_shared_memory.h> #include <odp_schedule_internal.h> @@ -17,6 +18,7 @@ #include <odp_packet_io_internal.h> #include <odp_packet_io_queue.h> #include <odp_debug.h> +#include <odp_hints.h> #ifdef USE_TICKETLOCK #include <odp_ticketlock.h> @@ -65,14 +67,20 @@ static void queue_init(queue_entry_t *queue, const char *name, case ODP_QUEUE_TYPE_PKTIN: queue->s.enqueue = pktin_enqueue; queue->s.dequeue = pktin_dequeue; + queue->s.enqueue_multi = pktin_enq_multi; + queue->s.dequeue_multi = pktin_deq_multi; break; case ODP_QUEUE_TYPE_PKTOUT: queue->s.enqueue = pktout_enqueue; queue->s.dequeue = pktout_dequeue; + queue->s.enqueue_multi = pktout_enq_multi; + queue->s.dequeue_multi = pktout_deq_multi; break; default: queue->s.enqueue = queue_enq; queue->s.dequeue = queue_deq; + queue->s.enqueue_multi = queue_enq_multi; + queue->s.dequeue_multi = queue_deq_multi; break; } @@ -170,6 +178,8 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, } queue->s.sched_buf = buf; + + odp_schedule_mask_set(handle, queue->s.param.sched.prio); } return handle; @@ -251,6 +261,61 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) } +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int sched = 0; + int i; + odp_buffer_hdr_t *tail; + + for (i = 0; i < num - 1; i++) + buf_hdr[i]->next = buf_hdr[i+1]; + + tail = buf_hdr[num-1]; + buf_hdr[num-1]->next = NULL; + + LOCK(&queue->s.lock); + + /* Empty queue */ + if (queue->s.head == NULL) + queue->s.head = buf_hdr[0]; + else + queue->s.tail->next = buf_hdr[0]; + + queue->s.tail = tail; + + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { + queue->s.status = QUEUE_STATUS_SCHED; + sched = 1; /* retval: schedule queue */ + } + + UNLOCK(&queue->s.lock); + + /* Add queue to scheduling */ + if (sched == 1) + odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio); + + return 0; +} + + +int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num) +{ + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + queue_entry_t *queue; + int i; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + queue = queue_to_qentry(handle); + + for (i = 0; i < num; i++) + buf_hdr[i] = odp_buf_to_hdr(buf[i]); + + return queue->s.enqueue_multi(queue, buf_hdr, num); +} + + int odp_queue_enq(odp_queue_t handle, odp_buffer_t buf) { odp_buffer_hdr_t *buf_hdr; @@ -291,6 +356,62 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) } +int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i = 0; + + LOCK(&queue->s.lock); + + if (queue->s.head == NULL) { + /* Already empty queue */ + if (queue->s.status == QUEUE_STATUS_SCHED && + queue->s.type != ODP_QUEUE_TYPE_PKTIN) + queue->s.status = QUEUE_STATUS_NOTSCHED; + } else { + odp_buffer_hdr_t *hdr = queue->s.head; + + for (; i < num && hdr; i++) { + buf_hdr[i] = hdr; + /* odp_prefetch(hdr->addr); */ + hdr = hdr->next; + buf_hdr[i]->next = NULL; + } + + queue->s.head = hdr; + + if (hdr == NULL) { + /* Queue is now empty */ + queue->s.tail = NULL; + } + } + + UNLOCK(&queue->s.lock); + + return i; +} + + +int odp_queue_deq_multi(odp_queue_t handle, odp_buffer_t buf[], int num) +{ + queue_entry_t *queue; + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + int i, ret; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + queue = queue_to_qentry(handle); + + ret = queue->s.dequeue_multi(queue, buf_hdr, num); + + for (i = 0; i < ret; i++) + buf[i] = buf_hdr[i]->handle.handle; + + return ret; +} + + + odp_buffer_t odp_queue_deq(odp_queue_t handle) { queue_entry_t *queue; diff --git a/platform/linux-generic/source/odp_schedule.c b/platform/linux-generic/source/odp_schedule.c index df76272c2..c3e071ad3 100644 --- a/platform/linux-generic/source/odp_schedule.c +++ b/platform/linux-generic/source/odp_schedule.c @@ -15,6 +15,8 @@ #include <odp_config.h> #include <odp_debug.h> #include <odp_thread.h> +#include <odp_spinlock.h> +#include <odp_hints.h> #include <odp_queue_internal.h> @@ -26,11 +28,22 @@ #define QUEUES_PER_PRIO 4 /* TODO: random or queue based selection */ -#define RAND_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x))) +#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x))) + +/* Maximum number of dequeues */ +#define MAX_DEQ 4 + + +/* Mask of queues per priority */ +typedef uint8_t pri_mask_t; + +ODP_ASSERT((8*sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, pri_mask_t_is_too_small); typedef struct { odp_queue_t pri_queue[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO]; + pri_mask_t pri_mask[ODP_CONFIG_SCHED_PRIOS]; + odp_spinlock_t mask_lock; odp_buffer_pool_t pool; } sched_t; @@ -42,19 +55,24 @@ typedef struct { typedef struct { odp_queue_t pri_queue; odp_buffer_t desc_buf; -} thread_local_atomic_t; + odp_buffer_t buf[MAX_DEQ]; + int num; + int index; + odp_queue_t queue; + +} sched_local_t; +/* Global scheduler context */ static sched_t *sched; -/* Thread local atomic context status */ -static __thread thread_local_atomic_t tl_atomic = {ODP_QUEUE_INVALID, - ODP_BUFFER_INVALID}; +/* Thread local scheduler context */ +static __thread sched_local_t sched_local; static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio) { - int id = RAND_PRI_QUEUE(queue); + int id = SEL_PRI_QUEUE(queue); return sched->pri_queue[prio][id]; } @@ -91,6 +109,7 @@ int odp_schedule_init_global(void) } sched->pool = pool; + odp_spinlock_init(&sched->mask_lock); for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) { odp_queue_t queue; @@ -112,6 +131,7 @@ int odp_schedule_init_global(void) } sched->pri_queue[i][j] = queue; + sched->pri_mask[i] = 0; } } @@ -121,6 +141,34 @@ int odp_schedule_init_global(void) } +int odp_schedule_init_local(void) +{ + int i; + + sched_local.pri_queue = ODP_QUEUE_INVALID; + sched_local.desc_buf = ODP_BUFFER_INVALID; + + for (i = 0; i < MAX_DEQ; i++) + sched_local.buf[i] = ODP_BUFFER_INVALID; + + sched_local.num = 0; + sched_local.index = 0; + sched_local.queue = ODP_QUEUE_INVALID; + + return 0; +} + + +void odp_schedule_mask_set(odp_queue_t queue, int prio) +{ + int id = SEL_PRI_QUEUE(queue); + + odp_spinlock_lock(&sched->mask_lock); + sched->pri_mask[prio] |= 1 << id; + odp_spinlock_unlock(&sched->mask_lock); +} + + odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue) { odp_buffer_t buf; @@ -151,23 +199,50 @@ void odp_schedule_queue(odp_queue_t queue, int prio) void odp_schedule_release_atomic_context(void) { - if (tl_atomic.pri_queue != ODP_QUEUE_INVALID) { + if (sched_local.pri_queue != ODP_QUEUE_INVALID && + sched_local.num == 0) { /* Release current atomic queue */ - odp_queue_enq(tl_atomic.pri_queue, tl_atomic.desc_buf); - tl_atomic.pri_queue = ODP_QUEUE_INVALID; + odp_queue_enq(sched_local.pri_queue, sched_local.desc_buf); + sched_local.pri_queue = ODP_QUEUE_INVALID; } } +static inline int copy_bufs(odp_buffer_t out_buf[], unsigned int max) +{ + int i = 0; + + while (sched_local.num && max) { + out_buf[i] = sched_local.buf[sched_local.index]; + sched_local.index++; + sched_local.num--; + max--; + i++; + } + + return i; +} + /* * Schedule queues * * TODO: SYNC_ORDERED not implemented yet */ -odp_buffer_t odp_schedule(odp_queue_t *out_queue) +static int schedule(odp_queue_t *out_queue, odp_buffer_t out_buf[], + unsigned int max_num) { int i, j; int thr; + int ret; + + if (sched_local.num) { + ret = copy_bufs(out_buf, max_num); + + if (out_queue) + *out_queue = sched_local.queue; + + return ret; + } odp_schedule_release_atomic_context(); @@ -176,29 +251,37 @@ odp_buffer_t odp_schedule(odp_queue_t *out_queue) for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) { int id; + if (sched->pri_mask[i] == 0) + continue; + id = thr & (QUEUES_PER_PRIO-1); - for (j = 0; j < QUEUES_PER_PRIO; j++) { + for (j = 0; j < QUEUES_PER_PRIO; j++, id++) { odp_queue_t pri_q; odp_buffer_t desc_buf; - pri_q = sched->pri_queue[i][id]; - desc_buf = odp_queue_deq(pri_q); - - id++; if (id >= QUEUES_PER_PRIO) id = 0; + if (odp_unlikely((sched->pri_mask[i] & (1 << id)) == 0)) + continue; + + pri_q = sched->pri_queue[i][id]; + desc_buf = odp_queue_deq(pri_q); + if (desc_buf != ODP_BUFFER_INVALID) { queue_desc_t *desc; odp_queue_t queue; - odp_buffer_t buf; + int num; desc = odp_buffer_addr(desc_buf); queue = desc->queue; - buf = odp_queue_deq(queue); - if (buf == ODP_BUFFER_INVALID) { + num = odp_queue_deq_multi(queue, + sched_local.buf, + MAX_DEQ); + + if (num == 0) { /* Remove empty queue from scheduling, * except packet input queues */ @@ -209,10 +292,16 @@ odp_buffer_t odp_schedule(odp_queue_t *out_queue) continue; } + sched_local.num = num; + sched_local.index = 0; + ret = copy_bufs(out_buf, max_num); + + sched_local.queue = queue; + if (queue_sched_atomic(queue)) { /* Hold queue during atomic access */ - tl_atomic.pri_queue = pri_q; - tl_atomic.desc_buf = desc_buf; + sched_local.pri_queue = pri_q; + sched_local.desc_buf = desc_buf; } else { /* Continue scheduling the queue */ odp_queue_enq(pri_q, desc_buf); @@ -222,27 +311,85 @@ odp_buffer_t odp_schedule(odp_queue_t *out_queue) if (out_queue) *out_queue = queue; - return buf; + return ret; } } } - return ODP_BUFFER_INVALID; + return 0; } -odp_buffer_t odp_schedule_poll(odp_queue_t *queue) +odp_buffer_t odp_schedule_once(odp_queue_t *out_queue) { - odp_buffer_t buf; + odp_buffer_t buf = ODP_BUFFER_INVALID; - do { - buf = odp_schedule(queue); - } while (buf == ODP_BUFFER_INVALID); + schedule(out_queue, &buf, 1); return buf; } +odp_buffer_t odp_schedule(odp_queue_t *out_queue) +{ + odp_buffer_t buf; + int ret; + + while (1) { + ret = schedule(out_queue, &buf, 1); + + if (ret) + return buf; + } +} + + +odp_buffer_t odp_schedule_n(odp_queue_t *out_queue, unsigned int n) +{ + odp_buffer_t buf; + int ret; + + while (n--) { + ret = schedule(out_queue, &buf, 1); + + if (ret) + return buf; + } + + return ODP_BUFFER_INVALID; +} + + +int odp_schedule_multi(odp_queue_t *out_queue, odp_buffer_t out_buf[], + unsigned int num) +{ + int ret; + + while (1) { + ret = schedule(out_queue, out_buf, num); + + if (ret) + return ret; + } +} + + +int odp_schedule_multi_n(odp_queue_t *out_queue, odp_buffer_t out_buf[], + unsigned int num, unsigned int n) +{ + int ret; + + while (n--) { + ret = schedule(out_queue, out_buf, num); + + if (ret) + return ret; + } + + return 0; +} + + int odp_schedule_num_prio(void) { return ODP_CONFIG_SCHED_PRIOS; |