/* Source: lib/worker.c (blob ce4a53b26051505d5e995b335c7cbee87e72adef) */
/* Copyright (c) 2012, 2013 Nicira, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <config.h>

#include "worker.h"

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <sys/wait.h>
#include <unistd.h>

#include "command-line.h"
#include "daemon.h"
#include "ofpbuf.h"
#include "poll-loop.h"
#include "socket-util.h"
#include "util.h"
#include "vlog.h"

VLOG_DEFINE_THIS_MODULE(worker);

/* ovs_assert() logs the assertion message and logging sometimes goes through a
 * worker, so using ovs_assert() in this source file could cause recursion.
 *
 * To keep that from happening by accident, ovs_assert is redefined below to a
 * name that is not declared in this file, so a stray ovs_assert() call should
 * fail to build.  Use plain assert() from <assert.h> here instead. */
#undef ovs_assert
#define ovs_assert use_assert_instead_of_ovs_assert_in_this_module

/* Header for an RPC request.
 *
 * worker_request_iovec() sends one of these ahead of 'request_len' bytes of
 * payload, and worker_main() copies it back out of the receive buffer.
 *
 * 'request_len' must remain the first member: rxbuf_run() obtains the payload
 * length by reading a size_t from the very start of the received header. */
struct worker_request {
    size_t request_len;              /* Length of the payload in bytes. */
    worker_request_func *request_cb; /* Function to call in worker process. */
    worker_reply_func *reply_cb;     /* Function to call in main process, or
                                      * NULL if no reply is expected. */
    void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
};

/* Header for an RPC reply.
 *
 * worker_reply_iovec() sends one of these ahead of 'reply_len' bytes of
 * payload, filling 'reply_cb' and 'reply_aux' from the request being answered.
 * worker_run() then invokes 'reply_cb' in the main process.
 *
 * 'reply_len' must remain the first member: rxbuf_run() obtains the payload
 * length by reading a size_t from the very start of the received header. */
struct worker_reply {
    size_t reply_len;            /* Length of the payload in bytes. */
    worker_reply_func *reply_cb; /* Function to call in main process. */
    void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
};

/* Receive buffer for an RPC request or reply.
 *
 * rxbuf_run() fills this in incrementally, so a message may be only partially
 * received between calls.  rxbuf_clear() resets it for the next message. */
struct rxbuf {
    /* Header. */
    struct ofpbuf header;       /* Header data. */
    int fds[SOUTIL_MAX_FDS];    /* File descriptors, filled in by the receive
                                 * that starts the header. */
    size_t n_fds;               /* Number of valid entries in 'fds'. */

    /* Payload. */
    struct ofpbuf payload;      /* Payload data. */
};

/* Main-process end of the socket pair shared with the worker.  Negative until
 * worker_start() runs, and again after worker_broke() closes it. */
static int client_sock = -1;
/* Receive state for the reply that the main process is currently reading from
 * 'client_sock'. */
static struct rxbuf client_rx;

/* Receive-buffer helpers, used by both the main process and the worker. */
static void rxbuf_init(struct rxbuf *);
static void rxbuf_clear(struct rxbuf *);
static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);

/* Builds a new iovec array consisting of one iovec for 'data' followed by a
 * copy of 'iovs'. */
static struct iovec *prefix_iov(void *data, size_t len,
                                const struct iovec *iovs, size_t n_iovs);

/* Closes 'client_sock' so that worker_is_running() reports false. */
static void worker_broke(void);

/* Main loop of the worker process; never returns. */
static void worker_main(int fd) NO_RETURN;

/* Forks off the worker process and connects it to the calling process with a
 * non-blocking Unix-domain stream socket pair.  Only one worker is supported
 * at present, so this function may be called at most once.
 *
 * After this returns, the client should call worker_run() and worker_wait()
 * from its main loop.
 *
 * This must be called after daemonize_start() and before
 * daemonize_complete(). */
void
worker_start(void)
{
    enum { MAIN_END, WORKER_END };
    int sockets[2];

    assert(client_sock < 0);

    /* Both ends of the IPC channel are non-blocking. */
    xsocketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
    xset_nonblocking(sockets[MAIN_END]);
    xset_nonblocking(sockets[WORKER_END]);

    if (fork_and_clean_up()) {
        /* Main (parent) process: keep only our own end. */
        close(sockets[WORKER_END]);
        client_sock = sockets[MAIN_END];
        rxbuf_init(&client_rx);
        return;
    }

    /* Worker (child) process: keep only our own end and serve requests. */
    daemonize_post_detach();
    close(sockets[MAIN_END]);
    worker_main(sockets[WORKER_END]);
    NOT_REACHED();
}

/* Reports whether this process started a worker that has not since been
 * found to be broken, that is, whether the client socket is still open. */
bool
worker_is_running(void)
{
    return !(client_sock < 0);
}

/* If a worker process was started, processes RPC replies from it, calling the
 * registered 'reply_cb' callbacks.
 *
 * If the worker process died or malfunctioned, aborts. */
void
worker_run(void)
{
    if (worker_is_running()) {
        int error;

        error = rxbuf_run(&client_rx, client_sock,
                          sizeof(struct worker_reply));
        if (!error) {
            struct worker_reply *reply = client_rx.header.data;
            reply->reply_cb(&client_rx.payload, client_rx.fds,
                            client_rx.n_fds, reply->reply_aux);
            rxbuf_clear(&client_rx);
        } else if (error != EAGAIN) {
            worker_broke();
            VLOG_ABORT("receive from worker failed (%s)",
                       ovs_retval_to_string(error));
        }
    }
}

/* Arranges for the poll loop to wake up when the worker's socket becomes
 * readable, i.e. when there may be RPC replies to process.  Does nothing if
 * no worker is running. */
void
worker_wait(void)
{
    if (!worker_is_running()) {
        return;
    }

    poll_fd_wait(client_sock, POLLIN);
}

/* Interface for main process to interact with the worker. */

/* Submits an RPC request to the worker process.  In the worker, 'request_cb'
 * is called with the 'size' (zero or more) bytes at 'data' and with the
 * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
 *
 * 'request_cb' must call worker_reply() or worker_reply_iovec() with a reply
 * if and only if 'reply_cb' is nonnull.  The main process later calls
 * 'reply_cb' with the reply data (if any) and file descriptors (if any).
 *
 * The file descriptors that 'request_cb' receives are copies (as if by dup())
 * of those in fds[].  'request_cb' owns the copies; the caller of
 * worker_request() keeps ownership of the originals.
 *
 * If the socket buffer fills up, this function may block until the request
 * has been sent, and while blocked it may invoke reply callbacks for earlier
 * requests.  It never waits for the reply (if any) to this request.
 *
 * The worker executes requests strictly in submission order, finishing each
 * one before starting the next, and the main process invokes reply callbacks
 * strictly in request submission order. */
void
worker_request(const void *data, size_t size,
               const int fds[], size_t n_fds,
               worker_request_func *request_cb,
               worker_reply_func *reply_cb, void *aux)
{
    struct iovec payload;
    const struct iovec *iovs = NULL;
    size_t n_iovs = 0;

    /* An empty payload is sent as no iovecs at all. */
    if (size > 0) {
        payload.iov_base = (void *) data;
        payload.iov_len = size;
        iovs = &payload;
        n_iovs = 1;
    }

    worker_request_iovec(iovs, n_iovs, fds, n_fds, request_cb, reply_cb, aux);
}

static int
worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
                  const int fds[], size_t n_fds)
{
    size_t sent = 0;

    for (;;) {
        struct pollfd pfd;
        int error;

        /* Try to send the rest of the request. */
        error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
                                         fds, n_fds, sent, &sent);
        if (error != EAGAIN) {
            return error;
        }

        /* Process replies to avoid deadlock. */
        worker_run();

        /* Wait for 'client_sock' to become ready before trying again.  We
         * can't use poll_block() because it sometimes calls into vlog, which
         * calls indirectly into worker_send_iovec().  To be usable here,
         * poll_block() would therefore need to be reentrant, but it isn't
         * (calling it recursively causes memory corruption and an eventual
         * crash). */
        pfd.fd = client_sock;
        pfd.events = POLLIN | POLLOUT;
        do {
            error = poll(&pfd, 1, -1) < 0 ? errno : 0;
        } while (error == EINTR);
        if (error) {
            worker_broke();
            VLOG_ABORT("poll failed (%s)", strerror(error));
        }
    }
}

/* Like worker_request(), but takes the data to send as the 'n_iovs' iovecs in
 * 'iovs' rather than as a single buffer.
 *
 * Asserts that a worker is running and that this function is not being
 * re-entered.  Aborts if the request cannot be sent. */
void
worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
                     const int fds[], size_t n_fds,
                     worker_request_func *request_cb,
                     worker_reply_func *reply_cb, void *aux)
{
    static bool in_progress = false;
    struct worker_request header;
    struct iovec *framed;
    int retval;

    assert(worker_is_running());
    assert(!in_progress);
    in_progress = true;

    header.request_cb = request_cb;
    header.reply_cb = reply_cb;
    header.reply_aux = aux;
    header.request_len = iovec_len(iovs, n_iovs);

    /* The header travels as an extra iovec in front of the payload. */
    framed = prefix_iov(&header, sizeof header, iovs, n_iovs);
    retval = worker_send_iovec(framed, 1 + n_iovs, fds, n_fds);
    if (retval) {
        worker_broke();
        VLOG_ABORT("send failed (%s)", strerror(retval));
    }
    free(framed);

    in_progress = false;
}

/* Closes the client socket if it is open and marks it closed, which makes
 * worker_is_running() return false from then on.
 *
 * The client calls this just before aborting when the worker process has died
 * or malfunctioned, so that the logging subsystem does not try to log the
 * failure through the worker. */
static void
worker_broke(void)
{
    int fd = client_sock;

    if (fd < 0) {
        return;
    }

    close(fd);
    client_sock = -1;
}

/* Interfaces for RPC implementations (running in the worker process). */

/* Worker-process end of the socket pair; set by worker_main(). */
static int server_sock = -1;
/* True while the request being executed has a 'reply_cb' and has not yet been
 * answered.  Set by worker_main(), cleared by worker_reply_iovec(). */
static bool expect_reply;
/* Header of the request that the worker is currently executing; supplies
 * 'reply_cb' and 'reply_aux' for the reply. */
static struct worker_request request;

/* Sends the reply to the RPC request currently being executed.
 *
 * A 'request_cb' implementation must call this function (or
 * worker_reply_iovec()) when the corresponding worker_request() or
 * worker_request_iovec() call supplied a 'reply_cb' callback, and must not
 * call it when no 'reply_cb' was supplied.
 *
 * The main process will call 'reply_cb' with the 'size' (zero or more) bytes
 * at 'data' and with the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in
 * 'fds'.  The descriptors that 'reply_cb' receives are copies (as if by
 * dup()) of those in fds[]: 'reply_cb' owns the copies and the caller of
 * worker_reply() keeps ownership of the originals.
 *
 * If the socket buffer fills up, this function blocks until the reply has
 * been sent.  It does not wait for the main process to receive or process the
 * reply. */
void
worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
{
    struct iovec payload;
    const struct iovec *iovs = NULL;
    size_t n_iovs = 0;

    /* An empty payload is sent as no iovecs at all. */
    if (size > 0) {
        payload.iov_base = (void *) data;
        payload.iov_len = size;
        iovs = &payload;
        n_iovs = 1;
    }

    worker_reply_iovec(iovs, n_iovs, fds, n_fds);
}

/* Same as worker_reply() except that the data to send is specified as an array
 * of iovecs. */
void
worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
                       const int fds[], size_t n_fds)
{
    struct worker_reply reply;
    struct iovec *all_iovs;
    int error;

    assert(expect_reply);
    expect_reply = false;

    reply.reply_len = iovec_len(iovs, n_iovs);
    reply.reply_cb = request.reply_cb;
    reply.reply_aux = request.reply_aux;

    all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);

    error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
                                           fds, n_fds);
    if (error == EPIPE) {
        /* Parent probably died.  Continue processing any RPCs still buffered,
         * to avoid missing log messages. */
        VLOG_INFO("send failed (%s)", strerror(error));
    } else if (error) {
        VLOG_FATAL("send failed (%s)", strerror(error));
    }

    free(all_iovs);
}

static void
worker_main(int fd)
{
    struct rxbuf rx;

    server_sock = fd;

    subprogram_name = "worker";
    proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
    VLOG_INFO("worker process started");

    rxbuf_init(&rx);
    for (;;) {
        int error;

        error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
        if (!error) {
            request = *(struct worker_request *) rx.header.data;

            expect_reply = request.reply_cb != NULL;
            request.request_cb(&rx.payload, rx.fds, rx.n_fds);
            assert(!expect_reply);

            rxbuf_clear(&rx);
        } else if (error == EOF && !rx.header.size) {
            /* Main process closed the IPC socket.  Exit cleanly. */
            break;
        } else if (error != EAGAIN) {
            VLOG_FATAL("RPC receive failed (%s)", ovs_retval_to_string(error));
        }

        poll_fd_wait(server_sock, POLLIN);
        poll_block();
    }

    VLOG_INFO("worker process exiting");
    exit(0);
}

/* Initializes 'rx' as an empty receive buffer: no file descriptors, and
 * header and payload buffers with no initial allocation. */
static void
rxbuf_init(struct rxbuf *rx)
{
    rx->n_fds = 0;
    ofpbuf_init(&rx->header, 0);
    ofpbuf_init(&rx->payload, 0);
}

/* Resets 'rx' so that it is ready to receive the next message: clears the
 * header and payload buffers and forgets any received file descriptors. */
static void
rxbuf_clear(struct rxbuf *rx)
{
    rx->n_fds = 0;
    ofpbuf_clear(&rx->header);
    ofpbuf_clear(&rx->payload);
}

static int
rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
{
    for (;;) {
        if (!rx->header.size) {
            int retval;

            ofpbuf_clear(&rx->header);
            ofpbuf_prealloc_tailroom(&rx->header, header_len);

            retval = recv_data_and_fds(sock, rx->header.data, header_len,
                                       rx->fds, &rx->n_fds);
            if (retval <= 0) {
                return retval ? -retval : EOF;
            }
            rx->header.size += retval;
        } else if (rx->header.size < header_len) {
            size_t bytes_read;
            int error;

            error = read_fully(sock, ofpbuf_tail(&rx->header),
                               header_len - rx->header.size, &bytes_read);
            rx->header.size += bytes_read;
            if (error) {
                return error;
            }
        } else {
            size_t payload_len = *(size_t *) rx->header.data;

            if (rx->payload.size < payload_len) {
                size_t left = payload_len - rx->payload.size;
                size_t bytes_read;
                int error;

                ofpbuf_prealloc_tailroom(&rx->payload, left);
                error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
                                   &bytes_read);
                rx->payload.size += bytes_read;
                if (error) {
                    return error;
                }
            } else {
                return 0;
            }
        }
    }
}

/* Returns a newly allocated array of 'n_iovs' + 1 iovecs.  The first element
 * refers to the 'len' bytes at 'data' and the rest are a copy of the 'n_iovs'
 * elements of 'iovs'.
 *
 * 'iovs' may be NULL when 'n_iovs' is 0, which is how worker_request() and
 * worker_reply() pass an empty payload.
 *
 * Only the iovec array is copied, not the data it refers to, so 'data' and
 * the buffers named by 'iovs' must stay valid for as long as the result is
 * in use.  The caller must release the returned array with free(). */
static struct iovec *
prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
{
    struct iovec *dst;

    dst = xmalloc((n_iovs + 1) * sizeof *dst);
    dst[0].iov_base = data;
    dst[0].iov_len = len;

    /* memcpy() requires valid pointers even for a zero-length copy, so skip
     * it when there is nothing to copy (in which case 'iovs' may be NULL). */
    if (n_iovs) {
        memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
    }

    return dst;
}