一:实现流量监控
(一)流量监控原理
其中控制器向交换机周期下发获取统计消息,请求交换机消息------是主动下发过程
流速公式:是(t1时刻的流量-t0时刻的流量)/(t1-t0)
剩余带宽公式:链路总带宽-流速--------是这一个这一个,例如s2-s3(不是一条,例如:h1->s1->s2->s3->h2)的剩余带宽
路径有效带宽是只:这一整条路径中,按照最小的剩余带宽处理
二:代码实现
(一)代码框架
from ryu.app import simple_switch_13 from ryu.controller.handler import set_ev_cls from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER class MyMonitor(simple_switch_13): #simple_switch_13 is same as the last experiment which named self_learn_switch \'\'\' design a class to achvie managing the quantity of flow \'\'\' def __init__(self,*args,**kwargs): super(MyMonitor,self).__init__(*args,**kwargs) @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER,DEAD_DISPATCHER]) def _state_change_handler(self,ev): \'\'\' design a handler to get switch state transition condition \'\'\' pass def _monitor(self): \'\'\' design a monitor on timing system to request switch infomations about port and flow \'\'\' pass def _request_stats(self,datapath): \'\'\' the function is to send requery to datapath \'\'\' pass @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER) def _port_stats_reply_handler(self,ev): \'\'\' monitor to require the port state, then this function is to get infomation for port`s info \'\'\' pass @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER) def _port_stats_reply_handler(self,ev): \'\'\' monitor to require the flow state, then this function is to get infomation for flow`s info \'\'\' pass
(二)推文:协程https://www.cnblogs.com/ssyfj/p/9030165.html
(三)全部代码实现
from operator import attrgetter from ryu.app import simple_switch_13 from ryu.controller.handler import set_ev_cls from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER from ryu.lib import hub class MyMonitor(simple_switch_13.SimpleSwitch13): #simple_switch_13 is same as the last experiment which named self_learn_switch \'\'\' design a class to achvie managing the quantity of flow \'\'\' def __init__(self,*args,**kwargs): super(MyMonitor,self).__init__(*args,**kwargs) self.datapaths = {} #use gevent to start monitor self.monitor_thread = hub.spawn(self._monitor) @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER,DEAD_DISPATCHER]) def _state_change_handler(self,ev): \'\'\' design a handler to get switch state transition condition \'\'\' #first get ofprocotol info datapath = ev.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser #judge datapath`s status to decide how to operate if datapath.state == MAIN_DISPATCHER: #should save info to dictation if datapath.id not in self.datapaths: self.datapaths[datapath.id] = datapath self.logger.debug("Regist datapath: %16x",datapath.id) elif datapath.state == DEAD_DISPATCHER: #should remove info from dictation if datapath.id in self.datapaths: del self.datapaths[datapath.id] self.logger.debug("Unregist datapath: %16x",datapath.id) def _monitor(self): \'\'\' design a monitor on timing system to request switch infomations about port and flow \'\'\' while True: #initiatie to request port and flow info all the time for dp in self.datapaths.values(): self._request_stats(dp) hub.sleep(5) #pause to sleep to wait reply, and gave time to other gevent to request def _request_stats(self,datapath): \'\'\' the function is to send requery to datapath \'\'\' self.logger.debug("send stats reques to datapath: %16x for port and flow info",datapath.id) ofproto = datapath.ofproto parser = datapath.ofproto_parser req = parser.OFPFlowStatsRequest(datapath) datapath.send_msg(req) req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY) datapath.send_msg(req) @set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER) def _port_stats_reply_handler(self,ev): \'\'\' monitor to require the port state, then this function is to get infomation for port`s info print("6666666666port info:") print(ev.msg) print(dir(ev.msg)) \'\'\' body = ev.msg.body self.logger.info(\'datapath port \' \'rx_packets tx_packets\' \'rx_bytes tx_bytes\' \'rx_errors tx_errors\' ) self.logger.info(\'--------------- --------\' \'-------- --------\' \'-------- --------\' \'-------- --------\' ) for port_stat in sorted(body,key=attrgetter(\'port_no\')): self.logger.info(\'%016x %8x %8d %8d %8d %8d %8d %8d\', ev.msg.datapath.id,port_stat.port_no,port_stat.rx_packets,port_stat.tx_packets, port_stat.rx_bytes,port_stat.tx_bytes,port_stat.rx_errors,port_stat.tx_errors ) @set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER) def _flow_stats_reply_handler(self,ev): \'\'\' monitor to require the flow state, then this function is to get infomation for flow`s info print("777777777flow info:") print(ev.msg) print(dir(ev.msg)) \'\'\' body = ev.msg.body self.logger.info(\'datapath \' \'in_port eth_src\' \'out_port eth_dst\' \'packet_count byte_count\' ) self.logger.info(\'--------------- \' \'---- -----------------\' \'---- -----------------\' \'--------- ---------\' ) for flow_stat in sorted([flow for flow in body if flow.priority==1], key=lambda flow:(flow.match[\'in_port\'],flow.match[\'eth_src\'])): self.logger.info(\'%016x %8x %17s %8x %17s %8d %8d\', ev.msg.datapath.id,flow_stat.match[\'in_port\'],flow_stat.match[\'eth_src\'], flow_stat.instructions[0].actions[0].port,flow_stat.match[\'eth_dst\'], flow_stat.packet_count,flow_stat.byte_count )
补充:注意---每个事件的属性可能不同,需要我们进行Debug,例如上面就出现了ev.msg.body(之前hub实现中没有)
(四)代码讲解
1.class MyMonitor(simple_switch_13.SimpleSwitch13):
simple_switch_13.SimpleSwitch13是样例代码,其中实现了和我们上一次实验中,自学习交换机类似的功能
(稍微多了个关于交换机是否上传全部packet还是只上传buffer_id),所以我们直接继承,可以减少写代码时间
2.协程实现伪并发self.monitor_thread = hub.spawn(self._monitor)
def __init__(self,*args,**kwargs): super(MyMonitor,self).__init__(*args,**kwargs) self.datapaths = {} #use gevent to start monitor self.monitor_thread = hub.spawn(self._monitor)
3.在协程中实现周期请求交换机信息
def _monitor(self): \'\'\' design a monitor on timing system to request switch infomations about port and flow \'\'\' while True: #initiatie to request port and flow info all the time for dp in self.datapaths.values(): self._request_stats(dp) hub.sleep(5) #pause to sleep to wait reply, and gave time to other gevent to request
4.主动下发消息,请求交换机信息OFPFlowStatsRequest------注意:我们这里请求两个(端口和协议信息),所以我们要使用两个函数来分别处理port和flow响应
def _request_stats(self,datapath): \'\'\' the function is to send requery to datapath \'\'\' self.logger.debug("send stats reques to datapath: %16x for port and flow info",datapath.id) ofproto = datapath.ofproto parser = datapath.ofproto_parser req = parser.OFPFlowStatsRequest(datapath) datapath.send_msg(req) req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY) #可以向上面一样省略默认参数 datapath.send_msg(req)
源码查看参数
@_set_stats_type(ofproto.OFPMP_FLOW, OFPFlowStats) @_set_msg_type(ofproto.OFPT_MULTIPART_REQUEST) class OFPFlowStatsRequest(OFPFlowStatsRequestBase): """ Individual flow statistics request message The controller uses this message to query individual flow statistics. ================ ====================================================== Attribute Description ================ ====================================================== flags Zero or ``OFPMPF_REQ_MORE`` table_id ID of table to read out_port Require matching entries to include this as an output port out_group Require matching entries to include this as an output group cookie Require matching entries to contain this cookie value cookie_mask Mask used to restrict the cookie bits that must match match Instance of ``OFPMatch`` ================ ====================================================== Example:: def send_flow_stats_request(self, datapath): ofp = datapath.ofproto ofp_parser = datapath.ofproto_parser cookie = cookie_mask = 0 match = ofp_parser.OFPMatch(in_port=1) req = ofp_parser.OFPFlowStatsRequest(datapath, 0, ofp.OFPTT_ALL, ofp.OFPP_ANY, ofp.OFPG_ANY, cookie, cookie_mask, match) datapath.send_msg(req) """ def __init__(self, datapath, flags=0, table_id=ofproto.OFPTT_ALL, out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY, cookie=0, cookie_mask=0, match=None, type_=None):
5.获取端口响应信息ofp_event.EventOFPPortStatsReply
@set_ev_cls(ofp_event.EventOFPPortStatsReply,MAIN_DISPATCHER) def _port_stats_reply_handler(self,ev): \'\'\' monitor to require the port state, then this function is to get infomation for port`s info print("6666666666port info:") print(ev.msg) print(dir(ev.msg)) \'\'\' body = ev.msg.body self.logger.info(\'datapath port \' \'rx_packets tx_packets\' \'rx_bytes tx_bytes\' \'rx_errors tx_errors\' ) self.logger.info(\'--------------- --------\' \'-------- --------\' \'-------- --------\' \'-------- --------\' ) for port_stat in sorted(body,key=attrgetter(\'port_no\')): self.logger.info(\'%016x %8x %8d %8d %8d %8d %8d %8d\', ev.msg.datapath.id,port_stat.port_no,port_stat.rx_packets,port_stat.tx_packets, port_stat.rx_bytes,port_stat.tx_bytes,port_stat.rx_errors,port_stat.tx_errors )
端口信息:《参考》
6666666666port info: version=0x4,msg_type=0x13,msg_len=0x1d0,xid=0x8dcd9187, OFPPortStatsReply( body=[ OFPPortStats(port_no=4294967294,rx_packets=0,tx_packets=0,rx_bytes=0,tx_bytes=0,rx_dropped=65,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=331000000), OFPPortStats(port_no=1,rx_packets=154,tx_packets=225,rx_bytes=11660,tx_bytes=19503,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000), OFPPortStats(port_no=2,rx_packets=186,tx_packets=257,rx_bytes=14516,tx_bytes=22343,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=334000000), OFPPortStats(port_no=3,rx_packets=220,tx_packets=232,rx_bytes=18439,tx_bytes=19311,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000) ] ,flags=0,type=4) OFPPortStats( port_no=4294967294, ---------- rx_packets=0, ---------- tx_packets=0, ---------- rx_bytes=0, ---------- tx_bytes=0, ---------- rx_dropped=65, tx_dropped=0, rx_errors=0, ---------- tx_errors=0, ---------- rx_frame_err=0, rx_over_err=0, rx_crc_err=0, collisions=0, duration_sec=1912, duration_nsec=331000000) [\'_STATS_MSG_TYPES\', \'_TYPE\', \'__class__\', \'__delattr__\', \'__dict__\', \'__dir__\', \'__doc__\', \'__eq__\', \'__format__\', \'__ge__\', \'__getattribute__\', \'__gt__\', \'__hash__\', \'__init__\', \'__init_subclass__\', \'__le__\', \'__lt__\', \'__module__\', \'__ne__\', \'__new__\', \'__reduce__\', \'__reduce_ex__\', \'__repr__\', \'__setattr__\', \'__sizeof__\', \'__str__\', \'__subclasshook__\', \'__weakref__\', \'_base_attributes\', \'_class_prefixes\', \'_class_suffixes\', \'_decode_value\', \'_encode_value\', \'_get_decoder\', \'_get_default_decoder\', \'_get_default_encoder\', \'_get_encoder\', \'_get_type\', \'_is_class\', \'_opt_attributes\', \'_restore_args\', \'_serialize_body\', \'_serialize_header\', \'_serialize_pre\', \'body\', \'buf\', \'cls_body_single_struct\', \'cls_from_jsondict_key\', \'cls_msg_type\', \'cls_stats_body_cls\', \'cls_stats_type\', \'datapath\' ---------- , \'flags\', \'from_jsondict\', \'msg_len\', \'msg_type\', \'obj_from_jsondict\', \'parser\', \'parser_stats\', \'parser_stats_body\', \'register_stats_type\', \'serialize\', \'set_buf\', \'set_classes\', \'set_headers\', \'set_xid\', \'stringify_attrs\', \'to_jsondict\', \'type\', \'version\', \'xid\']
6.获取flow协议响应信息ofp_event.EventOFPFlowStatsReply
@set_ev_cls(ofp_event.EventOFPFlowStatsReply,MAIN_DISPATCHER) def _flow_stats_reply_handler(self,ev): \'\'\' monitor to require the flow state, then this function is to get infomation for flow`s info print("777777777flow info:") print(ev.msg) print(dir(ev.msg)) \'\'\' body = ev.msg.body self.logger.info(\'datapath \' \'in_port eth_src\' \'out_port eth_dst\' \'packet_count byte_count\' ) self.logger.info(\'--------------- \' \'---- -----------------\' \'---- -----------------\' \'--------- ---------\' ) for flow_stat in sorted([flow for flow in body if flow.priority==1], key=lambda flow:(flow.match[\'in_port\'],flow.match[\'eth_src\'])): self.logger.info(\'%016x %8x %17s %8x %17s %8d %8d\', ev.msg.datapath.id,flow_stat.match[\'in_port\'],flow_stat.match[\'eth_src\'], flow_stat.instructions[0].actions[0].port,flow_stat.match[\'eth_dst\'], flow_stat.packet_count,flow_stat.byte_count )
协议信息《参考》
777777777flow info: version=0x4,msg_type=0x13,msg_len=0x200,xid=0x9e448a1a, OFPFlowStatsReply( body=[ OFPFlowStats(byte_count=5446,cookie=0,duration_nsec=552000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)], length=104,match=OFPMatch(oxm_fields={\'in_port\': 2, \'eth_src\': \'8a:06:6a:2c:10:fc\', \'eth_dst\': \'26:20:2f:85:5a:9a\'}),packet_count=71,priority=1,table_id=0), OFPFlowStats(byte_count=5348,cookie=0,duration_nsec=549000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)], length=104,match=OFPMatch(oxm_fields={\'in_port\': 1, \'eth_src\': \'26:20:2f:85:5a:9a\', \'eth_dst\': \'8a:06:6a:2c:10:fc\'}),packet_count=70,priority=1,table_id=0), OFPFlowStats(byte_count=8302,cookie=0,duration_nsec=438000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)], length=104,match=OFPMatch(oxm_fields={\'in_port\': 2, \'eth_src\': \'ca:9e:a1:af:b9:5f\', \'eth_dst\': \'26:20:2f:85:5a:9a\'}),packet_count=103,priority=1,table_id=0), OFPFlowStats(byte_count=8204,cookie=0,duration_nsec=436000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)] ,length=104,match=OFPMatch(oxm_fields={\'in_port\': 1, \'eth_src\': \'26:20:2f:85:5a:9a\', \'eth_dst\': \'ca:9e:a1:af:b9:5f\'}),packet_count=102,priority=1,table_id=0), OFPFlowStats(byte_count=6739,cookie=0,duration_nsec=807000000,duration_sec=9,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65535,port=4294967293,type=0)],len=24,type=4)], length=80,match=OFPMatch(oxm_fields={}),packet_count=74,priority=0,table_id=0) ] ,flags=0,type=1) OFPFlowStats( byte_count=5446, ---------- cookie=0, duration_nsec=552000000, duration_sec=1893, flags=0, hard_timeout=0, idle_timeout=0, instructions=[ OFPInstructionActions( actions=[ OFPActionOutput( len=16, max_len=65509, port=1, ---------- type=0) ], len=24, type=4 ) ], length=104, match=OFPMatch(oxm_fields={ \'in_port\': 2, ---------- \'eth_src\': \'8a:06:6a:2c:10:fc\', ---------- \'eth_dst\': \'26:20:2f:85:5a:9a\' ---------- }), packet_count=71, ---------- priority=1, table_id=0 ) [\'_STATS_MSG_TYPES\', \'_TYPE\', \'__class__\', \'__delattr__\', \'__dict__\', \'__dir__\', \'__doc__\', \'__eq__\', \'__format__\', \'__ge__\', \'__getattribute__\', \'__gt__\', \'__hash__\', \'__init__\', \'__init_subclass__\', \'__le__\', \'__lt__\', \'__module__\', \'__ne__\', \'__new__\', \'__reduce__\', \'__reduce_ex__\', \'__repr__\', \'__setattr__\', \'__sizeof__\', \'__str__\', \'__subclasshook__\', \'__weakref__\', \'_base_attributes\', \'_class_prefixes\', \'_class_suffixes\', \'_decode_value\', \'_encode_value\', \'_get_decoder\', \'_get_default_decoder\', \'_get_default_encoder\', \'_get_encoder\', \'_get_type\', \'_is_class\', \'_opt_attributes\', \'_restore_args\', \'_serialize_body\', \'_serialize_header\', \'_serialize_pre\', \'body\', \'buf\', \'cls_body_single_struct\', \'cls_from_jsondict_key\', \'cls_msg_type\', \'cls_stats_body_cls\', \'cls_stats_type\', \'datapath\' ---------- , \'flags\', \'from_jsondict\', \'msg_len\', \'msg_type\', \'obj_from_jsondict\', \'parser\', \'parser_stats\', \'parser_stats_body\', \'register_stats_type\', \'serialize\', \'set_buf\', \'set_classes\', \'set_headers\', \'set_xid\', \'stringify_attrs\', \'to_jsondict\', \'type\', \'version\', \'xid\']
三:实验演示
(一)开启Ryu
ryu-manager my_monitor.py
(二)开启Mininet
sudo mn --topo=tree,2,2 --controller=remote --mac
(三)Ryu显示结果