aboutsummaryrefslogtreecommitdiff
path: root/ofproto
diff options
context:
space:
mode:
authorBen Pfaff <blp@nicira.com>2012-07-12 14:18:05 -0700
committerBen Pfaff <blp@nicira.com>2012-07-12 14:18:05 -0700
commit2b07c8b182b76e4e3a162796d3ae273ef51d4131 (patch)
treea27e0faafd0f228ec8199b47b1fea027efc42c3f /ofproto
parente2a3d183f60b93265c095ede1379194916444822 (diff)
ofproto: New feature to notify controllers of flow table changes.
OpenFlow switching monitoring and controller coordination can be made more efficient if the switch can notify a controller of flow table changes as they occur, rather than periodically polling for changes. This commit implements such a feature. Feature #6633. CC: Natasha Gude <natasha@nicira.com> Signed-off-by: Ben Pfaff <blp@nicira.com>
Diffstat (limited to 'ofproto')
-rw-r--r--ofproto/connmgr.c260
-rw-r--r--ofproto/connmgr.h32
-rw-r--r--ofproto/ofproto-provider.h11
-rw-r--r--ofproto/ofproto.c356
4 files changed, 642 insertions, 17 deletions
diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index 3e750d24..b70b0708 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -88,6 +88,13 @@ struct ofconn {
* that the message might be generated, a 0-bit disables it. */
uint32_t master_async_config[OAM_N_TYPES]; /* master, other */
uint32_t slave_async_config[OAM_N_TYPES]; /* slave */
+
+ /* Flow monitors. */
+ struct hmap monitors; /* Contains "struct ofmonitor"s. */
+ struct list updates; /* List of "struct ofpbuf"s. */
+ bool sent_abbrev_update; /* Does 'updates' contain NXFME_ABBREV? */
+ struct rconn_packet_counter *monitor_counter;
+ uint64_t monitor_paused;
};
static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
@@ -162,6 +169,8 @@ struct connmgr {
static void update_in_band_remotes(struct connmgr *);
static void add_snooper(struct connmgr *, struct vconn *);
+static void ofmonitor_run(struct connmgr *);
+static void ofmonitor_wait(struct connmgr *);
/* Creates and returns a new connection manager owned by 'ofproto'. 'name' is
* a name for the ofproto suitable for using in log messages.
@@ -267,6 +276,7 @@ connmgr_run(struct connmgr *mgr,
LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
ofconn_run(ofconn, handle_openflow);
}
+ ofmonitor_run(mgr);
/* Fail-open maintenance. Do this after processing the ofconns since
* fail-open checks the status of the controller rconn. */
@@ -326,6 +336,7 @@ connmgr_wait(struct connmgr *mgr, bool handling_openflow)
LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
ofconn_wait(ofconn, handling_openflow);
}
+ ofmonitor_wait(mgr);
if (handling_openflow && mgr->in_band) {
in_band_wait(mgr->in_band);
}
@@ -1002,6 +1013,9 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
list_init(&ofconn->opgroups);
+ hmap_init(&ofconn->monitors);
+ list_init(&ofconn->updates);
+
ofconn_flush(ofconn);
return ofconn;
@@ -1012,6 +1026,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
static void
ofconn_flush(struct ofconn *ofconn)
{
+ struct ofmonitor *monitor, *next_monitor;
int i;
ofconn->role = NX_ROLE_OTHER;
@@ -1080,6 +1095,14 @@ ofconn_flush(struct ofconn *ofconn)
memset(ofconn->slave_async_config, 0,
sizeof ofconn->slave_async_config);
}
+
+ HMAP_FOR_EACH_SAFE (monitor, next_monitor, ofconn_node,
+ &ofconn->monitors) {
+ ofmonitor_destroy(monitor);
+ }
+ rconn_packet_counter_destroy(ofconn->monitor_counter);
+ ofconn->monitor_counter = rconn_packet_counter_create();
+ ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
}
static void
@@ -1096,6 +1119,7 @@ ofconn_destroy(struct ofconn *ofconn)
rconn_packet_counter_destroy(ofconn->packet_in_counter);
rconn_packet_counter_destroy(ofconn->reply_counter);
pktbuf_destroy(ofconn->pktbuf);
+ rconn_packet_counter_destroy(ofconn->monitor_counter);
free(ofconn);
}
@@ -1646,3 +1670,239 @@ ofservice_lookup(struct connmgr *mgr, const char *target)
}
return NULL;
}
+
+/* Flow monitors (NXST_FLOW_MONITOR). */
+
+/* A counter incremented when something significant happens to an OpenFlow
+ * rule.
+ *
+ * - When a rule is added, its 'add_seqno' and 'modify_seqno' are set to
+ * the current value (which is then incremented).
+ *
+ * - When a rule is modified, its 'modify_seqno' is set to the current
+ * value (which is then incremented).
+ *
+ * Thus, by comparing an old value of monitor_seqno against a rule's
+ * 'add_seqno', one can tell whether the rule was added before or after the old
+ * value was read, and similarly for 'modify_seqno'.
+ *
+ * 32 bits should normally be sufficient (and would be nice, to save space in
+ * each rule) but then we'd have to have some special cases for wraparound.
+ *
+ * We initialize monitor_seqno to 1 to allow 0 to be used as an invalid
+ * value. */
+static uint64_t monitor_seqno = 1;
+
+COVERAGE_DEFINE(ofmonitor_pause);
+COVERAGE_DEFINE(ofmonitor_resume);
+
+enum ofperr
+ofmonitor_create(const struct ofputil_flow_monitor_request *request,
+ struct ofconn *ofconn, struct ofmonitor **monitorp)
+{
+ struct ofmonitor *m;
+
+ *monitorp = NULL;
+
+ m = ofmonitor_lookup(ofconn, request->id);
+ if (m) {
+ return OFPERR_NXBRC_FM_DUPLICATE_ID;
+ }
+
+ m = xmalloc(sizeof *m);
+ m->ofconn = ofconn;
+ hmap_insert(&ofconn->monitors, &m->ofconn_node, hash_int(request->id, 0));
+ m->id = request->id;
+ m->flags = request->flags;
+ m->out_port = request->out_port;
+ m->table_id = request->table_id;
+ m->match = request->match;
+
+ *monitorp = m;
+ return 0;
+}
+
+struct ofmonitor *
+ofmonitor_lookup(struct ofconn *ofconn, uint32_t id)
+{
+ struct ofmonitor *m;
+
+ HMAP_FOR_EACH_IN_BUCKET (m, ofconn_node, hash_int(id, 0),
+ &ofconn->monitors) {
+ if (m->id == id) {
+ return m;
+ }
+ }
+ return NULL;
+}
+
+void
+ofmonitor_destroy(struct ofmonitor *m)
+{
+ if (m) {
+ hmap_remove(&m->ofconn->monitors, &m->ofconn_node);
+ free(m);
+ }
+}
+
+void
+ofmonitor_report(struct connmgr *mgr, struct rule *rule,
+ enum nx_flow_update_event event,
+ enum ofp_flow_removed_reason reason,
+ const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid)
+{
+ enum nx_flow_monitor_flags update;
+ struct ofconn *ofconn;
+
+ switch (event) {
+ case NXFME_ADDED:
+ update = NXFMF_ADD;
+ rule->add_seqno = rule->modify_seqno = monitor_seqno++;
+ break;
+
+ case NXFME_DELETED:
+ update = NXFMF_DELETE;
+ break;
+
+ case NXFME_MODIFIED:
+ update = NXFMF_MODIFY;
+ rule->modify_seqno = monitor_seqno++;
+ break;
+
+ default:
+ case NXFME_ABBREV:
+ NOT_REACHED();
+ }
+
+ LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+ enum nx_flow_monitor_flags flags = 0;
+ struct ofmonitor *m;
+
+ if (ofconn->monitor_paused) {
+ /* Only send NXFME_DELETED notifications for flows that were added
+ * before we paused. */
+ if (event != NXFME_DELETED
+ || rule->add_seqno > ofconn->monitor_paused) {
+ continue;
+ }
+ }
+
+ HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+ if (m->flags & update
+ && (m->table_id == 0xff || m->table_id == rule->table_id)
+ && ofoperation_has_out_port(rule->pending, m->out_port)
+ && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+ flags |= m->flags;
+ }
+ }
+
+ if (flags) {
+ if (list_is_empty(&ofconn->updates)) {
+ ofputil_start_flow_update(&ofconn->updates);
+ ofconn->sent_abbrev_update = false;
+ }
+
+ if (ofconn != abbrev_ofconn || ofconn->monitor_paused) {
+ struct ofputil_flow_update fu;
+
+ fu.event = event;
+ fu.reason = event == NXFME_DELETED ? reason : 0;
+ fu.idle_timeout = rule->idle_timeout;
+ fu.hard_timeout = rule->hard_timeout;
+ fu.table_id = rule->table_id;
+ fu.cookie = rule->flow_cookie;
+ fu.match = &rule->cr;
+ if (flags & NXFMF_ACTIONS) {
+ fu.ofpacts = rule->ofpacts;
+ fu.ofpacts_len = rule->ofpacts_len;
+ } else {
+ fu.ofpacts = NULL;
+ fu.ofpacts_len = 0;
+ }
+ ofputil_append_flow_update(&fu, &ofconn->updates);
+ } else if (!ofconn->sent_abbrev_update) {
+ struct ofputil_flow_update fu;
+
+ fu.event = NXFME_ABBREV;
+ fu.xid = abbrev_xid;
+ ofputil_append_flow_update(&fu, &ofconn->updates);
+
+ ofconn->sent_abbrev_update = true;
+ }
+ }
+ }
+}
+
+void
+ofmonitor_flush(struct connmgr *mgr)
+{
+ struct ofconn *ofconn;
+
+ LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+ struct ofpbuf *msg, *next;
+
+ LIST_FOR_EACH_SAFE (msg, next, list_node, &ofconn->updates) {
+ list_remove(&msg->list_node);
+ ofconn_send(ofconn, msg, ofconn->monitor_counter);
+ if (!ofconn->monitor_paused
+ && ofconn->monitor_counter->n_bytes > 128 * 1024) {
+ struct ofpbuf *pause;
+
+ COVERAGE_INC(ofmonitor_pause);
+ ofconn->monitor_paused = monitor_seqno++;
+ make_nxmsg_xid(sizeof(struct nicira_header),
+ NXT_FLOW_MONITOR_PAUSED, htonl(0), &pause);
+ ofconn_send(ofconn, pause, ofconn->monitor_counter);
+ }
+ }
+ }
+}
+
+static void
+ofmonitor_resume(struct ofconn *ofconn)
+{
+ struct ofpbuf *resume;
+ struct ofmonitor *m;
+ struct list rules;
+ struct list msgs;
+
+ list_init(&rules);
+ HMAP_FOR_EACH (m, ofconn_node, &ofconn->monitors) {
+ ofmonitor_collect_resume_rules(m, ofconn->monitor_paused, &rules);
+ }
+
+ list_init(&msgs);
+ ofmonitor_compose_refresh_updates(&rules, &msgs);
+
+ make_nxmsg_xid(sizeof(struct nicira_header),
+ NXT_FLOW_MONITOR_RESUMED, htonl(0), &resume);
+ list_push_back(&msgs, &resume->list_node);
+ ofconn_send_replies(ofconn, &msgs);
+
+ ofconn->monitor_paused = 0;
+}
+
+static void
+ofmonitor_run(struct connmgr *mgr)
+{
+ struct ofconn *ofconn;
+
+ LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+ if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+ COVERAGE_INC(ofmonitor_resume);
+ ofmonitor_resume(ofconn);
+ }
+ }
+}
+
+static void
+ofmonitor_wait(struct connmgr *mgr)
+{
+ struct ofconn *ofconn;
+
+ LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+ if (ofconn->monitor_paused && !ofconn->monitor_counter->n_packets) {
+ poll_immediate_wake();
+ }
+ }
+}
diff --git a/ofproto/connmgr.h b/ofproto/connmgr.h
index dec5b71c..24a33fb0 100644
--- a/ofproto/connmgr.h
+++ b/ofproto/connmgr.h
@@ -17,6 +17,7 @@
#ifndef CONNMGR_H
#define CONNMGR_H 1
+#include "classifier.h"
#include "hmap.h"
#include "list.h"
#include "ofp-errors.h"
@@ -30,6 +31,7 @@ struct ofopgroup;
struct ofputil_flow_removed;
struct ofputil_packet_in;
struct ofputil_phy_port;
+struct rule;
struct simap;
struct sset;
@@ -159,4 +161,34 @@ bool connmgr_may_set_up_flow(struct connmgr *, const struct flow *,
/* Fail-open and in-band implementation. */
void connmgr_flushed(struct connmgr *);
+/* A flow monitor managed by NXST_FLOW_MONITOR and related requests. */
+struct ofmonitor {
+ struct ofconn *ofconn; /* Owning 'ofconn'. */
+ struct hmap_node ofconn_node; /* In ofconn's 'monitors' hmap. */
+ uint32_t id;
+
+ enum nx_flow_monitor_flags flags;
+
+ /* Matching. */
+ uint16_t out_port;
+ uint8_t table_id;
+ struct cls_rule match;
+};
+
+struct ofputil_flow_monitor_request;
+
+enum ofperr ofmonitor_create(const struct ofputil_flow_monitor_request *,
+ struct ofconn *, struct ofmonitor **);
+struct ofmonitor *ofmonitor_lookup(struct ofconn *, uint32_t id);
+void ofmonitor_destroy(struct ofmonitor *);
+
+void ofmonitor_report(struct connmgr *, struct rule *,
+ enum nx_flow_update_event, enum ofp_flow_removed_reason,
+ const struct ofconn *abbrev_ofconn, ovs_be32 abbrev_xid);
+void ofmonitor_flush(struct connmgr *);
+
+void ofmonitor_collect_resume_rules(struct ofmonitor *, uint64_t seqno,
+ struct list *rules);
+void ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs);
+
#endif /* connmgr.h */
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index f22c9f61..6eef1063 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -187,6 +187,11 @@ struct rule {
struct ofpact *ofpacts; /* Sequence of "struct ofpacts". */
unsigned int ofpacts_len; /* Size of 'ofpacts', in bytes. */
+
+ /* Flow monitors. */
+ enum nx_flow_monitor_flags monitor_flags;
+ uint64_t add_seqno; /* Sequence number when added. */
+ uint64_t modify_seqno; /* Sequence number when changed. */
};
static inline struct rule *
@@ -199,9 +204,15 @@ void ofproto_rule_update_used(struct rule *, long long int used);
void ofproto_rule_expire(struct rule *, uint8_t reason);
void ofproto_rule_destroy(struct rule *);
+bool ofproto_rule_has_out_port(const struct rule *, uint16_t out_port);
+
void ofoperation_complete(struct ofoperation *, enum ofperr);
struct rule *ofoperation_get_victim(struct ofoperation *);
+bool ofoperation_has_out_port(const struct ofoperation *, uint16_t out_port);
+
+bool ofproto_rule_is_hidden(const struct rule *);
+
/* ofproto class structure, to be defined by each ofproto implementation.
*
*
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index 93401919..b187c86f 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -126,13 +126,17 @@ struct ofoperation {
struct ofpact *ofpacts;
size_t ofpacts_len;
+ /* OFOPERATION_DELETE. */
+ enum ofp_flow_removed_reason reason; /* Reason flow was removed. */
+
ovs_be64 flow_cookie; /* Rule's old flow cookie. */
enum ofperr error; /* 0 if no error. */
};
static struct ofoperation *ofoperation_create(struct ofopgroup *,
struct rule *,
- enum ofoperation_type);
+ enum ofoperation_type,
+ enum ofp_flow_removed_reason);
static void ofoperation_destroy(struct ofoperation *);
/* oftable. */
@@ -188,7 +192,6 @@ static void reinit_ports(struct ofproto *);
static void ofproto_rule_destroy__(struct rule *);
static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
static bool rule_is_modifiable(const struct rule *);
-static bool rule_is_hidden(const struct rule *);
/* OpenFlow. */
static enum ofperr add_flow(struct ofproto *, struct ofconn *,
@@ -952,7 +955,8 @@ ofproto_flush__(struct ofproto *ofproto)
cls_cursor_init(&cursor, &table->cls, NULL);
CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
if (!rule->pending) {
- ofoperation_create(group, rule, OFOPERATION_DELETE);
+ ofoperation_create(group, rule, OFOPERATION_DELETE,
+ OFPRR_DELETE);
oftable_remove_rule(rule);
ofproto->ofproto_class->rule_destruct(rule);
}
@@ -1445,7 +1449,7 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
} else {
/* Initiate deletion -> success. */
struct ofopgroup *group = ofopgroup_create_unattached(ofproto);
- ofoperation_create(group, rule, OFOPERATION_DELETE);
+ ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
oftable_remove_rule(rule);
ofproto->ofproto_class->rule_destruct(rule);
ofopgroup_submit(group);
@@ -1894,13 +1898,36 @@ ofproto_rule_destroy(struct rule *rule)
/* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
* that outputs to 'port' (output to OFPP_FLOOD and OFPP_ALL doesn't count). */
-static bool
-rule_has_out_port(const struct rule *rule, uint16_t port)
+bool
+ofproto_rule_has_out_port(const struct rule *rule, uint16_t port)
{
return (port == OFPP_NONE
|| ofpacts_output_to_port(rule->ofpacts, rule->ofpacts_len, port));
}
+/* Returns true if a rule related to 'op' has an OpenFlow OFPAT_OUTPUT or
+ * OFPAT_ENQUEUE action that outputs to 'out_port'. */
+bool
+ofoperation_has_out_port(const struct ofoperation *op, uint16_t out_port)
+{
+ if (ofproto_rule_has_out_port(op->rule, out_port)) {
+ return true;
+ }
+
+ switch (op->type) {
+ case OFOPERATION_ADD:
+ return op->victim && ofproto_rule_has_out_port(op->victim, out_port);
+
+ case OFOPERATION_DELETE:
+ return false;
+
+ case OFOPERATION_MODIFY:
+ return ofpacts_output_to_port(op->ofpacts, op->ofpacts_len, out_port);
+ }
+
+ NOT_REACHED();
+}
+
/* Executes the actions indicated by 'rule' on 'packet' and credits 'rule''s
* statistics appropriately. 'packet' must have at least sizeof(struct
* ofp_packet_in) bytes of headroom.
@@ -1925,8 +1952,8 @@ rule_execute(struct rule *rule, uint16_t in_port, struct ofpbuf *packet)
* Rules with priority higher than UINT16_MAX are set up by ofproto itself
* (e.g. by in-band control) and are intentionally hidden from the
* controller. */
-static bool
-rule_is_hidden(const struct rule *rule)
+bool
+ofproto_rule_is_hidden(const struct rule *rule)
{
return rule->cr.priority > UINT16_MAX;
}
@@ -2393,7 +2420,8 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
if (rule->pending) {
return OFPROTO_POSTPONE;
}
- if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+ if (!ofproto_rule_is_hidden(rule)
+ && ofproto_rule_has_out_port(rule, out_port)
&& !((rule->flow_cookie ^ cookie) & cookie_mask)) {
list_push_back(rules, &rule->ofproto_node);
}
@@ -2437,7 +2465,8 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
if (rule->pending) {
return OFPROTO_POSTPONE;
}
- if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)
+ if (!ofproto_rule_is_hidden(rule)
+ && ofproto_rule_has_out_port(rule, out_port)
&& !((rule->flow_cookie ^ cookie) & cookie_mask)) {
list_push_back(rules, &rule->ofproto_node);
}
@@ -2855,6 +2884,9 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
rule->ofpacts_len = fm->ofpacts_len;
rule->evictable = true;
rule->eviction_group = NULL;
+ rule->monitor_flags = 0;
+ rule->add_seqno = 0;
+ rule->modify_seqno = 0;
/* Insert new rule. */
victim = oftable_replace_rule(rule);
@@ -2886,7 +2918,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
}
group = ofopgroup_create(ofproto, ofconn, request, fm->buffer_id);
- op = ofoperation_create(group, rule, OFOPERATION_ADD);
+ op = ofoperation_create(group, rule, OFOPERATION_ADD, 0);
op->victim = victim;
error = ofproto->ofproto_class->rule_construct(rule);
@@ -2950,7 +2982,7 @@ modify_flows__(struct ofproto *ofproto, struct ofconn *ofconn,
continue;
}
- op = ofoperation_create(group, rule, OFOPERATION_MODIFY);
+ op = ofoperation_create(group, rule, OFOPERATION_MODIFY, 0);
rule->flow_cookie = new_cookie;
if (actions_changed) {
op->ofpacts = rule->ofpacts;
@@ -3029,7 +3061,7 @@ delete_flow__(struct rule *rule, struct ofopgroup *group)
ofproto_rule_send_removed(rule, OFPRR_DELETE);
- ofoperation_create(group, rule, OFOPERATION_DELETE);
+ ofoperation_create(group, rule, OFOPERATION_DELETE, OFPRR_DELETE);
oftable_remove_rule(rule);
ofproto->ofproto_class->rule_destruct(rule);
}
@@ -3094,7 +3126,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
{
struct ofputil_flow_removed fr;
- if (rule_is_hidden(rule) || !rule->send_flow_removed) {
+ if (ofproto_rule_is_hidden(rule) || !rule->send_flow_removed) {
return;
}
@@ -3144,7 +3176,7 @@ ofproto_rule_expire(struct rule *rule, uint8_t reason)
ofproto_rule_send_removed(rule, reason);
group = ofopgroup_create_unattached(ofproto);
- ofoperation_create(group, rule, OFOPERATION_DELETE);
+ ofoperation_create(group, rule, OFOPERATION_DELETE, reason);
oftable_remove_rule(rule);
ofproto->ofproto_class->rule_destruct(rule);
ofopgroup_submit(group);
@@ -3397,6 +3429,255 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
return 0;
}
+static void
+ofproto_compose_flow_refresh_update(const struct rule *rule,
+ enum nx_flow_monitor_flags flags,
+ struct list *msgs)
+{
+ struct ofoperation *op = rule->pending;
+ struct ofputil_flow_update fu;
+
+ if (op && op->type == OFOPERATION_ADD && !op->victim) {
+ /* We'll report the final flow when the operation completes. Reporting
+ * it now would cause a duplicate report later. */
+ return;
+ }
+
+ fu.event = (flags & (NXFMF_INITIAL | NXFMF_ADD)
+ ? NXFME_ADDED : NXFME_MODIFIED);
+ fu.reason = 0;
+ fu.idle_timeout = rule->idle_timeout;
+ fu.hard_timeout = rule->hard_timeout;
+ fu.table_id = rule->table_id;
+ fu.cookie = rule->flow_cookie;
+ fu.match = (struct cls_rule *) &rule->cr;
+ if (!(flags & NXFMF_ACTIONS)) {
+ fu.ofpacts = NULL;
+ fu.ofpacts_len = 0;
+ } else if (!op) {
+ fu.ofpacts = rule->ofpacts;
+ fu.ofpacts_len = rule->ofpacts_len;
+ } else {
+ /* An operation is in progress. Use the previous version of the flow's
+ * actions, so that when the operation commits we report the change. */
+ switch (op->type) {
+ case OFOPERATION_ADD:
+ /* We already verified that there was a victim. */
+ fu.ofpacts = op->victim->ofpacts;
+ fu.ofpacts_len = op->victim->ofpacts_len;
+ break;
+
+ case OFOPERATION_MODIFY:
+ if (op->ofpacts) {
+ fu.ofpacts = op->ofpacts;
+ fu.ofpacts_len = op->ofpacts_len;
+ } else {
+ fu.ofpacts = rule->ofpacts;
+ fu.ofpacts_len = rule->ofpacts_len;
+ }
+ break;
+
+ case OFOPERATION_DELETE:
+ fu.ofpacts = rule->ofpacts;
+ fu.ofpacts_len = rule->ofpacts_len;
+ break;
+
+ default:
+ NOT_REACHED();
+ }
+ }
+
+ if (list_is_empty(msgs)) {
+ ofputil_start_flow_update(msgs);
+ }
+ ofputil_append_flow_update(&fu, msgs);
+}
+
+void
+ofmonitor_compose_refresh_updates(struct list *rules, struct list *msgs)
+{
+ struct rule *rule;
+
+ LIST_FOR_EACH (rule, ofproto_node, rules) {
+ enum nx_flow_monitor_flags flags = rule->monitor_flags;
+ rule->monitor_flags = 0;
+
+ ofproto_compose_flow_refresh_update(rule, flags, msgs);
+ }
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rule(const struct ofmonitor *m,
+ struct rule *rule, uint64_t seqno,
+ struct list *rules)
+{
+ enum nx_flow_monitor_flags update;
+
+ if (ofproto_rule_is_hidden(rule)) {
+ return;
+ }
+
+ if (!(rule->pending
+ ? ofoperation_has_out_port(rule->pending, m->out_port)
+ : ofproto_rule_has_out_port(rule, m->out_port))) {
+ return;
+ }
+
+ if (seqno) {
+ if (rule->add_seqno > seqno) {
+ update = NXFMF_ADD | NXFMF_MODIFY;
+ } else if (rule->modify_seqno > seqno) {
+ update = NXFMF_MODIFY;
+ } else {
+ return;
+ }
+
+ if (!(m->flags & update)) {
+ return;
+ }
+ } else {
+ update = NXFMF_INITIAL;
+ }
+
+ if (!rule->monitor_flags) {
+ list_push_back(rules, &rule->ofproto_node);
+ }
+ rule->monitor_flags |= update | (m->flags & NXFMF_ACTIONS);
+}
+
+static void
+ofproto_collect_ofmonitor_refresh_rules(const struct ofmonitor *m,
+ uint64_t seqno,
+ struct list *rules)
+{
+ const struct ofproto *ofproto = ofconn_get_ofproto(m->ofconn);
+ const struct ofoperation *op;
+ const struct oftable *table;
+
+ FOR_EACH_MATCHING_TABLE (table, m->table_id, ofproto) {
+ struct cls_cursor cursor;
+ struct rule *rule;
+
+ cls_cursor_init(&cursor, &table->cls, &m->match);
+ CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+ assert(!rule->pending); /* XXX */
+ ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+ }
+ }
+
+ HMAP_FOR_EACH (op, hmap_node, &ofproto->deletions) {
+ struct rule *rule = op->rule;
+
+ if (((m->table_id == 0xff
+ ? !(ofproto->tables[rule->table_id].flags & OFTABLE_HIDDEN)
+ : m->table_id == rule->table_id))
+ && cls_rule_is_loose_match(&rule->cr, &m->match)) {
+ ofproto_collect_ofmonitor_refresh_rule(m, rule, seqno, rules);
+ }
+ }
+}
+
+static void
+ofproto_collect_ofmonitor_initial_rules(struct ofmonitor *m,
+ struct list *rules)
+{
+ if (m->flags & NXFMF_INITIAL) {
+ ofproto_collect_ofmonitor_refresh_rules(m, 0, rules);
+ }
+}
+
+void
+ofmonitor_collect_resume_rules(struct ofmonitor *m,
+ uint64_t seqno, struct list *rules)
+{
+ ofproto_collect_ofmonitor_refresh_rules(m, seqno, rules);
+}
+
+static enum ofperr
+handle_flow_monitor_request(struct ofconn *ofconn,
+ const struct ofp_stats_msg *osm)
+{
+ struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+ struct ofmonitor **monitors;
+ size_t n_monitors, allocated_monitors;
+ struct list replies;
+ enum ofperr error;
+ struct list rules;
+ struct ofpbuf b;
+ size_t i;
+
+ error = 0;
+ ofpbuf_use_const(&b, osm, ntohs(osm->header.length));
+ monitors = NULL;
+ n_monitors = allocated_monitors = 0;
+ for (;;) {
+ struct ofputil_flow_monitor_request request;
+ struct ofmonitor *m;
+ int retval;
+
+ retval = ofputil_decode_flow_monitor_request(&request, &b);
+ if (retval == EOF) {
+ break;
+ } else if (retval) {
+ error = retval;
+ goto error;
+ }
+
+ if (request.table_id != 0xff
+ && request.table_id >= ofproto->n_tables) {
+ error = OFPERR_OFPBRC_BAD_TABLE_ID;
+ goto error;
+ }
+
+ error = ofmonitor_create(&request, ofconn, &m);
+ if (error) {
+ goto error;
+ }
+
+ if (n_monitors >= allocated_monitors) {
+ monitors = x2nrealloc(monitors, &allocated_monitors,
+ sizeof *monitors);
+ }
+ monitors[n_monitors++] = m;
+ }
+
+ list_init(&rules);
+ for (i = 0; i < n_monitors; i++) {
+ ofproto_collect_ofmonitor_initial_rules(monitors[i], &rules);
+ }
+
+ ofputil_start_stats_reply(osm, &replies);
+ ofmonitor_compose_refresh_updates(&rules, &replies);
+ ofconn_send_replies(ofconn, &replies);
+
+ free(monitors);
+
+ return 0;
+
+error:
+ for (i = 0; i < n_monitors; i++) {
+ ofmonitor_destroy(monitors[i]);
+ }
+ free(monitors);
+ return error;
+}
+
+static enum ofperr
+handle_flow_monitor_cancel(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+ struct ofmonitor *m;
+ uint32_t id;
+
+ id = ofputil_decode_flow_monitor_cancel(oh);
+ m = ofmonitor_lookup(ofconn, id);
+ if (!m) {
+ return OFPERR_NXBRC_FM_BAD_ID;
+ }
+
+ ofmonitor_destroy(m);
+ return 0;
+}
+
static enum ofperr
handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
{
@@ -3462,6 +3743,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
/* Nothing to do. */
return 0;
+ case OFPUTIL_NXT_FLOW_MONITOR_CANCEL:
+ return handle_flow_monitor_cancel(ofconn, oh);
+
case OFPUTIL_NXT_SET_ASYNC_CONFIG:
return handle_nxt_set_async_config(ofconn, oh);
@@ -3489,6 +3773,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
case OFPUTIL_OFPST_PORT_DESC_REQUEST:
return handle_port_desc_stats_request(ofconn, msg->data);
+ case OFPUTIL_NXST_FLOW_MONITOR_REQUEST:
+ return handle_flow_monitor_request(ofconn, msg->data);
+
case OFPUTIL_MSG_INVALID:
case OFPUTIL_OFPT_HELLO:
case OFPUTIL_OFPT_ERROR:
@@ -3510,8 +3797,11 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
case OFPUTIL_NXT_ROLE_REPLY:
case OFPUTIL_NXT_FLOW_REMOVED:
case OFPUTIL_NXT_PACKET_IN:
+ case OFPUTIL_NXT_FLOW_MONITOR_PAUSED:
+ case OFPUTIL_NXT_FLOW_MONITOR_RESUMED:
case OFPUTIL_NXST_FLOW_REPLY:
case OFPUTIL_NXST_AGGREGATE_REPLY:
+ case OFPUTIL_NXST_FLOW_MONITOR_REPLY:
default:
return (oh->type == OFPT10_STATS_REQUEST ||
oh->type == OFPT10_STATS_REPLY
@@ -3600,6 +3890,10 @@ static void
ofopgroup_complete(struct ofopgroup *group)
{
struct ofproto *ofproto = group->ofproto;
+
+ struct ofconn *abbrev_ofconn;
+ ovs_be32 abbrev_xid;
+
struct ofoperation *op, *next_op;
int error;
@@ -3630,8 +3924,28 @@ ofopgroup_complete(struct ofopgroup *group)
}
}
+ if (!error && !list_is_empty(&group->ofconn_node)) {
+ abbrev_ofconn = group->ofconn;
+ abbrev_xid = group->request->xid;
+ } else {
+ abbrev_ofconn = NULL;
+ abbrev_xid = htonl(0);
+ }
LIST_FOR_EACH_SAFE (op, next_op, group_node, &group->ops) {
struct rule *rule = op->rule;
+
+ if (!op->error && !ofproto_rule_is_hidden(rule)) {
+ /* Check that we can just cast from ofoperation_type to
+ * nx_flow_update_event. */
+ BUILD_ASSERT_DECL(OFOPERATION_ADD == NXFME_ADDED);
+ BUILD_ASSERT_DECL(OFOPERATION_DELETE == NXFME_DELETED);
+ BUILD_ASSERT_DECL(OFOPERATION_MODIFY == NXFME_MODIFIED);
+
+ ofmonitor_report(ofproto->connmgr, rule,
+ (enum nx_flow_update_event) op->type,
+ op->reason, abbrev_ofconn, abbrev_xid);
+ }
+
rule->pending = NULL;
switch (op->type) {
@@ -3685,6 +3999,8 @@ ofopgroup_complete(struct ofopgroup *group)
ofoperation_destroy(op);
}
+ ofmonitor_flush(ofproto->connmgr);
+
if (!list_is_empty(&group->ofproto_node)) {
assert(ofproto->n_pending > 0);
ofproto->n_pending--;
@@ -3704,11 +4020,15 @@ ofopgroup_complete(struct ofopgroup *group)
/* Initiates a new operation on 'rule', of the specified 'type', within
* 'group'. Prior to calling, 'rule' must not have any pending operation.
*
+ * For a 'type' of OFOPERATION_DELETE, 'reason' should specify the reason that
+ * the flow is being deleted. For other 'type's, 'reason' is ignored (use 0).
+ *
* Returns the newly created ofoperation (which is also available as
* rule->pending). */
static struct ofoperation *
ofoperation_create(struct ofopgroup *group, struct rule *rule,
- enum ofoperation_type type)
+ enum ofoperation_type type,
+ enum ofp_flow_removed_reason reason)
{
struct ofproto *ofproto = group->ofproto;
struct ofoperation *op;
@@ -3720,6 +4040,7 @@ ofoperation_create(struct ofopgroup *group, struct rule *rule,
list_push_back(&group->ops, &op->group_node);
op->rule = rule;
op->type = type;
+ op->reason = reason;
op->flow_cookie = rule->flow_cookie;
group->n_running++;
@@ -3891,7 +4212,8 @@ ofproto_evict(struct ofproto *ofproto)
break;
}
- ofoperation_create(group, rule, OFOPERATION_DELETE);
+ ofoperation_create(group, rule,
+ OFOPERATION_DELETE, OFPRR_EVICTION);
oftable_remove_rule(rule);
ofproto->ofproto_class->rule_destruct(rule);
}