aboutsummaryrefslogtreecommitdiff
path: root/secchan/pinsched.c
blob: 0afd22ff645958861172c51be308349deb6f6f07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/*
 * Copyright (c) 2008, 2009 Nicira Networks.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>
#include "pinsched.h"
#include <arpa/inet.h>
#include <stdlib.h>
#include "ofpbuf.h"
#include "openflow/openflow.h"
#include "poll-loop.h"
#include "port-array.h"
#include "queue.h"
#include "random.h"
#include "rconn.h"
#include "status.h"
#include "timeval.h"
#include "vconn.h"

/* Packet-in scheduler.  Limits the rate of packet-in messages with a token
 * bucket and, while the limit is in effect, holds excess packets in one FIFO
 * per port (queue_push_tail() / queue_pop_head()), draining them in
 * round-robin order across ports. */
struct pinsched {
    /* Client-supplied parameters, normalized by pinsched_set_limits(). */
    int rate_limit;           /* Packets added to bucket per second. */
    int burst_limit;          /* Maximum token bucket size, in packets.  Also
                               * the maximum total number of queued packets:
                               * reaching it causes drop_packet(). */

    /* One queue per physical port. */
    struct port_array queues;   /* Array of "struct ovs_queue *".  Empty queues
                                 * are freed and their slot set to null, so
                                 * every nonnull entry holds >= 1 packet. */
    int n_queued;               /* Sum over queues[*].n. */
    unsigned int last_tx_port;  /* Last port checked in round-robin. */

    /* Token bucket.
     *
     * It costs 1000 tokens to send a single packet_in message.  A single token
     * per message would be more straightforward, but this choice lets us avoid
     * round-off error in refill_bucket()'s calculation of how many tokens to
     * add to the bucket, since no division step is needed. */
    long long int last_fill;    /* Time at which we last added tokens, as
                                 * returned by time_msec(). */
    int tokens;                 /* Current number of tokens. */

    /* Transmission queue. */
    int n_txq;                  /* No. of packets waiting in rconn for tx.
                                 * NOTE(review): only ever set to 0 in this
                                 * file and never read; appears unused. */

    /* Statistics reporting. */
    unsigned long long n_normal;        /* # txed w/o rate limit queuing. */
    unsigned long long n_limited;       /* # queued for rate limiting. */
    unsigned long long n_queue_dropped; /* # dropped due to queue overflow. */

    /* Switch status.  Set by pinsched_create() only when a switch_status was
     * supplied; otherwise left null by the zero-filling allocation. */
    struct status_category *ss_cat;
};

/* Pops and returns the packet at the head of 'q', which must be the queue
 * stored in 'ps->queues' at index 'port_no', and decrements 'ps->n_queued'.
 *
 * If the pop leaves 'q' empty, 'q' is freed and its slot in 'ps->queues' is
 * cleared, so the caller must not touch 'q' afterward.  This keeps the
 * invariant that every nonnull entry in 'ps->queues' holds a packet. */
static struct ofpbuf *
dequeue_packet(struct pinsched *ps, struct ovs_queue *q,
               unsigned int port_no)
{
    struct ofpbuf *head = queue_pop_head(q);

    ps->n_queued--;
    if (q->n == 0) {
        port_array_set(&ps->queues, port_no, NULL);
        free(q);
    }
    return head;
}

/* Drops one packet from the longest queue in 'ps' and counts it in
 * 'ps->n_queue_dropped'.  When several queues tie for longest, one of them is
 * chosen uniformly at random.
 *
 * 'ps' must have at least one packet queued. */
static void
drop_packet(struct pinsched *ps)
{
    struct ovs_queue *longest;  /* Queue currently selected as longest. */
    int n_longest;              /* # of queues of same length as 'longest'. */
    unsigned int longest_port_no;
    unsigned int port_no;
    struct ovs_queue *q;

    ps->n_queue_dropped++;

    longest = port_array_first(&ps->queues, &port_no);
    longest_port_no = port_no;
    n_longest = 1;
    while ((q = port_array_next(&ps->queues, &port_no)) != NULL) {
        if (longest->n < q->n) {
            /* A strictly longer queue becomes the sole candidate.  Its port
             * number must be tracked along with it: dequeue_packet() uses the
             * port number to clear the slot if the queue becomes empty, and a
             * stale port number would free 'longest' while leaving a dangling
             * pointer in its real slot. */
            longest = q;
            longest_port_no = port_no;
            n_longest = 1;
        } else if (longest->n == q->n) {
            n_longest++;

            /* Randomly select one of the longest queues, with a uniform
             * distribution (Knuth algorithm 3.4.2R). */
            if (!random_range(n_longest)) {
                longest = q;
                longest_port_no = port_no;
            }
        }
    }

    /* FIXME: do we want to pop the tail instead? */
    ofpbuf_delete(dequeue_packet(ps, longest, longest_port_no));
}

/* Remove and return the next packet to transmit (in round-robin order). */
static struct ofpbuf *
get_tx_packet(struct pinsched *ps)
{
    struct ovs_queue *q = port_array_next(&ps->queues, &ps->last_tx_port);
    if (!q) {
        q = port_array_first(&ps->queues, &ps->last_tx_port);
    }
    return dequeue_packet(ps, q, ps->last_tx_port);
}

/* Credits 'ps' with 'rate_limit' tokens for every millisecond elapsed since
 * 'ps->last_fill'.  Since a packet costs 1000 tokens, this amounts to
 * 'rate_limit' packets per second.  The balance is capped at the bucket
 * capacity of 'burst_limit' packets.
 *
 * The credit is banked only once the balance reaches one packet's worth
 * (1000 tokens).  Until then 'last_fill' is left untouched, so the elapsed
 * time keeps accumulating instead of being lost. */
static void
refill_bucket(struct pinsched *ps)
{
    long long int now = time_msec();
    long long int elapsed = now - ps->last_fill;
    long long int balance = ps->tokens + elapsed * ps->rate_limit;

    if (balance < 1000) {
        return;
    }
    ps->last_fill = now;
    ps->tokens = MIN(balance, ps->burst_limit * 1000);
}

/* Tries to withdraw one packet's worth of tokens (1000) from 'ps'.
 *
 * Returns true and debits the bucket on success.  Returns false and leaves
 * the bucket unchanged if it holds fewer than 1000 tokens.  Does not refill
 * the bucket itself; that is refill_bucket()'s job. */
static bool
get_token(struct pinsched *ps)
{
    bool have_token = ps->tokens >= 1000;

    if (have_token) {
        ps->tokens -= 1000;
    }
    return have_token;
}

/* Submits 'packet', a packet-in message for 'port_no', to scheduler 'ps'.
 *
 * 'packet' is passed straight to 'cb' (along with 'aux') in two cases:
 *   - 'ps' is null (no scheduling);
 *   - nothing is queued yet and the bucket holds a token.
 *
 * Otherwise 'packet' is trimmed and appended to the queue for 'port_no',
 * creating that queue if needed, for pinsched_run() to send later.  If the
 * total number of queued packets has already reached 'burst_limit', one packet
 * is first dropped from the longest queue. */
void
pinsched_send(struct pinsched *ps, uint16_t port_no,
              struct ofpbuf *packet, pinsched_tx_cb *cb, void *aux)
{
    struct ovs_queue *q;

    if (ps == NULL) {
        cb(packet, aux);
        return;
    }

    if (ps->n_queued == 0 && get_token(ps)) {
        /* Common case: not constrained by the rate limit, so the packet
         * takes the normal path. */
        ps->n_normal++;
        cb(packet, aux);
        return;
    }

    /* Rate limited: queue the packet for the periodic callback to drain.
     *
     * The buffer usually comes from dpif_recv() with far more space allocated
     * than actually used.  Since it may sit in the queue for a while, release
     * the slack. */
    ofpbuf_trim(packet);

    if (ps->n_queued >= ps->burst_limit) {
        drop_packet(ps);
    }

    q = port_array_get(&ps->queues, port_no);
    if (q == NULL) {
        q = xmalloc(sizeof *q);
        queue_init(q);
        port_array_set(&ps->queues, port_no, q);
    }
    queue_push_tail(q, packet);
    ps->n_queued++;
    ps->n_limited++;
}

static void
pinsched_status_cb(struct status_reply *sr, void *ps_)
{
    struct pinsched *ps = ps_;

    status_reply_put(sr, "normal=%llu", ps->n_normal);
    status_reply_put(sr, "limited=%llu", ps->n_limited);
    status_reply_put(sr, "queue-dropped=%llu", ps->n_queue_dropped);
}

/* Refills the token bucket of 'ps' for the elapsed time, then sends queued
 * packets through 'cb' (with 'aux') in round-robin order for as long as
 * tokens remain.  At most 50 packets are sent per call, so other code gets a
 * chance to do work too.  Does nothing if 'ps' is null. */
void
pinsched_run(struct pinsched *ps, pinsched_tx_cb *cb, void *aux)
{
    if (ps) {
        int i;

        refill_bucket(ps);

        /* Test the iteration cap and the queue before calling get_token().
         * get_token() debits the bucket whenever it succeeds, so calling it
         * after the cap is reached would throw away a token without sending
         * anything. */
        for (i = 0; i < 50 && ps->n_queued && get_token(ps); i++) {
            cb(get_tx_packet(ps), aux);
        }
    }
}

/* Arranges for the poll loop to wake when pinsched_run() may next be able to
 * send a queued packet.  If packets are queued and the bucket already holds a
 * token, the wakeup is immediate.  If packets are queued but the bucket must
 * refill first, a timer of TIME_UPDATE_INTERVAL / 2 is set.  Does nothing if
 * 'ps' is null or nothing is queued. */
void
pinsched_wait(struct pinsched *ps)
{
    if (!ps || !ps->n_queued) {
        return;
    }

    if (ps->tokens < 1000) {
        /* Wait for the bucket to refill.  The exact time until the next token
         * could be calculated here for smoother output. */
        poll_timer_wait(TIME_UPDATE_INTERVAL / 2);
    } else {
        /* Another packet can go out as soon as we are called again. */
        poll_immediate_wake();
    }
}

/* Creates and returns a scheduler for sending packet-in messages, limited to
 * 'rate_limit' packets per second with bursts of up to 'burst_limit' packets.
 * Nonpositive limits select defaults, as implemented by
 * pinsched_set_limits().  If 'ss' is nonnull, the scheduler's counters are
 * registered with it under the "rate-limit" status category. */
struct pinsched *
pinsched_create(int rate_limit, int burst_limit, struct switch_status *ss)
{
    struct pinsched *ps;

    /* xcalloc() zero-fills, so 'n_queued', 'n_txq', the statistics counters
     * and 'ss_cat' all start out as zero/null. */
    ps = xcalloc(1, sizeof *ps);
    port_array_init(&ps->queues);
    ps->last_tx_port = PORT_ARRAY_SIZE;
    ps->last_fill = time_msec();
    pinsched_set_limits(ps, rate_limit, burst_limit);

    /* Start with 1/10 second's worth of tokens.  This is computed from the
     * normalized limits, because the raw 'rate_limit' argument may be
     * nonpositive, which would start the bucket empty or negative.  The
     * arithmetic is 64-bit so that a large rate cannot overflow 'int', and
     * the result is capped at the bucket capacity just as refill_bucket()
     * does.  The result always fits in 'int' because 'burst_limit' is at
     * most INT_MAX / 1000. */
    ps->tokens = MIN((long long int) ps->rate_limit * 100,
                     (long long int) ps->burst_limit * 1000);

    if (ss) {
        ps->ss_cat = switch_status_register(ss, "rate-limit",
                                            pinsched_status_cb, ps);
    }

    return ps;
}

/* Destroys 'ps'.  Every per-port queue is passed to queue_destroy() and then
 * freed, the port array is destroyed, the status category is unregistered,
 * and 'ps' itself is freed.  A null 'ps' is ignored. */
void
pinsched_destroy(struct pinsched *ps)
{
    struct ovs_queue *queue;
    unsigned int port_no;

    if (!ps) {
        return;
    }

    PORT_ARRAY_FOR_EACH (queue, &ps->queues, port_no) {
        queue_destroy(queue);
        free(queue);
    }
    port_array_destroy(&ps->queues);
    switch_status_unregister(ps->ss_cat);
    free(ps);
}

/* Sets the rate limit of 'ps' to 'rate_limit' packets per second and its
 * burst limit to 'burst_limit' packets.  The burst limit is both the token
 * bucket capacity and the maximum number of queued packets.
 *
 * Defaults and clamping:
 *   - A nonpositive 'rate_limit' means 1000 packets per second.
 *   - A nonpositive 'burst_limit' means a quarter of the rate limit.
 *   - The burst limit is clamped to at least 1.
 *   - The burst limit is clamped to at most INT_MAX / 1000, so that the
 *     bucket capacity in tokens (burst_limit * 1000) fits in an int.
 *
 * If the new burst limit is lower than before, tokens and queued packets
 * beyond the new capacity are discarded.  Each discarded packet is counted in
 * 'n_queue_dropped'. */
void
pinsched_set_limits(struct pinsched *ps, int rate_limit, int burst_limit)
{
    if (rate_limit <= 0) {
        rate_limit = 1000;
    }
    if (burst_limit <= 0) {
        burst_limit = rate_limit / 4;
    }
    burst_limit = MAX(burst_limit, 1);
    burst_limit = MIN(burst_limit, INT_MAX / 1000);

    ps->rate_limit = rate_limit;
    ps->burst_limit = burst_limit;

    /* Keep the bucket within its new capacity.  refill_bucket() would clamp
     * it eventually, but pinsched_send() withdraws tokens without refilling
     * first.  Until then a stale, oversized balance would let more than
     * 'burst_limit' packets through back to back. */
    ps->tokens = MIN(ps->tokens, burst_limit * 1000);

    while (ps->n_queued > burst_limit) {
        drop_packet(ps);
    }
}