mirahs 2019-06-28
FloodlightProvider 使用 Netty 库来处理到交换机的线程和连接。
每个 OpenFlow 消息将通过一个 Netty 的线程进行处理,并执行与所有模块的消息相关联的所有逻辑
其他模块也可以注册类似交换机连接或断开和端口状态通知特定时间。
为了使模块注册为基于 OpenFlow 消息的,必须实现 IOFMessageListener 接口
要监听 OpenFlow 消息,要先向 FloodlightProvider 注册
调用 IFloodlightProviderService(具体由 Controller 类实现)的 addOFMessageListener 方法进行注册订阅
核心工作是在 ListenerDispatcher 类来完成。
每次增加观察者都会判断是否是终结点(也就是不被其他的 Listener 所依赖),因为最终确定这些观察者顺序的时候就是由这些终结点开始往前进行 DFS 遍历得到
@Override public synchronized void addOFMessageListener(OFType type, IOFMessageListener listener) { //先判断与type对应的 ListenerDispatcher对象是否存在 ListenerDispatcher<OFType, IOFMessageListener> ldd = messageListeners.get(type); if (ldd == null) { ldd = new ListenerDispatcher<OFType, IOFMessageListener>(); messageListeners.put(type, ldd); } //注册监听type这个消息; ldd.addListener(type, listener); }
volatile List<T> listeners = new ArrayList<T>();
//每个OF msg都有唯一的ListenerDispatcher对象,观察者存在listeners链表中
private boolean ispre(U type, T l1, T l2) { return (l2.isCallbackOrderingPrereq(type, l1.getName()) || l1.isCallbackOrderingPostreq(type, l2.getName())); }
返回两个传入的监听器的顺序
public void addListener(U type, T listener) { List<T> newlisteners = new ArrayList<T>(); if (listeners != null) newlisteners.addAll(listeners); newlisteners.add(listener); // Find nodes without outgoing edges // 查找没有出边的节点 List<T> terminals = new ArrayList<T>(); for (T i : newlisteners) { boolean isterm = true; for (T j : newlisteners) { if (ispre(type, i, j)) { //两个都不关心前后顺序的时候 isterm = false; break; } } if (isterm) { //关乎有前后顺序的监听模块存入 terminals.add(i); } } if (terminals.size() == 0) { logger.error("No listener dependency solution: " + "No listeners without incoming dependencies"); listeners = newlisteners; return; } // visit depth-first traversing in the opposite order from // the dependencies. Note we will not generally detect cycles /** * 以相反顺序访问深度优先遍历依赖。 注意我们通常不会检测周期 */ HashSet<T> visited = new HashSet<T>(); List<T> ordering = new ArrayList<T>(); for (T term : terminals) { //进行排序 visit(newlisteners, type, visited, ordering, term); } listeners = ordering; }
private void visit(List<T> newlisteners, U type, HashSet<T> visited, List<T> ordering, T listener) { if (!visited.contains(listener)) { visited.add(listener); for (T i : newlisteners) { if (ispre(type, i, listener)) { visit(newlisteners, type, visited, ordering, i); } } ordering.add(listener); // } }
public interface IListener<T> public enum Command { CONTINUE, STOP } 状态值,用来判断是否继续执行 public String getName(); //用来判断 name 的这个模块是否要在当前对象之前执行 public boolean isCallbackOrderingPrereq(T type, String name); //用来判断 name 的这个模块是否要在当前对象之后执行 public boolean isCallbackOrderingPostreq(T type, String name); IOFMessageListener接口继承了 IListener 接口,同时定义了 receive 方法 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx); 返回 CONTINUE 或者 STOP,继续看每个继承这个接口的模块的重写
TopologyManager 模块的IOFMessageListener 重写的方法:
@Override public String getName() { return MODULE_NAME; //此处为 topology,每个模块都有自己的 MODULE_NAME } @Override public boolean isCallbackOrderingPrereq(OFType type, String name) { //从此处可以看出,在执行这个模块之前,需要先执行 MODULE_NAME 为 linkiscovery 的模块 return "linkdiscovery".equals(name); } @Override public boolean isCallbackOrderingPostreq(OFType type, String name) { return false; } @Override public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: ctrIncoming.increment();//计数器,加一 //调用这里的执行方法 return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx); default: break; } return Command.CONTINUE; }
通过 Type Hierarchy 可以找到Packet-In消息处理顺序的几个模块
基本数据结构,这是一个上下文对象,Floodlight代码监听器可以注册它,稍后可以检索与事件相关联的上下文信息
public class FloodlightContext { protected ConcurrentHashMap<String, Object> storage = new ConcurrentHashMap<String, Object>(); public ConcurrentHashMap<String, Object> getStorage() { return storage; } }
创建了一个 HashMap storage,
public class FloodlightContextStore<V> { @SuppressWarnings("unchecked") public V get(FloodlightContext bc, String key) { return (V)bc.storage.get(key); } public void put(FloodlightContext bc, String key, V value) { bc.storage.put(key, value); } public void remove(FloodlightContext bc, String key) { bc.storage.remove(key); } }
一个FloodlightContextStore对象,可用于PACKET-IN有效内容,消息对象是Ethernet类型
public static final FloodlightContextStore<Ethernet> bcStore = new FloodlightContextStore<Ethernet>();
IOFMessageListener 的 receive 方法
@Override public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: ctrIncoming.increment(); return this.handlePacketIn(sw.getId(), (OFPacketIn) msg, cntx); default: break; } return Command.CONTINUE; }
主要使用了 handlePacketIn()方法
protected Command handlePacketIn(DatapathId sw, OFPacketIn pi, FloodlightContext cntx) { //提取 Packet-In 的有效分组内容 Ethernet eth = IFloodlightProviderService.bcStore.get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD); OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT)); if (eth.getPayload() instanceof BSN) { BSN bsn = (BSN) eth.getPayload(); if (bsn == null) return Command.STOP; if (bsn.getPayload() == null) return Command.STOP; // It could be a packet other than BSN LLDP, therefore // continue with the regular processing. // 它可以是除BSN LLDP之外的分组,因此继续进行常规处理。 if (bsn.getPayload() instanceof LLDP == false) return Command.CONTINUE; return handleLldp((LLDP) bsn.getPayload(), sw, inPort, false, cntx); } else if (eth.getPayload() instanceof LLDP) { return handleLldp((LLDP) eth.getPayload(), sw, inPort, true, cntx); } else if (eth.getEtherType().getValue() < 1536 && eth.getEtherType().getValue() >= 17) { long destMac = eth.getDestinationMACAddress().getLong(); if ((destMac & LINK_LOCAL_MASK) == LINK_LOCAL_VALUE) { ctrLinkLocalDrops.increment(); if (log.isTraceEnabled()) { log.trace("Ignoring packet addressed to 802.1D/Q " + "reserved address."); } return Command.STOP; } } else if (eth.getEtherType().getValue() < 17) { log.error("Received invalid ethertype of {}.", eth.getEtherType()); return Command.STOP; } if (ignorePacketInFromSource(eth.getSourceMACAddress())) { ctrIgnoreSrcMacDrops.increment(); return Command.STOP; } // If packet-in is from a quarantine port, stop processing. NodePortTuple npt = new NodePortTuple(sw, inPort); if (quarantineQueue.contains(npt)) { ctrQuarantineDrops.increment(); return Command.STOP; } return Command.CONTINUE; }
IOFMessageListener 的 receive 方法 @Override public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: ctrIncoming.increment(); return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx); default: break; } return Command.CONTINUE; } 主要使用了processPacketInMessage()方法 protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) { // get the packet-in switch. Ethernet eth = IFloodlightProviderService.bcStore. get(cntx,IFloodlightProviderService.CONTEXT_PI_PAYLOAD); if (eth.getPayload() instanceof BSN) { BSN bsn = (BSN) eth.getPayload(); if (bsn == null) return Command.STOP; if (bsn.getPayload() == null) return Command.STOP; // 可能不是 BSN LLDP,继续常规处理 if (bsn.getPayload() instanceof LLDP == false) return Command.CONTINUE; doFloodBDDP(sw.getId(), pi, cntx); return Command.STOP; } else { return dropFilter(sw.getId(), pi, cntx); } }
设备管理器通过 PACKET-IN 消息请求了解设备,通过 PACKET-IN 消息获取信息,根据实体如何建立进行分类。默认情况下,entity classifies 使用 MAC 地址和 VLAN 来识别设备。这两个属性定义一个独一无二的设备。设备管理器将了解其他属性,如 IP 地址。
信息中的一个重要的部分是设备的连接点,如果一个交换机接受到一个 PACKET-IN 消息,则交换机将会创建一个连接点,设备也会根据时间清空连接点,IP 地址,以及设备本身,最近看到的时间戳是用来保持清空过程的控制
IOFMessageListener 的 receive 方法
@Override public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: cntIncoming.increment(); return this.processPacketInMessage(sw, (OFPacketIn) msg, cntx); default: break; } return Command.CONTINUE; }
主要使用了processPacketInMessage()方法
protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) { Ethernet eth = IFloodlightProviderService.bcStore.get(cntx,IFloodlightProviderService.CONTEXT_PI_PAYLOAD); OFPort inPort = (pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT)); // Extract source entity information Entity srcEntity = getSourceEntityFromPacket(eth, sw.getId(), inPort); if (srcEntity == null) { cntInvalidSource.increment(); return Command.STOP; } // Learn from ARP packet for special VRRP settings. // In VRRP settings, the source MAC address and sender MAC // addresses can be different. In such cases, we need to learn // the IP to MAC mapping of the VRRP IP address. The source // entity will not have that information. Hence, a separate call // to learn devices in such cases. learnDeviceFromArpResponseData(eth, sw.getId(), inPort); // Learn/lookup device information Device srcDevice = learnDeviceByEntity(srcEntity); if (srcDevice == null) { cntNoSource.increment(); return Command.STOP; } // Store the source device in the context fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice); // Find the device matching the destination from the entity // classes of the source. if (eth.getDestinationMACAddress().getLong() == 0) { cntInvalidDest.increment(); return Command.STOP; } Entity dstEntity = getDestEntityFromPacket(eth); Device dstDevice = null; if (dstEntity != null) { dstDevice = findDestByEntity(srcDevice.getEntityClass(), dstEntity); if (dstDevice != null) fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice); else cntNoDest.increment(); } else { cntNoDest.increment(); } if (logger.isTraceEnabled()) { logger.trace("Received PI: {} on switch {}, port {} *** eth={}" + " *** srcDev={} *** dstDev={} *** ", new Object[] { pi, sw.getId().toString(), inPort, eth, srcDevice, dstDevice }); } snoopDHCPClientName(eth, srcDevice); return Command.CONTINUE; }
在 Floodlight 启动时,没有虚拟网络创建,这时主机之间不能相互通信。
一旦用户创建虚拟网络,则主机就能够被添加。
在 PACKET-IN 消息转发实现前,模块将启动。
一旦,一条 PACKET-IN 消息被接受,模块将查看源 MAC 地址和目的 MAC 地址,如果2个 MAC 地址是同一个虚拟网络,模块将返回 Command.CONINUE消息,并且继续处理流。如果MAC 地址不在同一个虚拟网络则返回 Command.STOP 消息,并丢弃包
该模块可用于 OpenStack 的部署
包含此模块的默认配置文件位置:
src/main/resources/neutron.properties
IOFMessageListener 的 receive 方法
@Override public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: return processPacketIn(sw, (OFPacketIn)msg, cntx); default: break; } log.warn("Received unexpected message {}", msg); return Command.CONTINUE; }
主要使用了processPacketIn()方法
protected Command processPacketIn(IOFSwitch sw, OFPacketIn msg, FloodlightContext cntx) { Ethernet eth = IFloodlightProviderService.bcStore.get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD); Command ret = Command.STOP; String srcNetwork = macToGuid.get(eth.getSourceMACAddress()); // If the host is on an unknown network we deny it. // We make exceptions for ARP and DHCP. if (eth.isBroadcast() || eth.isMulticast() || isDefaultGateway(eth) || isDhcpPacket(eth)) { ret = Command.CONTINUE; } else if (srcNetwork == null) { log.trace("Blocking traffic from host {} because it is not attached to any network.", eth.getSourceMACAddress().toString()); ret = Command.STOP; } else if (oneSameNetwork(eth.getSourceMACAddress(), eth.getDestinationMACAddress())) { // if they are on the same network continue ret = Command.CONTINUE; } if (log.isTraceEnabled()) log.trace("Results for flow between {} and {} is {}", new Object[] {eth.getSourceMACAddress(), eth.getDestinationMACAddress(), ret}); /* * TODO - figure out how to still detect gateways while using * drop mods if (ret == Command.STOP) { if (!(eth.getPayload() instanceof ARP)) doDropFlow(sw, msg, cntx); } */ return ret; }
IOFMessageListener 的 receive 方法
@Override public net.floodlightcontroller.core.IListener.Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: return processPacketIn(sw, (OFPacketIn)msg, cntx); default: break; } log.warn("Received unexpected message {}", msg); return Command.CONTINUE; }
主要使用了processPacketIn()方法
private net.floodlightcontroller.core.IListener.Command processPacketIn(IOFSwitch sw, OFPacketIn pi, FloodlightContext cntx) { Ethernet eth = IFloodlightProviderService.bcStore.get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD); IPacket pkt = eth.getPayload(); if (eth.isBroadcast() || eth.isMulticast()) { // handle ARP for VIP if (pkt instanceof ARP) { // retrieve arp to determine target IP address ARP arpRequest = (ARP) eth.getPayload(); IPv4Address targetProtocolAddress = arpRequest.getTargetProtocolAddress(); if (vipIpToId.containsKey(targetProtocolAddress.getInt())) { String vipId = vipIpToId.get(targetProtocolAddress.getInt()); vipProxyArpReply(sw, pi, cntx, vipId); return Command.STOP; } } } else { // currently only load balance IPv4 packets - no-op for other traffic if (pkt instanceof IPv4) { IPv4 ip_pkt = (IPv4) pkt; // If match Vip and port, check pool and choose member int destIpAddress = ip_pkt.getDestinationAddress().getInt(); if (vipIpToId.containsKey(destIpAddress)){ IPClient client = new IPClient(); client.ipAddress = ip_pkt.getSourceAddress(); client.nw_proto = ip_pkt.getProtocol(); if (ip_pkt.getPayload() instanceof TCP) { TCP tcp_pkt = (TCP) ip_pkt.getPayload(); client.srcPort = tcp_pkt.getSourcePort(); client.targetPort = tcp_pkt.getDestinationPort(); } if (ip_pkt.getPayload() instanceof UDP) { UDP udp_pkt = (UDP) ip_pkt.getPayload(); client.srcPort = udp_pkt.getSourcePort(); client.targetPort = udp_pkt.getDestinationPort(); } if (ip_pkt.getPayload() instanceof ICMP) { client.srcPort = TransportPort.of(8); client.targetPort = TransportPort.of(0); } LBVip vip = vips.get(vipIpToId.get(destIpAddress)); if (vip == null) // fix dereference violations return Command.CONTINUE; LBPool pool = pools.get(vip.pickPool(client)); if (pool == null) // fix dereference violations return Command.CONTINUE; LBMember member = members.get(pool.pickMember(client)); if(member == null) //fix dereference violations return Command.CONTINUE; // for chosen member, check device manager and find and push routes, in both directions pushBidirectionalVipRoutes(sw, pi, cntx, client, member); // packet out based on table rule pushPacket(pkt, sw, pi.getBufferId(), (pi.getVersion().compareTo(OFVersion.OF_12) < 0) ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT), OFPort.TABLE, cntx, true); return Command.STOP; } } } // bypass non-load-balanced traffic for normal processing (forwarding) return Command.CONTINUE; }
IOFMessageListener 的 receive 方法
@Override public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) { switch (msg.getType()) { case PACKET_IN: IRoutingDecision decision = null; if (cntx != null) { decision = RoutingDecision.rtStore.get(cntx, IRoutingDecision.CONTEXT_DECISION); } return this.processPacketInMessage(sw, (OFPacketIn) msg, decision, cntx); default: break; } return Command.CONTINUE; }
主要使用了processPacketInMessage()方法
public abstract Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, IRoutingDecision decision, FloodlightContext cntx);
所有继承了 ForwardingBase 的子类Forwarding重写了这个方法,实现具体的操作
@Override public Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, IRoutingDecision decision, FloodlightContext cntx) { Ethernet eth = IFloodlightProviderService.bcStore.get(cntx, IFloodlightProviderService.CONTEXT_PI_PAYLOAD); // We found a routing decision (i.e. Firewall is enabled... it's the only thing that makes RoutingDecisions) if (decision != null) { if (log.isTraceEnabled()) { log.trace("Forwarding decision={} was made for PacketIn={}", decision.getRoutingAction().toString(), pi); } switch(decision.getRoutingAction()) { case NONE: // don't do anything return Command.CONTINUE; case FORWARD_OR_FLOOD: case FORWARD: doForwardFlow(sw, pi, decision, cntx, false); return Command.CONTINUE; case MULTICAST: // treat as broadcast doFlood(sw, pi, decision, cntx); return Command.CONTINUE; case DROP: doDropFlow(sw, pi, decision, cntx); return Command.CONTINUE; default: log.error("Unexpected decision made for this packet-in={}", pi, decision.getRoutingAction()); return Command.CONTINUE; } } else { // No routing decision was found. Forward to destination or flood if bcast or mcast. if (log.isTraceEnabled()) { log.trace("No decision was made for PacketIn={}, forwarding", pi); } if (eth.isBroadcast() || eth.isMulticast()) { doFlood(sw, pi, decision, cntx); } else { doForwardFlow(sw, pi, decision, cntx, false); } } return Command.CONTINUE; }