According to the OpenFlow specification: "State modification messages from the controller may be executed in an arbitrary order by the switch. A barrier request can be used by the controller to set a synchronization point, ensuring that all previous state messages are completed before the barrier response is sent back to the controller."
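The toy Python model below makes this guarantee concrete. It is only an illustration under simple assumptions and does not describe how OVS or any real switch schedules messages. Messages between two barriers run in a random order, and each barrier reply is emitted only after everything sent before that barrier has run. `switch_execution_order` and the `BARRIER` marker are hypothetical names.

```python
import random

BARRIER = "BARRIER_REQUEST"  # hypothetical marker for a barrier in the stream


def switch_execution_order(messages, seed=None):
    """Toy model of a switch that may reorder state-modification messages.

    Messages between two barriers are executed in an arbitrary order (here a
    random shuffle). A barrier is answered only after every message sent before
    it has been executed, so it acts as a synchronization point.
    """
    rng = random.Random(seed)
    trace = []      # what the switch executes/emits, in order
    segment = []    # messages received since the last barrier
    replies = 0
    for msg in messages:
        if msg == BARRIER:
            rng.shuffle(segment)                      # any order inside the segment
            trace.extend(segment)                     # finish all earlier messages...
            trace.append(("BARRIER_REPLY", replies))  # ...and only then reply
            replies += 1
            segment = []
        else:
            segment.append(msg)
    rng.shuffle(segment)                              # trailing messages: no ordering guarantee
    trace.extend(segment)
    return trace
```

In this model, a delete followed by a barrier always runs before the adds sent after that barrier. The adds themselves may still run in either order.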
I'm running some tests that create many EVCs (mef_eline) in batch (see kytos/mef_eline#223 for details on how to replicate them). For each EVC, mef_eline sends a FlowMod that removes the "old" flows based on the cookie, and then sends the FlowMods that actually add the specific forwarding OpenFlow rules. When running in batch, with many events submitted at the same time, the switch may process these messages in a different order from the one in which they were sent. The specification does not guarantee the order. Therefore, using BarrierRequest/BarrierReply is recommended.
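The sketch below shows this failure mode under two assumptions. First, each EVC's old and new flows share one cookie. Second, the "remove" FlowMod deletes every flow with that cookie, which simplifies real cookie/cookie-mask matching. The code enumerates every order in which the switch could run the messages when no barrier separates them. The function names are hypothetical.

```python
from itertools import permutations


def apply_flow_mod(table, flow_mod):
    """Apply one simplified FlowMod to a flow table (dict: match -> cookie)."""
    command, cookie, match = flow_mod
    if command == "delete":
        # Delete every flow that carries this cookie, whatever its match.
        for existing in [m for m, c in table.items() if c == cookie]:
            del table[existing]
    elif command == "add":
        table[match] = cookie
    else:
        raise ValueError(f"unknown command: {command}")


def possible_tables(initial, flow_mods):
    """Final flow table for every order in which the switch could run flow_mods."""
    results = []
    for order in permutations(flow_mods):
        table = dict(initial)  # start each ordering from the same initial table
        for flow_mod in order:
            apply_flow_mod(table, flow_mod)
        results.append(table)
    return results
```

Start with an old flow `{"in_port=1,dl_vlan=99": 0xAA}` and the messages `[("delete", 0xAA, None), ("add", 0xAA, "in_port=1,dl_vlan=100")]`. This yields two tables. One holds only the new flow. The other is empty, because the delete ran after the add. The empty table matches the symptom of an EVC that is reported as active while its flows are missing. A barrier between the delete and the adds leaves only the first outcome.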
In the scenario I'm testing, I created 1200 EVCs one after another. At the end, some of them report that they are active (enable=True, active=True), but when you check the flows on the switch, some of their flows are missing.
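One way to find which EVCs lost flows is to compare the cookies actually installed on the switch with the ones you expect. The sketch below assumes you can derive the expected cookie of each active EVC. It also assumes a bridge name from your topology; the `s1` used below is hypothetical. It shells out to `ovs-ofctl dump-flows`, whose output includes `cookie=0x...` fields, and parses those fields with a regular expression.

```python
import re
import subprocess

COOKIE_RE = re.compile(r"\bcookie=(0x[0-9a-fA-F]+)")


def dump_flows(bridge, protocol="OpenFlow13"):
    """Return the text printed by `ovs-ofctl -O <protocol> dump-flows <bridge>`."""
    result = subprocess.run(
        ["ovs-ofctl", "-O", protocol, "dump-flows", bridge],
        capture_output=True, text=True, check=True)
    return result.stdout


def cookies_in_dump(text):
    """Set of cookies (as ints) found in dump-flows output."""
    return {int(match, 16) for match in COOKIE_RE.findall(text)}


def missing_cookies(expected, text):
    """Cookies we expect on the switch that no installed flow carries."""
    return set(expected) - cookies_in_dump(text)
```

Typical use is `missing_cookies(expected, dump_flows("s1"))`, where `expected` is the set of cookies of the EVCs reported as active.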
I enabled debug mode on OVS (ovs-appctl vlog/set vconn:dbg and ovs-appctl vlog/disable-rate-limit vconn), but found no clue as to why the flows get removed (or are never installed).
In any case, it is good practice to use BarrierRequest/BarrierReply with the flow_mod messages.
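On the wire, a barrier request is just the 8-byte OpenFlow header (version, type, length, xid) with no body. The switch answers with a barrier reply that carries the same xid. The type code differs between versions: OFPT_BARRIER_REQUEST/OFPT_BARRIER_REPLY are 18/19 in OpenFlow 1.0 and 20/21 in OpenFlow 1.3. This is why the patches below build a version-specific pyof BarrierRequest. The `struct`-based sketch only illustrates the encoding, and its helper names are hypothetical. Inside Kytos you would keep using pyof.

```python
import struct

# version byte -> (OFPT_BARRIER_REQUEST, OFPT_BARRIER_REPLY)
BARRIER_TYPES = {
    0x01: (18, 19),  # OpenFlow 1.0
    0x04: (20, 21),  # OpenFlow 1.3
}
# ofp_header: version (u8), type (u8), length (u16), xid (u32), network byte order
OFP_HEADER = struct.Struct("!BBHI")


def barrier_request(version, xid):
    """Encode a body-less BarrierRequest for the given OpenFlow version."""
    req_type, _ = BARRIER_TYPES[version]
    return OFP_HEADER.pack(version, req_type, OFP_HEADER.size, xid)


def is_barrier_reply_for(data, version, xid):
    """True if data starts with the BarrierReply matching our request's xid."""
    if len(data) < OFP_HEADER.size:
        return False
    msg_version, msg_type, _length, reply_xid = OFP_HEADER.unpack_from(data)
    _, reply_type = BARRIER_TYPES[version]
    return msg_version == version and msg_type == reply_type and reply_xid == xid
```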
Here is an example of how we can do it, in two variants: 1) waiting for all pending barrier requests to be answered before sending a new flow_mod, and 2) waiting for the specific barrier_reply right after sending the flow_mod (my preferred way).

1) Waiting for all pending barrier requests before sending a new flow_mod:
```diff
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
 """kytos/flow_manager NApp installs, lists and deletes switch flows."""
 from collections import OrderedDict
+import time
 
 from flask import jsonify, request
 
@@ -10,6 +11,9 @@ from napps.kytos.of_core.flow import FlowFactory
 from .exceptions import InvalidCommandError
 from .settings import FLOWS_DICT_MAX_SIZE
 
+from pyof.v0x01.controller2switch.barrier_request import BarrierRequest as BReq10
+from pyof.v0x04.controller2switch.barrier_request import BarrierRequest as BReq13
+
 
 class Main(KytosNApp):
     """Main class to be used by Kytos controller."""
@@ -23,6 +27,7 @@ class Main(KytosNApp):
         log.debug("flow-manager starting")
         self._flow_mods_sent = OrderedDict()
         self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
+        self.pending_barrier_reply = dict()
 
     def execute(self):
         """Run once on NApp 'start' or in a loop.
@@ -132,6 +137,8 @@ class Main(KytosNApp):
         self._flow_mods_sent[xid] = (flow, command)
 
     def _send_flow_mod(self, switch, flow_mod):
+        self._wait_for_pending_barrier_reply(switch)
+
         event_name = 'kytos/flow_manager.messages.out.ofpt_flow_mod'
 
         content = {'destination': switch.connection,
@@ -139,6 +146,38 @@ class Main(KytosNApp):
 
         event = KytosEvent(name=event_name, content=content)
         self.controller.buffers.msg_out.put(event)
+        self._send_barrier_request(switch)
+
+    def _send_barrier_request(self, switch):
+        event_name = 'kytos/flow_manager.messages.out.ofpt_barrier_request'
+
+        of_version = switch.connection.protocol.version
+        barrier_request = None
+        if of_version == 0x01:
+            barrier_request = BReq10()
+        elif of_version == 0x04:
+            barrier_request = BReq13()
+        else:
+            return
+        # Remember the xid so handle_barrier_reply() can clear it.
+        if switch.id not in self.pending_barrier_reply:
+            self.pending_barrier_reply[switch.id] = set()
+        self.pending_barrier_reply[switch.id].add(barrier_request.header.xid)
+        content = {'destination': switch.connection,
+                   'message': barrier_request}
+
+        event = KytosEvent(name=event_name, content=content)
+        self.controller.buffers.msg_out.put(event)
+
+    def _wait_for_pending_barrier_reply(self, switch):
+        timeout = time.time() + 10
+        while self.pending_barrier_reply.get(switch.id) and time.time() <= timeout:
+            time.sleep(0.1)
+        if self.pending_barrier_reply.get(switch.id):
+            # Drop the stale xids, otherwise every later FlowMod also waits 10 s.
+            stale = self.pending_barrier_reply.pop(switch.id)
+            log.warning("Timeout waiting for barrier_reply from switch=%s "
+                        "pending_barrier_reply=%s", switch.id, stale)
 
     def _send_napp_event(self, switch, flow, command, **kwargs):
         """Send an Event to other apps informing about a FlowMod."""
@@ -156,6 +195,18 @@ class Main(KytosNApp):
         event_app = KytosEvent(name, content)
         self.controller.buffers.app.put(event_app)
 
+    @listen_to('.*of_core.*.ofpt_barrier_reply')
+    def handle_barrier_reply(self, event):
+        """Receive OpenFlow Barrier Reply and clear its pending xid."""
+        switch = event.source.switch.id
+        xid = event.content["message"].header.xid.value
+        log.debug("barrier reply: switch=%s xid=%s", switch, xid)
+        try:
+            self.pending_barrier_reply[switch].remove(xid)
+        except KeyError:
+            log.warning("failed to remove received barrier_reply (xid=%s) "
+                        "from pending_barrier_reply", xid)
+
     @listen_to('.*.of_core.*.ofpt_error')
     def handle_errors(self, event):
         """Receive OpenFlow error and send a event.
```
2) Waiting for the specific barrier_reply right after sending the flow_mod:
```diff
diff --git a/main.py b/main.py
index 676a956..762e001 100644
--- a/main.py
+++ b/main.py
@@ -1,5 +1,6 @@
 """kytos/flow_manager NApp installs, lists and deletes switch flows."""
 from collections import OrderedDict
+import time
 
 from flask import jsonify, request
 
@@ -10,6 +11,9 @@ from napps.kytos.of_core.flow import FlowFactory
 from .exceptions import InvalidCommandError
 from .settings import FLOWS_DICT_MAX_SIZE
 
+from pyof.v0x01.controller2switch.barrier_request import BarrierRequest as BReq10
+from pyof.v0x04.controller2switch.barrier_request import BarrierRequest as BReq13
+
 
 class Main(KytosNApp):
     """Main class to be used by Kytos controller."""
@@ -23,6 +27,7 @@ class Main(KytosNApp):
         log.debug("flow-manager starting")
         self._flow_mods_sent = OrderedDict()
         self._flow_mods_sent_max_size = FLOWS_DICT_MAX_SIZE
+        self.pending_barrier_reply = dict()
 
     def execute(self):
         """Run once on NApp 'start' or in a loop.
@@ -139,6 +144,39 @@ class Main(KytosNApp):
 
         event = KytosEvent(name=event_name, content=content)
         self.controller.buffers.msg_out.put(event)
+        self._send_barrier_request(switch)
+
+    def _send_barrier_request(self, switch):
+        event_name = 'kytos/flow_manager.messages.out.ofpt_barrier_request'
+
+        of_version = switch.connection.protocol.version
+        barrier_request = None
+        if of_version == 0x01:
+            barrier_request = BReq10()
+        elif of_version == 0x04:
+            barrier_request = BReq13()
+        else:
+            return
+        # Remember the xid so handle_barrier_reply() can clear it.
+        if switch.id not in self.pending_barrier_reply:
+            self.pending_barrier_reply[switch.id] = set()
+        self.pending_barrier_reply[switch.id].add(barrier_request.header.xid)
+        content = {'destination': switch.connection,
+                   'message': barrier_request}
+
+        event = KytosEvent(name=event_name, content=content)
+        self.controller.buffers.msg_out.put(event)
+        self._wait_for_pending_barrier_reply(switch, barrier_request.header.xid)
+
+    def _wait_for_pending_barrier_reply(self, switch, xid):
+        timeout = time.time() + 10
+        while xid in self.pending_barrier_reply.get(switch.id, set()) and time.time() <= timeout:
+            time.sleep(0.05)
+        if xid in self.pending_barrier_reply.get(switch.id, set()):
+            # Forget the stale xid so the pending set does not grow forever.
+            self.pending_barrier_reply[switch.id].discard(xid)
+            log.warning("Timeout waiting for barrier_reply from switch=%s xid=%s",
+                        switch.id, xid)
 
     def _send_napp_event(self, switch, flow, command, **kwargs):
         """Send an Event to other apps informing about a FlowMod."""
@@ -156,6 +194,18 @@ class Main(KytosNApp):
         event_app = KytosEvent(name, content)
         self.controller.buffers.app.put(event_app)
 
+    @listen_to('.*of_core.*.ofpt_barrier_reply')
+    def handle_barrier_reply(self, event):
+        """Receive OpenFlow Barrier Reply and clear its pending xid."""
+        switch = event.source.switch.id
+        xid = event.content["message"].header.xid.value
+        log.debug("barrier reply: switch=%s xid=%s", switch, xid)
+        try:
+            self.pending_barrier_reply[switch].remove(xid)
+        except KeyError:
+            log.warning("failed to remove received barrier_reply (xid=%s) "
+                        "from pending_barrier_reply", xid)
+
     @listen_to('.*.of_core.*.ofpt_error')
     def handle_errors(self, event):
         """Receive OpenFlow error and send a event.
```
Source of the OpenFlow specification text quoted above: http://flowgrammable.org/sdn/openflow/message-layer/barrier/