diff options
author | Ben Pfaff <blp@nicira.com> | 2009-11-04 15:02:32 -0800 |
---|---|---|
committer | Ben Pfaff <blp@nicira.com> | 2009-11-04 15:24:40 -0800 |
commit | c34b65c731a1b6dae014efe8895141e5b2fe758a (patch) | |
tree | 094548cb094eaacc6d6b12056282d2106759c8de /lib/stream.c | |
parent | 8a8eb867724ccbfe5e5130c5b604b51c86de3b9f (diff) |
stream: New library for bidirectional streams (e.g. TCP, SSL, Unix sockets).
This code is heavily based on the vconn code. Eventually we should make
the stream-based vconns (currently that's all of them) a wrapper around
streams, but I haven't done that yet.
SSL is not implemented yet.
Diffstat (limited to 'lib/stream.c')
-rw-r--r-- | lib/stream.c | 490 |
1 files changed, 490 insertions, 0 deletions
diff --git a/lib/stream.c b/lib/stream.c new file mode 100644 index 00000000..1a0ea7ba --- /dev/null +++ b/lib/stream.c @@ -0,0 +1,490 @@ +/* + * Copyright (c) 2008, 2009 Nicira Networks. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include "stream-provider.h" +#include <assert.h> +#include <errno.h> +#include <inttypes.h> +#include <netinet/in.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> +#include "coverage.h" +#include "dynamic-string.h" +#include "flow.h" +#include "ofp-print.h" +#include "ofpbuf.h" +#include "openflow/nicira-ext.h" +#include "openflow/openflow.h" +#include "packets.h" +#include "poll-loop.h" +#include "random.h" +#include "util.h" + +#define THIS_MODULE VLM_stream +#include "vlog.h" + +/* State of an active stream.*/ +enum stream_state { + SCS_CONNECTING, /* Underlying stream is not connected. */ + SCS_CONNECTED, /* Connection established. */ + SCS_DISCONNECTED /* Connection failed or connection closed. */ +}; + +static struct stream_class *stream_classes[] = { + &tcp_stream_class, + &unix_stream_class, +}; + +static struct pstream_class *pstream_classes[] = { + &ptcp_pstream_class, + &punix_pstream_class, +}; + +/* Check the validity of the stream class structures. */ +static void +check_stream_classes(void) +{ +#ifndef NDEBUG + size_t i; + + for (i = 0; i < ARRAY_SIZE(stream_classes); i++) { + struct stream_class *class = stream_classes[i]; + assert(class->name != NULL); + assert(class->open != NULL); + if (class->close || class->recv || class->send || class->wait) { + assert(class->close != NULL); + assert(class->recv != NULL); + assert(class->send != NULL); + assert(class->wait != NULL); + } else { + /* This class delegates to another one. */ + } + } + + for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) { + struct pstream_class *class = pstream_classes[i]; + assert(class->name != NULL); + assert(class->listen != NULL); + if (class->close || class->accept || class->wait) { + assert(class->close != NULL); + assert(class->accept != NULL); + assert(class->wait != NULL); + } else { + /* This class delegates to another one. */ + } + } +#endif +} + +/* Prints information on active (if 'active') and passive (if 'passive') + * connection methods supported by the stream. */ +void +stream_usage(const char *name, bool active, bool passive) +{ + /* Really this should be implemented via callbacks into the stream + * providers, but that seems too heavy-weight to bother with at the + * moment. */ + + printf("\n"); + if (active) { + printf("Active %s connection methods:\n", name); + printf(" tcp:IP:PORT " + "PORT at remote IP\n"); + printf(" unix:FILE " + "Unix domain socket named FILE\n"); + } + + if (passive) { + printf("Passive %s connection methods:\n", name); + printf(" ptcp:PORT[:IP] " + "listen to TCP PORT on IP\n"); + printf(" punix:FILE " + "listen on Unix domain socket FILE\n"); + } +} + +/* Attempts to connect a stream to a remote peer. 'name' is a connection name + * in the form "TYPE:ARGS", where TYPE is an active stream class's name and + * ARGS are stream class-specific. + * + * Returns 0 if successful, otherwise a positive errno value. If successful, + * stores a pointer to the new connection in '*streamp', otherwise a null + * pointer. */ +int +stream_open(const char *name, struct stream **streamp) +{ + size_t prefix_len; + size_t i; + + COVERAGE_INC(stream_open); + check_stream_classes(); + + *streamp = NULL; + prefix_len = strcspn(name, ":"); + if (prefix_len == strlen(name)) { + return EAFNOSUPPORT; + } + for (i = 0; i < ARRAY_SIZE(stream_classes); i++) { + struct stream_class *class = stream_classes[i]; + if (strlen(class->name) == prefix_len + && !memcmp(class->name, name, prefix_len)) { + struct stream *stream; + char *suffix_copy = xstrdup(name + prefix_len + 1); + int retval = class->open(name, suffix_copy, &stream); + free(suffix_copy); + if (!retval) { + assert(stream->state != SCS_CONNECTING + || stream->class->connect); + *streamp = stream; + } + return retval; + } + } + return EAFNOSUPPORT; +} + +int +stream_open_block(const char *name, struct stream **streamp) +{ + struct stream *stream; + int error; + + error = stream_open(name, &stream); + while (error == EAGAIN) { + stream_connect_wait(stream); + poll_block(); + error = stream_connect(stream); + assert(error != EINPROGRESS); + } + if (error) { + stream_close(stream); + *streamp = NULL; + } else { + *streamp = stream; + } + return error; +} + +/* Closes 'stream'. */ +void +stream_close(struct stream *stream) +{ + if (stream != NULL) { + char *name = stream->name; + (stream->class->close)(stream); + free(name); + } +} + +/* Returns the name of 'stream', that is, the string passed to + * stream_open(). */ +const char * +stream_get_name(const struct stream *stream) +{ + return stream ? stream->name : "(null)"; +} + +/* Returns the IP address of the peer, or 0 if the peer is not connected over + * an IP-based protocol or if its IP address is not yet known. */ +uint32_t +stream_get_remote_ip(const struct stream *stream) +{ + return stream->remote_ip; +} + +/* Returns the transport port of the peer, or 0 if the connection does not + * contain a port or if the port is not yet known. */ +uint16_t +stream_get_remote_port(const struct stream *stream) +{ + return stream->remote_port; +} + +/* Returns the IP address used to connect to the peer, or 0 if the connection + * is not an IP-based protocol or if its IP address is not yet known. */ +uint32_t +stream_get_local_ip(const struct stream *stream) +{ + return stream->local_ip; +} + +/* Returns the transport port used to connect to the peer, or 0 if the + * connection does not contain a port or if the port is not yet known. */ +uint16_t +stream_get_local_port(const struct stream *stream) +{ + return stream->local_port; +} + +static void +scs_connecting(struct stream *stream) +{ + int retval = (stream->class->connect)(stream); + assert(retval != EINPROGRESS); + if (!retval) { + stream->state = SCS_CONNECTED; + } else if (retval != EAGAIN) { + stream->state = SCS_DISCONNECTED; + stream->error = retval; + } +} + +/* Tries to complete the connection on 'stream', which must be an active + * stream. If 'stream''s connection is complete, returns 0 if the connection + * was successful or a positive errno value if it failed. If the + * connection is still in progress, returns EAGAIN. */ +int +stream_connect(struct stream *stream) +{ + enum stream_state last_state; + + do { + last_state = stream->state; + switch (stream->state) { + case SCS_CONNECTING: + scs_connecting(stream); + break; + + case SCS_CONNECTED: + return 0; + + case SCS_DISCONNECTED: + return stream->error; + + default: + NOT_REACHED(); + } + } while (stream->state != last_state); + + return EAGAIN; +} + +/* Tries to receive up to 'n' bytes from 'stream' into 'buffer', and returns: + * + * - If successful, the number of bytes received (between 1 and 'n'). + * + * - On error, a negative errno value. + * + * - 0, if the connection has been closed in the normal fashion, or if 'n' + * is zero. + * + * The recv function will not block waiting for a packet to arrive. If no + * data have been received, it returns -EAGAIN immediately. */ +int +stream_recv(struct stream *stream, void *buffer, size_t n) +{ + int retval = stream_connect(stream); + return (retval ? -retval + : n == 0 ? 0 + : (stream->class->recv)(stream, buffer, n)); +} + +/* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns: + * + * - If successful, the number of bytes sent (between 1 and 'n'). 0 is + * only a valid return value if 'n' is 0. + * + * - On error, a negative errno value. + * + * The send function will not block. If no bytes can be immediately accepted + * for transmission, it returns -EAGAIN immediately. */ +int +stream_send(struct stream *stream, const void *buffer, size_t n) +{ + int retval = stream_connect(stream); + return (retval ? -retval + : n == 0 ? 0 + : (stream->class->send)(stream, buffer, n)); +} + +void +stream_wait(struct stream *stream, enum stream_wait_type wait) +{ + assert(wait == STREAM_CONNECT || wait == STREAM_RECV + || wait == STREAM_SEND); + + switch (stream->state) { + case SCS_CONNECTING: + wait = STREAM_CONNECT; + break; + + case SCS_DISCONNECTED: + poll_immediate_wake(); + return; + } + (stream->class->wait)(stream, wait); +} + +void +stream_connect_wait(struct stream *stream) +{ + stream_wait(stream, STREAM_CONNECT); +} + +void +stream_recv_wait(struct stream *stream) +{ + stream_wait(stream, STREAM_RECV); +} + +void +stream_send_wait(struct stream *stream) +{ + stream_wait(stream, STREAM_SEND); +} + +/* Attempts to start listening for remote stream connections. 'name' is a + * connection name in the form "TYPE:ARGS", where TYPE is an passive stream + * class's name and ARGS are stream class-specific. + * + * Returns 0 if successful, otherwise a positive errno value. If successful, + * stores a pointer to the new connection in '*pstreamp', otherwise a null + * pointer. */ +int +pstream_open(const char *name, struct pstream **pstreamp) +{ + size_t prefix_len; + size_t i; + + check_stream_classes(); + + *pstreamp = NULL; + prefix_len = strcspn(name, ":"); + if (prefix_len == strlen(name)) { + return EAFNOSUPPORT; + } + for (i = 0; i < ARRAY_SIZE(pstream_classes); i++) { + struct pstream_class *class = pstream_classes[i]; + if (strlen(class->name) == prefix_len + && !memcmp(class->name, name, prefix_len)) { + char *suffix_copy = xstrdup(name + prefix_len + 1); + int retval = class->listen(name, suffix_copy, pstreamp); + free(suffix_copy); + if (retval) { + *pstreamp = NULL; + } + return retval; + } + } + return EAFNOSUPPORT; +} + +/* Returns the name that was used to open 'pstream'. The caller must not + * modify or free the name. */ +const char * +pstream_get_name(const struct pstream *pstream) +{ + return pstream->name; +} + +/* Closes 'pstream'. */ +void +pstream_close(struct pstream *pstream) +{ + if (pstream != NULL) { + char *name = pstream->name; + (pstream->class->close)(pstream); + free(name); + } +} + +/* Tries to accept a new connection on 'pstream'. If successful, stores the + * new connection in '*new_stream' and returns 0. Otherwise, returns a + * positive errno value. + * + * pstream_accept() will not block waiting for a connection. If no connection + * is ready to be accepted, it returns EAGAIN immediately. */ +int +pstream_accept(struct pstream *pstream, struct stream **new_stream) +{ + int retval = (pstream->class->accept)(pstream, new_stream); + if (retval) { + *new_stream = NULL; + } else { + assert((*new_stream)->state != SCS_CONNECTING + || (*new_stream)->class->connect); + } + return retval; +} + +void +pstream_wait(struct pstream *pstream) +{ + (pstream->class->wait)(pstream); +} + +/* Initializes 'stream' as a new stream named 'name', implemented via 'class'. + * The initial connection status, supplied as 'connect_status', is interpreted + * as follows: + * + * - 0: 'stream' is connected. Its 'send' and 'recv' functions may be + * called in the normal fashion. + * + * - EAGAIN: 'stream' is trying to complete a connection. Its 'connect' + * function should be called to complete the connection. + * + * - Other positive errno values indicate that the connection failed with + * the specified error. + * + * After calling this function, stream_close() must be used to destroy + * 'stream', otherwise resources will be leaked. + * + * The caller retains ownership of 'name'. */ +void +stream_init(struct stream *stream, struct stream_class *class, + int connect_status, const char *name) +{ + stream->class = class; + stream->state = (connect_status == EAGAIN ? SCS_CONNECTING + : !connect_status ? SCS_CONNECTED + : SCS_DISCONNECTED); + stream->error = connect_status; + stream->name = xstrdup(name); +} + +void +stream_set_remote_ip(struct stream *stream, uint32_t ip) +{ + stream->remote_ip = ip; +} + +void +stream_set_remote_port(struct stream *stream, uint16_t port) +{ + stream->remote_port = port; +} + +void +stream_set_local_ip(struct stream *stream, uint32_t ip) +{ + stream->local_ip = ip; +} + +void +stream_set_local_port(struct stream *stream, uint16_t port) +{ + stream->local_port = port; +} + +void +pstream_init(struct pstream *pstream, struct pstream_class *class, + const char *name) +{ + pstream->class = class; + pstream->name = xstrdup(name); +} |