Diffstat (limited to 'platform/linux-generic/odp_schedule.c')
-rw-r--r--  platform/linux-generic/odp_schedule.c | 321
1 file changed, 229 insertions(+), 92 deletions(-)
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index cd5bf21bd..c4567d810 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -20,6 +20,7 @@
#include <odp_config_internal.h>
#include <odp_align_internal.h>
#include <odp/api/sync.h>
+#include <odp/api/packet_io.h>
#include <odp_ring_internal.h>
#include <odp_queue_internal.h>
@@ -34,11 +35,18 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
"normal_prio_is_not_between_highest_and_lowest");
/* Number of scheduling groups */
-#define NUM_SCHED_GRPS 256
+#define NUM_SCHED_GRPS 32
/* Priority queues per priority */
#define QUEUES_PER_PRIO 4
+/* A thread polls a non-preferred sched queue every this many polls
+ * of the preferred queue. */
+#define PREFER_RATIO 64
+
+/* Size of poll weight table */
+#define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO)
+
/* Packet input poll cmd queues */
#define PKTIO_CMD_QUEUES 4
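
The two new defines size the per-thread poll weight table that sched_local_init() fills and do_schedule() consumes further down. A quick reading of the arithmetic, using the default values in this file:

/* WEIGHT_TBL_SIZE = (QUEUES_PER_PRIO - 1) * PREFER_RATIO
 *                 = (4 - 1) * 64 = 192 entries
 *
 * All but one entry in each PREFER_RATIO-sized stretch select the thread's
 * preferred ring, so over one full pass of the table each of the other
 * three rings of a priority level is polled exactly once.
 */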
@@ -142,7 +150,6 @@ typedef struct {
int index;
int pause;
uint16_t round;
- uint16_t prefer_offset;
uint16_t pktin_polls;
uint32_t queue_index;
odp_queue_t queue;
@@ -157,6 +164,12 @@ typedef struct {
ordered_stash_t stash[MAX_ORDERED_STASH];
} ordered;
+ uint32_t grp_epoch;
+ int num_grp;
+ uint8_t grp[NUM_SCHED_GRPS];
+ uint8_t weight_tbl[WEIGHT_TBL_SIZE];
+ uint8_t grp_weight[WEIGHT_TBL_SIZE];
+
} sched_local_t;
/* Priority queue */
@@ -191,7 +204,7 @@ typedef struct {
pri_mask_t pri_mask[NUM_PRIO];
odp_spinlock_t mask_lock;
- prio_queue_t prio_q[NUM_PRIO][QUEUES_PER_PRIO];
+ prio_queue_t prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
odp_spinlock_t poll_cmd_lock;
/* Number of commands in a command queue */
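
Note that the prio_q rings gain a scheduling group dimension here, so the ring count in the scheduler state grows by a factor of NUM_SCHED_GRPS. That is also why the same patch lowers NUM_SCHED_GRPS from 256 to 32. A rough sizing view (NUM_PRIO comes from the build configuration):

/* Rings held by the global scheduler state:
 *
 *   before:  NUM_PRIO * QUEUES_PER_PRIO
 *   after:   NUM_SCHED_GRPS * NUM_PRIO * QUEUES_PER_PRIO
 *
 * With the previous group count of 256 the array would have been another
 * 8x larger, hence the reduction to 32 groups at the top of the patch.
 */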
@@ -206,8 +219,10 @@ typedef struct {
odp_shm_t shm;
uint32_t pri_count[NUM_PRIO][QUEUES_PER_PRIO];
- odp_spinlock_t grp_lock;
- odp_thrmask_t mask_all;
+ odp_thrmask_t mask_all;
+ odp_spinlock_t grp_lock;
+ odp_atomic_u32_t grp_epoch;
+
struct {
char name[ODP_SCHED_GROUP_NAME_LEN];
odp_thrmask_t mask;
@@ -215,6 +230,7 @@ typedef struct {
} sched_grp[NUM_SCHED_GRPS];
struct {
+ int grp;
int prio;
int queue_per_prio;
} queue[ODP_CONFIG_QUEUES];
@@ -237,17 +253,35 @@ static inline void schedule_release_context(void);
static void sched_local_init(void)
{
+ int i;
+ uint8_t id;
+ uint8_t offset = 0;
+
memset(&sched_local, 0, sizeof(sched_local_t));
sched_local.thr = odp_thread_id();
sched_local.queue = ODP_QUEUE_INVALID;
sched_local.queue_index = PRIO_QUEUE_EMPTY;
+
+ id = sched_local.thr & (QUEUES_PER_PRIO - 1);
+
+ for (i = 0; i < WEIGHT_TBL_SIZE; i++) {
+ sched_local.weight_tbl[i] = id;
+
+ if (i % PREFER_RATIO == 0) {
+ offset++;
+ sched_local.weight_tbl[i] = (id + offset) &
+ (QUEUES_PER_PRIO - 1);
+ if (offset == QUEUES_PER_PRIO - 1)
+ offset = 0;
+ }
+ }
}
static int schedule_init_global(void)
{
odp_shm_t shm;
- int i, j;
+ int i, j, grp;
ODP_DBG("Schedule init ... ");
@@ -267,15 +301,20 @@ static int schedule_init_global(void)
sched->shm = shm;
odp_spinlock_init(&sched->mask_lock);
- for (i = 0; i < NUM_PRIO; i++) {
- for (j = 0; j < QUEUES_PER_PRIO; j++) {
- int k;
+ for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
+ for (i = 0; i < NUM_PRIO; i++) {
+ for (j = 0; j < QUEUES_PER_PRIO; j++) {
+ prio_queue_t *prio_q;
+ int k;
- ring_init(&sched->prio_q[i][j].ring);
+ prio_q = &sched->prio_q[grp][i][j];
+ ring_init(&prio_q->ring);
- for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++)
- sched->prio_q[i][j].queue_index[k] =
- PRIO_QUEUE_EMPTY;
+ for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++) {
+ prio_q->queue_index[k] =
+ PRIO_QUEUE_EMPTY;
+ }
+ }
}
}
@@ -291,12 +330,17 @@ static int schedule_init_global(void)
sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE;
odp_spinlock_init(&sched->grp_lock);
+ odp_atomic_init_u32(&sched->grp_epoch, 0);
for (i = 0; i < NUM_SCHED_GRPS; i++) {
memset(sched->sched_grp[i].name, 0, ODP_SCHED_GROUP_NAME_LEN);
odp_thrmask_zero(&sched->sched_grp[i].mask);
}
+ sched->sched_grp[ODP_SCHED_GROUP_ALL].allocated = 1;
+ sched->sched_grp[ODP_SCHED_GROUP_WORKER].allocated = 1;
+ sched->sched_grp[ODP_SCHED_GROUP_CONTROL].allocated = 1;
+
odp_thrmask_setall(&sched->mask_all);
ODP_DBG("done\n");
@@ -304,29 +348,38 @@ static int schedule_init_global(void)
return 0;
}
+static inline void queue_destroy_finalize(uint32_t qi)
+{
+ sched_cb_queue_destroy_finalize(qi);
+}
+
static int schedule_term_global(void)
{
int ret = 0;
int rc = 0;
- int i, j;
+ int i, j, grp;
- for (i = 0; i < NUM_PRIO; i++) {
- for (j = 0; j < QUEUES_PER_PRIO; j++) {
- ring_t *ring = &sched->prio_q[i][j].ring;
- uint32_t qi;
+ for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
+ for (i = 0; i < NUM_PRIO; i++) {
+ for (j = 0; j < QUEUES_PER_PRIO; j++) {
+ ring_t *ring = &sched->prio_q[grp][i][j].ring;
+ uint32_t qi;
- while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
- RING_EMPTY) {
- odp_event_t events[1];
- int num;
+ while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+ RING_EMPTY) {
+ odp_event_t events[1];
+ int num;
- num = sched_cb_queue_deq_multi(qi, events, 1);
+ num = sched_cb_queue_deq_multi(qi,
+ events,
+ 1);
- if (num < 0)
- sched_cb_queue_destroy_finalize(qi);
+ if (num < 0)
+ queue_destroy_finalize(qi);
- if (num > 0)
- ODP_ERR("Queue not empty\n");
+ if (num > 0)
+ ODP_ERR("Queue not empty\n");
+ }
}
}
}
@@ -357,6 +410,40 @@ static int schedule_term_local(void)
return 0;
}
+static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
+{
+ odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
+ odp_atomic_add_rel_u32(&sched->grp_epoch, 1);
+}
+
+static inline int grp_update_tbl(void)
+{
+ int i;
+ int num = 0;
+ int thr = sched_local.thr;
+
+ odp_spinlock_lock(&sched->grp_lock);
+
+ for (i = 0; i < NUM_SCHED_GRPS; i++) {
+ if (sched->sched_grp[i].allocated == 0)
+ continue;
+
+ if (odp_thrmask_isset(&sched->sched_grp[i].mask, thr)) {
+ sched_local.grp[num] = i;
+ num++;
+ }
+ }
+
+ odp_spinlock_unlock(&sched->grp_lock);
+
+ /* Update group weights. Round robin over all of the thread's groups. */
+ for (i = 0; i < WEIGHT_TBL_SIZE; i++)
+ sched_local.grp_weight[i] = i % num;
+
+ sched_local.num_grp = num;
+ return num;
+}
+
static unsigned schedule_max_ordered_locks(void)
{
return MAX_ORDERED_LOCKS_PER_QUEUE;
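
grp_update_mask() and grp_update_tbl() form a simple epoch scheme: group masks are modified under grp_lock and published by bumping grp_epoch with release semantics, while each scheduling thread detects the change with an acquire load and rebuilds its cached group list. The reader side, as it appears in the reworked do_schedule() later in this patch:

	epoch = odp_atomic_load_acq_u32(&sched->grp_epoch);

	if (odp_unlikely(sched_local.grp_epoch != epoch)) {
		/* Membership changed: rebuild the thread-local group
		 * table (takes grp_lock) and remember the epoch. */
		num_grp = grp_update_tbl();
		sched_local.grp_epoch = epoch;
	}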
@@ -407,6 +494,7 @@ static int schedule_init_queue(uint32_t queue_index,
int prio = sched_param->prio;
pri_set_queue(queue_index, prio);
+ sched->queue[queue_index].grp = sched_param->group;
sched->queue[queue_index].prio = prio;
sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
@@ -418,6 +506,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
int prio = sched->queue[queue_index].prio;
pri_clr_queue(queue_index, prio);
+ sched->queue[queue_index].grp = 0;
sched->queue[queue_index].prio = 0;
sched->queue[queue_index].queue_per_prio = 0;
}
@@ -509,9 +598,10 @@ static void schedule_release_atomic(void)
uint32_t qi = sched_local.queue_index;
if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) {
- int prio = sched->queue[qi].prio;
+ int grp = sched->queue[qi].grp;
+ int prio = sched->queue[qi].prio;
int queue_per_prio = sched->queue[qi].queue_per_prio;
- ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+ ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
/* Release current atomic queue */
ring_enq(ring, PRIO_QUEUE_MASK, qi);
@@ -640,7 +730,9 @@ static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
return 0;
}
- if (odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
+ /* Pktout may drop packets, so the operation cannot be stashed. */
+ if (dst_queue->s.pktout.pktio != ODP_PKTIO_INVALID ||
+ odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
/* If the local stash is full, wait until it is our turn and
* then release the stash and do enqueue directly. */
wait_for_order(src_queue);
@@ -662,54 +754,26 @@ static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
return 1;
}
-/*
- * Schedule queues
- */
-static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
- unsigned int max_num)
+static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
+ unsigned int max_num, int grp, int first)
{
int prio, i;
int ret;
int id;
- int offset = 0;
unsigned int max_deq = MAX_DEQ;
uint32_t qi;
- if (sched_local.num) {
- ret = copy_events(out_ev, max_num);
-
- if (out_queue)
- *out_queue = sched_local.queue;
-
- return ret;
- }
-
- schedule_release_context();
-
- if (odp_unlikely(sched_local.pause))
- return 0;
-
- /* Each thread prefers a priority queue. This offset avoids starvation
- * of other priority queues on low thread counts. */
- if (odp_unlikely((sched_local.round & 0x3f) == 0)) {
- offset = sched_local.prefer_offset;
- sched_local.prefer_offset = (offset + 1) &
- (QUEUES_PER_PRIO - 1);
- }
-
- sched_local.round++;
-
/* Schedule events */
for (prio = 0; prio < NUM_PRIO; prio++) {
if (sched->pri_mask[prio] == 0)
continue;
- id = (sched_local.thr + offset) & (QUEUES_PER_PRIO - 1);
+ /* Select the first ring based on weights */
+ id = first;
for (i = 0; i < QUEUES_PER_PRIO;) {
int num;
- int grp;
int ordered;
odp_queue_t handle;
ring_t *ring;
@@ -726,7 +790,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
}
/* Get queue index from the priority queue */
- ring = &sched->prio_q[prio][id].ring;
+ ring = &sched->prio_q[grp][prio][id].ring;
qi = ring_deq(ring, PRIO_QUEUE_MASK);
/* Priority queue empty */
@@ -736,24 +800,10 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
continue;
}
- grp = sched_cb_queue_grp(qi);
-
- if (grp > ODP_SCHED_GROUP_ALL &&
- !odp_thrmask_isset(&sched->sched_grp[grp].mask,
- sched_local.thr)) {
- /* This thread is not eligible for work from
- * this queue, so continue scheduling it.
- */
- ring_enq(ring, PRIO_QUEUE_MASK, qi);
-
- i++;
- id++;
- continue;
- }
-
/* Low priorities have smaller batch size to limit
* head of line blocking latency. */
- if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
+ if (odp_unlikely(MAX_DEQ > 1 &&
+ prio > ODP_SCHED_PRIO_DEFAULT))
max_deq = MAX_DEQ / 2;
ordered = sched_cb_queue_is_ordered(qi);
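
The added MAX_DEQ > 1 test guards small burst configurations: if MAX_DEQ were defined as 1, MAX_DEQ / 2 would truncate to 0 and lower-priority queues would never be dequeued at all.

/* e.g. MAX_DEQ == 4 -> low-priority batch of 2,
 *      MAX_DEQ == 1 -> 1 / 2 == 0, so the halving must be skipped. */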
@@ -818,6 +868,70 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
}
}
+ return 0;
+}
+
+/*
+ * Schedule queues
+ */
+static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
+ unsigned int max_num)
+{
+ int i, num_grp;
+ int ret;
+ int id, first, grp_id;
+ uint16_t round;
+ uint32_t epoch;
+
+ if (sched_local.num) {
+ ret = copy_events(out_ev, max_num);
+
+ if (out_queue)
+ *out_queue = sched_local.queue;
+
+ return ret;
+ }
+
+ schedule_release_context();
+
+ if (odp_unlikely(sched_local.pause))
+ return 0;
+
+ /* Each thread prefers a priority queue. The poll weight table avoids
+ * starvation of other priority queues on low thread counts. */
+ round = sched_local.round + 1;
+
+ if (odp_unlikely(round == WEIGHT_TBL_SIZE))
+ round = 0;
+
+ sched_local.round = round;
+ first = sched_local.weight_tbl[round];
+
+ epoch = odp_atomic_load_acq_u32(&sched->grp_epoch);
+ num_grp = sched_local.num_grp;
+
+ if (odp_unlikely(sched_local.grp_epoch != epoch)) {
+ num_grp = grp_update_tbl();
+ sched_local.grp_epoch = epoch;
+ }
+
+ grp_id = sched_local.grp_weight[round];
+
+ /* Schedule queues per group and priority */
+ for (i = 0; i < num_grp; i++) {
+ int grp;
+
+ grp = sched_local.grp[grp_id];
+ ret = do_schedule_grp(out_queue, out_ev, max_num, grp, first);
+
+ if (odp_likely(ret))
+ return ret;
+
+ grp_id++;
+ if (odp_unlikely(grp_id >= num_grp))
+ grp_id = 0;
+ }
+
/*
* Poll packet input when there are no events
* * Each thread starts the search for a poll command from its
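
Taken together, the new do_schedule() rotates its starting group every round (grp_weight[round] = round % num_grp), so no group in the thread's table is always served first. A hypothetical trace for a thread that belongs to three groups (group numbers made up for illustration):

/* num_grp = 3, cached grp[] = {0, 1, 12}:
 *
 *   round 0:  0 -> 1 -> 12
 *   round 1:  1 -> 12 -> 0
 *   round 2: 12 -> 0 -> 1
 *
 * do_schedule_grp() returns as soon as one group yields events, so
 * rotating the starting point keeps later groups from being starved.
 */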
@@ -1023,7 +1137,8 @@ static odp_schedule_group_t schedule_group_create(const char *name,
ODP_SCHED_GROUP_NAME_LEN - 1);
grp_name[ODP_SCHED_GROUP_NAME_LEN - 1] = 0;
}
- odp_thrmask_copy(&sched->sched_grp[i].mask, mask);
+
+ grp_update_mask(i, mask);
group = (odp_schedule_group_t)i;
sched->sched_grp[i].allocated = 1;
break;
@@ -1036,13 +1151,16 @@ static odp_schedule_group_t schedule_group_create(const char *name,
static int schedule_group_destroy(odp_schedule_group_t group)
{
+ odp_thrmask_t zero;
int ret;
+ odp_thrmask_zero(&zero);
+
odp_spinlock_lock(&sched->grp_lock);
if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
sched->sched_grp[group].allocated) {
- odp_thrmask_zero(&sched->sched_grp[group].mask);
+ grp_update_mask(group, &zero);
memset(sched->sched_grp[group].name, 0,
ODP_SCHED_GROUP_NAME_LEN);
sched->sched_grp[group].allocated = 0;
@@ -1082,9 +1200,11 @@ static int schedule_group_join(odp_schedule_group_t group,
if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
sched->sched_grp[group].allocated) {
- odp_thrmask_or(&sched->sched_grp[group].mask,
- &sched->sched_grp[group].mask,
- mask);
+ odp_thrmask_t new_mask;
+
+ odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, mask);
+ grp_update_mask(group, &new_mask);
+
ret = 0;
} else {
ret = -1;
@@ -1097,18 +1217,19 @@ static int schedule_group_join(odp_schedule_group_t group,
static int schedule_group_leave(odp_schedule_group_t group,
const odp_thrmask_t *mask)
{
+ odp_thrmask_t new_mask;
int ret;
+ odp_thrmask_xor(&new_mask, mask, &sched->mask_all);
+
odp_spinlock_lock(&sched->grp_lock);
if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
sched->sched_grp[group].allocated) {
- odp_thrmask_t leavemask;
+ odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask,
+ &new_mask);
+ grp_update_mask(group, &new_mask);
- odp_thrmask_xor(&leavemask, mask, &sched->mask_all);
- odp_thrmask_and(&sched->sched_grp[group].mask,
- &sched->sched_grp[group].mask,
- &leavemask);
ret = 0;
} else {
ret = -1;
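
schedule_group_leave() (and schedule_thr_rem() below) removes threads by AND-ing the group mask with the complement of the leave mask; the complement is formed by XOR-ing the mask with mask_all, the all-ones mask set up in schedule_init_global(). In boolean terms:

/* tmp      = mask XOR mask_all          i.e.  ~mask
 * new_mask = group_mask AND tmp         i.e.  group_mask & ~mask
 *
 * grp_update_mask() then stores new_mask and bumps grp_epoch, so every
 * scheduling thread refreshes its cached group table on its next round.
 */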
@@ -1159,12 +1280,19 @@ static int schedule_group_info(odp_schedule_group_t group,
static int schedule_thr_add(odp_schedule_group_t group, int thr)
{
+ odp_thrmask_t mask;
+ odp_thrmask_t new_mask;
+
if (group < 0 || group >= SCHED_GROUP_NAMED)
return -1;
+ odp_thrmask_zero(&mask);
+ odp_thrmask_set(&mask, thr);
+
odp_spinlock_lock(&sched->grp_lock);
- odp_thrmask_set(&sched->sched_grp[group].mask, thr);
+ odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, &mask);
+ grp_update_mask(group, &new_mask);
odp_spinlock_unlock(&sched->grp_lock);
@@ -1173,12 +1301,20 @@ static int schedule_thr_add(odp_schedule_group_t group, int thr)
static int schedule_thr_rem(odp_schedule_group_t group, int thr)
{
+ odp_thrmask_t mask;
+ odp_thrmask_t new_mask;
+
if (group < 0 || group >= SCHED_GROUP_NAMED)
return -1;
+ odp_thrmask_zero(&mask);
+ odp_thrmask_set(&mask, thr);
+ odp_thrmask_xor(&new_mask, &mask, &sched->mask_all);
+
odp_spinlock_lock(&sched->grp_lock);
- odp_thrmask_clr(&sched->sched_grp[group].mask, thr);
+ odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask, &new_mask);
+ grp_update_mask(group, &new_mask);
odp_spinlock_unlock(&sched->grp_lock);
@@ -1192,9 +1328,10 @@ static void schedule_prefetch(int num ODP_UNUSED)
static int schedule_sched_queue(uint32_t queue_index)
{
+ int grp = sched->queue[queue_index].grp;
int prio = sched->queue[queue_index].prio;
int queue_per_prio = sched->queue[queue_index].queue_per_prio;
- ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+ ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
return 0;