Diffstat (limited to 'platform/linux-generic/odp_schedule.c')
-rw-r--r--  platform/linux-generic/odp_schedule.c  321
1 file changed, 229 insertions, 92 deletions
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index cd5bf21bd..c4567d810 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -20,6 +20,7 @@
 #include <odp_config_internal.h>
 #include <odp_align_internal.h>
 #include <odp/api/sync.h>
+#include <odp/api/packet_io.h>
 #include <odp_ring_internal.h>
 #include <odp_queue_internal.h>
@@ -34,11 +35,18 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 		  "normal_prio_is_not_between_highest_and_lowest");
 
 /* Number of scheduling groups */
-#define NUM_SCHED_GRPS 256
+#define NUM_SCHED_GRPS 32
 
 /* Priority queues per priority */
 #define QUEUES_PER_PRIO 4
 
+/* A thread polls a non preferred sched queue every this many polls
+ * of the prefer queue. */
+#define PREFER_RATIO 64
+
+/* Size of poll weight table */
+#define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO)
+
 /* Packet input poll cmd queues */
 #define PKTIO_CMD_QUEUES 4
@@ -142,7 +150,6 @@ typedef struct {
 	int index;
 	int pause;
 	uint16_t round;
-	uint16_t prefer_offset;
 	uint16_t pktin_polls;
 	uint32_t queue_index;
 	odp_queue_t queue;
@@ -157,6 +164,12 @@ typedef struct {
 		ordered_stash_t stash[MAX_ORDERED_STASH];
 	} ordered;
 
+	uint32_t grp_epoch;
+	int num_grp;
+	uint8_t grp[NUM_SCHED_GRPS];
+	uint8_t weight_tbl[WEIGHT_TBL_SIZE];
+	uint8_t grp_weight[WEIGHT_TBL_SIZE];
+
 } sched_local_t;
 
 /* Priority queue */
@@ -191,7 +204,7 @@ typedef struct {
 	pri_mask_t pri_mask[NUM_PRIO];
 	odp_spinlock_t mask_lock;
 
-	prio_queue_t prio_q[NUM_PRIO][QUEUES_PER_PRIO];
+	prio_queue_t prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
 
 	odp_spinlock_t poll_cmd_lock;
 	/* Number of commands in a command queue */
@@ -206,8 +219,10 @@ typedef struct {
 	odp_shm_t shm;
 	uint32_t pri_count[NUM_PRIO][QUEUES_PER_PRIO];
 
-	odp_spinlock_t grp_lock;
-	odp_thrmask_t mask_all;
+	odp_thrmask_t mask_all;
+	odp_spinlock_t grp_lock;
+	odp_atomic_u32_t grp_epoch;
+
 	struct {
 		char name[ODP_SCHED_GROUP_NAME_LEN];
 		odp_thrmask_t mask;
@@ -215,6 +230,7 @@ typedef struct {
 	} sched_grp[NUM_SCHED_GRPS];
 
 	struct {
+		int grp;
 		int prio;
 		int queue_per_prio;
 	} queue[ODP_CONFIG_QUEUES];
@@ -237,17 +253,35 @@ static inline void schedule_release_context(void);
 
 static void sched_local_init(void)
 {
+	int i;
+	uint8_t id;
+	uint8_t offset = 0;
+
 	memset(&sched_local, 0, sizeof(sched_local_t));
 
 	sched_local.thr = odp_thread_id();
 	sched_local.queue = ODP_QUEUE_INVALID;
 	sched_local.queue_index = PRIO_QUEUE_EMPTY;
+
+	id = sched_local.thr & (QUEUES_PER_PRIO - 1);
+
+	for (i = 0; i < WEIGHT_TBL_SIZE; i++) {
+		sched_local.weight_tbl[i] = id;
+
+		if (i % PREFER_RATIO == 0) {
+			offset++;
+			sched_local.weight_tbl[i] = (id + offset) &
+						    (QUEUES_PER_PRIO - 1);
+			if (offset == QUEUES_PER_PRIO - 1)
+				offset = 0;
+		}
+	}
 }
 
 static int schedule_init_global(void)
 {
	odp_shm_t shm;
-	int i, j;
+	int i, j, grp;
 
 	ODP_DBG("Schedule init ... ");
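The weight table built in sched_local_init() above replaces the old prefer_offset round robin: per thread, 189 of the 192 table slots name the preferred ring, and each of the other three rings appears once per PREFER_RATIO rounds. The following standalone sketch (plain C with a hypothetical main(), not part of the patch) reproduces the construction and tallies the resulting poll shares:

    #include <stdio.h>
    #include <stdint.h>

    #define QUEUES_PER_PRIO 4
    #define PREFER_RATIO    64
    #define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO)

    int main(void)
    {
            int thr = 5;                            /* example thread id */
            uint8_t id = thr & (QUEUES_PER_PRIO - 1);
            uint8_t offset = 0;
            uint8_t tbl[WEIGHT_TBL_SIZE];
            int count[QUEUES_PER_PRIO] = {0};
            int i;

            /* Same construction as sched_local_init() above */
            for (i = 0; i < WEIGHT_TBL_SIZE; i++) {
                    tbl[i] = id;

                    if (i % PREFER_RATIO == 0) {
                            offset++;
                            tbl[i] = (id + offset) & (QUEUES_PER_PRIO - 1);
                            if (offset == QUEUES_PER_PRIO - 1)
                                    offset = 0;
                    }
            }

            for (i = 0; i < WEIGHT_TBL_SIZE; i++)
                    count[tbl[i]]++;

            /* For thr = 5: ring 1 is polled 189 times per 192-round
             * cycle, rings 0, 2 and 3 once each */
            for (i = 0; i < QUEUES_PER_PRIO; i++)
                    printf("ring %d: %d polls per cycle\n", i, count[i]);

            return 0;
    }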
"); @@ -267,15 +301,20 @@ static int schedule_init_global(void) sched->shm = shm; odp_spinlock_init(&sched->mask_lock); - for (i = 0; i < NUM_PRIO; i++) { - for (j = 0; j < QUEUES_PER_PRIO; j++) { - int k; + for (grp = 0; grp < NUM_SCHED_GRPS; grp++) { + for (i = 0; i < NUM_PRIO; i++) { + for (j = 0; j < QUEUES_PER_PRIO; j++) { + prio_queue_t *prio_q; + int k; - ring_init(&sched->prio_q[i][j].ring); + prio_q = &sched->prio_q[grp][i][j]; + ring_init(&prio_q->ring); - for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++) - sched->prio_q[i][j].queue_index[k] = - PRIO_QUEUE_EMPTY; + for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++) { + prio_q->queue_index[k] = + PRIO_QUEUE_EMPTY; + } + } } } @@ -291,12 +330,17 @@ static int schedule_init_global(void) sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE; odp_spinlock_init(&sched->grp_lock); + odp_atomic_init_u32(&sched->grp_epoch, 0); for (i = 0; i < NUM_SCHED_GRPS; i++) { memset(sched->sched_grp[i].name, 0, ODP_SCHED_GROUP_NAME_LEN); odp_thrmask_zero(&sched->sched_grp[i].mask); } + sched->sched_grp[ODP_SCHED_GROUP_ALL].allocated = 1; + sched->sched_grp[ODP_SCHED_GROUP_WORKER].allocated = 1; + sched->sched_grp[ODP_SCHED_GROUP_CONTROL].allocated = 1; + odp_thrmask_setall(&sched->mask_all); ODP_DBG("done\n"); @@ -304,29 +348,38 @@ static int schedule_init_global(void) return 0; } +static inline void queue_destroy_finalize(uint32_t qi) +{ + sched_cb_queue_destroy_finalize(qi); +} + static int schedule_term_global(void) { int ret = 0; int rc = 0; - int i, j; + int i, j, grp; - for (i = 0; i < NUM_PRIO; i++) { - for (j = 0; j < QUEUES_PER_PRIO; j++) { - ring_t *ring = &sched->prio_q[i][j].ring; - uint32_t qi; + for (grp = 0; grp < NUM_SCHED_GRPS; grp++) { + for (i = 0; i < NUM_PRIO; i++) { + for (j = 0; j < QUEUES_PER_PRIO; j++) { + ring_t *ring = &sched->prio_q[grp][i][j].ring; + uint32_t qi; - while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) != - RING_EMPTY) { - odp_event_t events[1]; - int num; + while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) != + RING_EMPTY) { + odp_event_t events[1]; + int num; - num = sched_cb_queue_deq_multi(qi, events, 1); + num = sched_cb_queue_deq_multi(qi, + events, + 1); - if (num < 0) - sched_cb_queue_destroy_finalize(qi); + if (num < 0) + queue_destroy_finalize(qi); - if (num > 0) - ODP_ERR("Queue not empty\n"); + if (num > 0) + ODP_ERR("Queue not empty\n"); + } } } } @@ -357,6 +410,40 @@ static int schedule_term_local(void) return 0; } +static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask) +{ + odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask); + odp_atomic_add_rel_u32(&sched->grp_epoch, 1); +} + +static inline int grp_update_tbl(void) +{ + int i; + int num = 0; + int thr = sched_local.thr; + + odp_spinlock_lock(&sched->grp_lock); + + for (i = 0; i < NUM_SCHED_GRPS; i++) { + if (sched->sched_grp[i].allocated == 0) + continue; + + if (odp_thrmask_isset(&sched->sched_grp[i].mask, thr)) { + sched_local.grp[num] = i; + num++; + } + } + + odp_spinlock_unlock(&sched->grp_lock); + + /* Update group weights. Round robin over all thread's groups. 
@@ -407,6 +494,7 @@ static int schedule_init_queue(uint32_t queue_index,
 	int prio = sched_param->prio;
 
 	pri_set_queue(queue_index, prio);
+	sched->queue[queue_index].grp = sched_param->group;
 	sched->queue[queue_index].prio = prio;
 	sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
@@ -418,6 +506,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
 	int prio = sched->queue[queue_index].prio;
 
 	pri_clr_queue(queue_index, prio);
+	sched->queue[queue_index].grp = 0;
 	sched->queue[queue_index].prio = 0;
 	sched->queue[queue_index].queue_per_prio = 0;
 }
@@ -509,9 +598,10 @@ static void schedule_release_atomic(void)
 	uint32_t qi = sched_local.queue_index;
 
 	if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) {
-		int prio = sched->queue[qi].prio;
+		int grp = sched->queue[qi].grp;
+		int prio = sched->queue[qi].prio;
 		int queue_per_prio = sched->queue[qi].queue_per_prio;
-		ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+		ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
 
 		/* Release current atomic queue */
 		ring_enq(ring, PRIO_QUEUE_MASK, qi);
@@ -640,7 +730,9 @@ static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
 		return 0;
 	}
 
-	if (odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
+	/* Pktout may drop packets, so the operation cannot be stashed. */
+	if (dst_queue->s.pktout.pktio != ODP_PKTIO_INVALID ||
+	    odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
 		/* If the local stash is full, wait until it is our turn and
 		 * then release the stash and do enqueue directly. */
 		wait_for_order(src_queue);
@@ -662,54 +754,26 @@ static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
 	return 1;
 }
 
-/*
- * Schedule queues
- */
-static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
-		       unsigned int max_num)
+static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
+				  unsigned int max_num, int grp, int first)
 {
 	int prio, i;
 	int ret;
 	int id;
-	int offset = 0;
 	unsigned int max_deq = MAX_DEQ;
 	uint32_t qi;
 
-	if (sched_local.num) {
-		ret = copy_events(out_ev, max_num);
-
-		if (out_queue)
-			*out_queue = sched_local.queue;
-
-		return ret;
-	}
-
-	schedule_release_context();
-
-	if (odp_unlikely(sched_local.pause))
-		return 0;
-
-	/* Each thread prefers a priority queue. This offset avoids starvation
-	 * of other priority queues on low thread counts. */
-	if (odp_unlikely((sched_local.round & 0x3f) == 0)) {
-		offset = sched_local.prefer_offset;
-		sched_local.prefer_offset = (offset + 1) &
-					    (QUEUES_PER_PRIO - 1);
-	}
-
-	sched_local.round++;
-
 	/* Schedule events */
 	for (prio = 0; prio < NUM_PRIO; prio++) {
 		if (sched->pri_mask[prio] == 0)
 			continue;
 
-		id = (sched_local.thr + offset) & (QUEUES_PER_PRIO - 1);
+		/* Select the first ring based on weights */
+		id = first;
 
 		for (i = 0; i < QUEUES_PER_PRIO;) {
 			int num;
-			int grp;
 			int ordered;
 			odp_queue_t handle;
 			ring_t *ring;
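The new test in schedule_ord_enq_multi() keeps ordered enqueues to pktout queues out of the stash: packet output may drop packets, so the caller needs the enqueue result synchronously, which only a direct in-order enqueue can provide. A simplified sketch of the decision, with stand-in types instead of the ODP queue internals:

    #include <stdbool.h>

    #define MAX_ORDERED_STASH 512           /* illustrative stash capacity */

    /* Stand-in for the destination queue entry; is_pktout models
     * dst_queue->s.pktout.pktio != ODP_PKTIO_INVALID */
    typedef struct {
            bool is_pktout;
    } dst_queue_example_t;

    /* True when the operation must wait for its turn in the order and
     * enqueue directly instead of being stashed */
    static bool enqueue_directly_example(const dst_queue_example_t *dst,
                                         int stash_num)
    {
            return dst->is_pktout || stash_num >= MAX_ORDERED_STASH;
    }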
@@ -726,7 +790,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 			}
 
 			/* Get queue index from the priority queue */
-			ring = &sched->prio_q[prio][id].ring;
+			ring = &sched->prio_q[grp][prio][id].ring;
 			qi = ring_deq(ring, PRIO_QUEUE_MASK);
 
 			/* Priority queue empty */
@@ -736,24 +800,10 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				continue;
 			}
 
-			grp = sched_cb_queue_grp(qi);
-
-			if (grp > ODP_SCHED_GROUP_ALL &&
-			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
-					       sched_local.thr)) {
-				/* This thread is not eligible for work from
-				 * this queue, so continue scheduling it.
-				 */
-				ring_enq(ring, PRIO_QUEUE_MASK, qi);
-
-				i++;
-				id++;
-				continue;
-			}
-
 			/* Low priorities have smaller batch size to limit
 			 * head of line blocking latency. */
-			if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
+			if (odp_unlikely(MAX_DEQ > 1 &&
+					 prio > ODP_SCHED_PRIO_DEFAULT))
				max_deq = MAX_DEQ / 2;
 
 			ordered = sched_cb_queue_is_ordered(qi);
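With the per-queue eligibility check removed (rings are now per group, so everything dequeued is schedulable by this thread), do_schedule_grp() can walk the rings of one priority unconditionally: the walk starts at the weighted "first" ring and wraps across all QUEUES_PER_PRIO rings, so the preferred ring is tried first but none is skipped. A self-contained sketch of that walk, where try_ring_example() is a stub standing in for ring_deq() plus the event dequeue:

    #define QUEUES_PER_PRIO 4

    /* Stub: a real implementation would ring_deq() a queue index here
     * and dequeue events from that queue */
    static int try_ring_example(int prio, int id)
    {
            (void)prio;
            (void)id;
            return 0;
    }

    static int poll_prio_example(int prio, int first)
    {
            int i;
            int id = first;

            for (i = 0; i < QUEUES_PER_PRIO; i++, id++) {
                    int num;

                    if (id >= QUEUES_PER_PRIO)
                            id = 0;

                    num = try_ring_example(prio, id);
                    if (num > 0)
                            return num;     /* events found on this ring */
            }

            return 0;                       /* all rings of this prio empty */
    }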
@@ -818,6 +868,70 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		}
 	}
 
+	return 0;
+}
+
+/*
+ * Schedule queues
+ */
+static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
+			      unsigned int max_num)
+{
+	int i, num_grp;
+	int ret;
+	int id, first, grp_id;
+	uint16_t round;
+	uint32_t epoch;
+
+	if (sched_local.num) {
+		ret = copy_events(out_ev, max_num);
+
+		if (out_queue)
+			*out_queue = sched_local.queue;
+
+		return ret;
+	}
+
+	schedule_release_context();
+
+	if (odp_unlikely(sched_local.pause))
+		return 0;
+
+	/* Each thread prefers a priority queue. Poll weight table avoids
+	 * starvation of other priority queues on low thread counts. */
+	round = sched_local.round + 1;
+
+	if (odp_unlikely(round == WEIGHT_TBL_SIZE))
+		round = 0;
+
+	sched_local.round = round;
+	first = sched_local.weight_tbl[round];
+
+	epoch = odp_atomic_load_acq_u32(&sched->grp_epoch);
+	num_grp = sched_local.num_grp;
+
+	if (odp_unlikely(sched_local.grp_epoch != epoch)) {
+		num_grp = grp_update_tbl();
+		sched_local.grp_epoch = epoch;
+	}
+
+	grp_id = sched_local.grp_weight[round];
+
+	/* Schedule queues per group and priority */
+	for (i = 0; i < num_grp; i++) {
+		int grp;
+
+		grp = sched_local.grp[grp_id];
+		ret = do_schedule_grp(out_queue, out_ev, max_num, grp, first);
+
+		if (odp_likely(ret))
+			return ret;
+
+		grp_id++;
+		if (odp_unlikely(grp_id >= num_grp))
+			grp_id = 0;
+	}
+
 	/*
 	 * Poll packet input when there are no events
 	 *
 	 * Each thread starts the search for a poll command from its
@@ -1023,7 +1137,8 @@ static odp_schedule_group_t schedule_group_create(const char *name,
 					ODP_SCHED_GROUP_NAME_LEN - 1);
 				grp_name[ODP_SCHED_GROUP_NAME_LEN - 1] = 0;
 			}
-			odp_thrmask_copy(&sched->sched_grp[i].mask, mask);
+
+			grp_update_mask(i, mask);
 			group = (odp_schedule_group_t)i;
 			sched->sched_grp[i].allocated = 1;
 			break;
@@ -1036,13 +1151,16 @@ static odp_schedule_group_t schedule_group_create(const char *name,
 
 static int schedule_group_destroy(odp_schedule_group_t group)
 {
+	odp_thrmask_t zero;
 	int ret;
 
+	odp_thrmask_zero(&zero);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
 	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
 	    sched->sched_grp[group].allocated) {
-		odp_thrmask_zero(&sched->sched_grp[group].mask);
+		grp_update_mask(group, &zero);
 		memset(sched->sched_grp[group].name, 0,
 		       ODP_SCHED_GROUP_NAME_LEN);
 		sched->sched_grp[group].allocated = 0;
@@ -1082,9 +1200,11 @@ static int schedule_group_join(odp_schedule_group_t group,
 
 	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
 	    sched->sched_grp[group].allocated) {
-		odp_thrmask_or(&sched->sched_grp[group].mask,
-			       &sched->sched_grp[group].mask,
-			       mask);
+		odp_thrmask_t new_mask;
+
+		odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, mask);
+		grp_update_mask(group, &new_mask);
+
 		ret = 0;
 	} else {
 		ret = -1;
@@ -1097,18 +1217,19 @@ static int schedule_group_join(odp_schedule_group_t group,
 static int schedule_group_leave(odp_schedule_group_t group,
 				const odp_thrmask_t *mask)
 {
+	odp_thrmask_t new_mask;
 	int ret;
 
+	odp_thrmask_xor(&new_mask, mask, &sched->mask_all);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
 	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
 	    sched->sched_grp[group].allocated) {
-		odp_thrmask_t leavemask;
+		odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask,
+				&new_mask);
+		grp_update_mask(group, &new_mask);
 
-		odp_thrmask_xor(&leavemask, mask, &sched->mask_all);
-		odp_thrmask_and(&sched->sched_grp[group].mask,
-				&sched->sched_grp[group].mask,
-				&leavemask);
 		ret = 0;
 	} else {
 		ret = -1;
@@ -1159,12 +1280,19 @@ static int schedule_group_info(odp_schedule_group_t group,
 
 static int schedule_thr_add(odp_schedule_group_t group, int thr)
 {
+	odp_thrmask_t mask;
+	odp_thrmask_t new_mask;
+
 	if (group < 0 || group >= SCHED_GROUP_NAMED)
 		return -1;
 
+	odp_thrmask_zero(&mask);
+	odp_thrmask_set(&mask, thr);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
-	odp_thrmask_set(&sched->sched_grp[group].mask, thr);
+	odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, &mask);
+	grp_update_mask(group, &new_mask);
 
 	odp_spinlock_unlock(&sched->grp_lock);
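Every mask mutation above now follows the same shape: compose the new mask in a local variable with thrmask or/xor/and operations, then publish it in one step through grp_update_mask() while grp_lock is held, so readers never observe a half-updated mask and the epoch moves exactly once per change. The same pattern on a plain 64-bit bitmask (an assumption standing in for odp_thrmask_t; the *_example names are not ODP symbols):

    #include <stdint.h>
    #include <stdatomic.h>

    static uint64_t group_mask_example;     /* shared, guarded by a lock */
    static atomic_uint epoch_example;

    /* Counterpart of grp_update_mask(): store the new mask, bump epoch */
    static void publish_mask_example(uint64_t new_mask)
    {
            group_mask_example = new_mask;
            atomic_fetch_add_explicit(&epoch_example, 1,
                                      memory_order_release);
    }

    /* thr_add: OR in the thread's bit */
    static void thr_add_example(int thr)
    {
            publish_mask_example(group_mask_example | (UINT64_C(1) << thr));
    }

    /* thr_rem: clear the bit by ANDing with its complement; the patch
     * builds the complement by XORing a one-bit mask against mask_all */
    static void thr_rem_example(int thr)
    {
            uint64_t clear = (UINT64_C(1) << thr) ^ UINT64_MAX;

            publish_mask_example(group_mask_example & clear);
    }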
@@ -1173,12 +1301,20 @@ static int schedule_thr_add(odp_schedule_group_t group, int thr)
 
 static int schedule_thr_rem(odp_schedule_group_t group, int thr)
 {
+	odp_thrmask_t mask;
+	odp_thrmask_t new_mask;
+
 	if (group < 0 || group >= SCHED_GROUP_NAMED)
 		return -1;
 
+	odp_thrmask_zero(&mask);
+	odp_thrmask_set(&mask, thr);
+	odp_thrmask_xor(&new_mask, &mask, &sched->mask_all);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
-	odp_thrmask_clr(&sched->sched_grp[group].mask, thr);
+	odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask, &new_mask);
+	grp_update_mask(group, &new_mask);
 
 	odp_spinlock_unlock(&sched->grp_lock);
@@ -1192,9 +1328,10 @@ static void schedule_prefetch(int num ODP_UNUSED)
 
 static int schedule_sched_queue(uint32_t queue_index)
 {
+	int grp = sched->queue[queue_index].grp;
 	int prio = sched->queue[queue_index].prio;
 	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
-	ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+	ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
 
 	ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
 	return 0;
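Finally, the rewritten do_schedule() spreads the starting group the same way it spreads the starting ring: grp_weight[round] cycles i % num over the thread's groups, so across rounds every group heads the search equally often, and the loop wraps so no group is skipped. A compact sketch of that rotation (stand-in data; do_grp_example() is a stub for do_schedule_grp()):

    #define WEIGHT_TBL_SIZE_EX 192          /* (QUEUES_PER_PRIO - 1) * 64 */
    #define MAX_GRPS_EX        32

    static int num_grp_example = 3;                   /* this thread's groups */
    static int grp_example[MAX_GRPS_EX] = { 0, 1, 2 };
    static unsigned char grp_weight_example[WEIGHT_TBL_SIZE_EX];

    /* Counterpart of the grp_update_tbl() weight loop */
    static void init_grp_weights_example(void)
    {
            int i;

            for (i = 0; i < WEIGHT_TBL_SIZE_EX; i++)
                    grp_weight_example[i] = i % num_grp_example;
    }

    /* Stub: a real implementation would schedule the group's queues */
    static int do_grp_example(int grp)
    {
            (void)grp;
            return 0;
    }

    static int schedule_round_example(unsigned int round)
    {
            int i;
            int grp_id = grp_weight_example[round % WEIGHT_TBL_SIZE_EX];

            for (i = 0; i < num_grp_example; i++) {
                    int ret = do_grp_example(grp_example[grp_id]);

                    if (ret)
                            return ret;     /* events found, stop rotating */

                    grp_id++;
                    if (grp_id >= num_grp_example)
                            grp_id = 0;
            }

            return 0;
    }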