hongsheyoumo 2019-11-16
SONiC在处理路由、接口up/down、接口地址变化、team等事件时极大地依赖Linux内核。SONiC通过监听rtnetlink（rtnl）事件来响应内核中的变化，从而感知相关信息的更新。
SONiC使用libnl库来收发和解析netlink消息，详细内容可以参考libnl官方文档：http://www.infradead.org/~tgr/libnl/ 。SONiC在libnl库的基础上封装了NetLink类来进行netlink操作。
class NetLink : public Selectable { public: NetLink(int pri = 0); ~NetLink() override; void registerGroup(int rtnlGroup);//注册想要监听的事件,加入组播组 void dumpRequest(int rtmGetCommand); int getFd() override;//判断句柄是否在select的可用事件中 void readData() override;//获取socket中的信息,触发回调函数 private: static int onNetlinkMsg(struct nl_msg *msg, void *arg);//回调函数 nl_sock *m_socket;//套接口描述符 };
/**
 * Create a non-blocking NETLINK_ROUTE socket whose valid messages are
 * delivered to NetLink::onNetlinkMsg.
 *
 * @param pri  priority forwarded to the Selectable base class.
 * @throws std::system_error if the socket cannot be allocated or connected.
 */
NetLink::NetLink(int pri) :
    Selectable(pri), m_socket(NULL)
{
    m_socket = nl_socket_alloc();
    if (!m_socket)
    {
        SWSS_LOG_ERROR("Unable to allocated netlink socket");
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to allocated netlink socket");
    }

    // Sequence-number checking must be off so that messages that are not
    // replies to one of our own requests (netlink events) are processed.
    nl_socket_disable_seq_check(m_socket);

    // Every valid message received on this socket is handed to onNetlinkMsg.
    nl_socket_modify_cb(m_socket, NL_CB_VALID, NL_CB_CUSTOM, onNetlinkMsg, this);

    int err = nl_connect(m_socket, NETLINK_ROUTE);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to connect netlink socket: %s", nl_geterror(err));
        nl_socket_free(m_socket);
        m_socket = NULL;
        throw system_error(make_error_code(errc::address_not_available),
                           "Unable to connect netlink socket");
    }

    // readData() is driven by a Select loop, so reads must not block.
    err = nl_socket_set_nonblocking(m_socket);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to set netlink socket non-blocking: %s", nl_geterror(err));
    }

    /* Set socket receive buffer size to 2MB (2097152 bytes); a tx size of 0
       lets libnl use its default. A larger receive buffer makes it less likely
       that a burst of kernel events overflows the socket. Note: the kernel caps
       SO_RCVBUF requests at net.core.rmem_max. */
    err = nl_socket_set_buffer_size(m_socket, 2097152, 0);
    if (err < 0)
    {
        SWSS_LOG_ERROR("Unable to set netlink socket buffer size: %s", nl_geterror(err));
    }
}
void NetLink::registerGroup(int rtnlGroup) { int err = nl_socket_add_membership(m_socket, rtnlGroup);//加入组播组 if (err < 0) { SWSS_LOG_ERROR("Unable to register to group %d: %s", rtnlGroup, nl_geterror(err)); throw system_error(make_error_code(errc::address_not_available), "Unable to register group"); } }
int NetLink::getFd()//获取套接口句柄 { return nl_socket_get_fd(m_socket); }
void NetLink::readData() { int err; do { err = nl_recvmsgs_default(m_socket);//读取数据,有libnl触发回调函数,处理业务 } while(err == -NLE_INTR); // Retry if the process was interrupted by a signal if (err < 0) { if (err == -NLE_NOMEM) SWSS_LOG_ERROR("netlink reports out of memory on reading a netlink socket. High possiblity of a lost message"); else if (err == -NLE_AGAIN) SWSS_LOG_DEBUG("netlink reports NLE_AGAIN on reading a netlink socket"); else SWSS_LOG_ERROR("netlink reports an error=%d on reading a netlink socket", err); } }
//回调函数,读取消息时回调该函数,该函数是一个消息分发器 int NetLink::onNetlinkMsg(struct nl_msg *msg, void *arg) { NetDispatcher::getInstance().onNetlinkMessage(msg); return NL_OK; }
void NetLink::dumpRequest(int rtmGetCommand)//用于获取信息,实现get命令,查看内核相关信息 { int err = nl_rtgen_request(m_socket, rtmGetCommand, AF_UNSPEC, NLM_F_DUMP); if (err < 0) { SWSS_LOG_ERROR("Unable to request dump on group %d: %s", rtmGetCommand, nl_geterror(err)); throw system_error(make_error_code(errc::address_not_available), "Unable to request dump"); } }
class NetDispatcher { public: /**/ static NetDispatcher& getInstance();//获取消息分发器实例,消息分发器全局一个,静态函数 /* * Register callback class according to message-type. * * Throw exception if,注册消息处理函数 */ void registerMessageHandler(int nlmsg_type, NetMsg *callback); /* * Called by NetLink or FpmLink classes as indication of new packet arrival * 给netlink的回调函数 */ void onNetlinkMessage(struct nl_msg *msg); private: NetDispatcher() = default; /* nl_msg_parse callback API */ static void nlCallback(struct nl_object *obj, void *context); std::map<int, NetMsg * > m_handlers;//回调函数存储map }; class NetMsg { public: /* Called by NetDispatcher when netmsg matches filters */ virtual void onMsg(int nlmsg_type, struct nl_object *obj) = 0; }; }
/*
 * Singleton accessor: the dispatcher is a function-local static, built on the
 * first call and returned by reference to every caller afterwards.
 */
NetDispatcher& NetDispatcher::getInstance()
{
    static NetDispatcher instance;
    return instance;
}
void NetDispatcher::registerMessageHandler(int nlmsg_type, NetMsg *callback)//注册回调函数 { if (m_handlers.find(nlmsg_type) != m_handlers.end()) throw "Trying to registered on already registerd netlink message"; m_handlers[nlmsg_type] = callback; }
void NetDispatcher::nlCallback(struct nl_object *obj, void *context) { NetMsg *callback = (NetMsg *)context; callback->onMsg(nl_object_get_msgtype(obj), obj); }
void NetDispatcher::onNetlinkMessage(struct nl_msg *msg)//netlink回调函数的真实实现 { struct nlmsghdr *nlmsghdr = nlmsg_hdr(msg);//获取netlink消息头 auto callback = m_handlers.find(nlmsghdr->nlmsg_type);//获取消息类型对应的NetMsg描述结构 /* Drop not registered messages */ if (callback == m_handlers.end())//没有对应的消息处理函数 return; //解析消息,调用NetDispatcher::nlCallback nl_msg_parse(msg, NetDispatcher::nlCallback, (void *)(callback->second)); }
下面以接口地址同步进程intfsyncd为例进行说明。
class IntfSync : public NetMsg { public: enum { MAX_ADDR_SIZE = 64 }; IntfSync(DBConnector *db);//连接数据库 virtual void onMsg(int nlmsg_type, struct nl_object *obj); private: ProducerStateTable m_intfTable; }; } //消息处理函数 void IntfSync::onMsg(int nlmsg_type, struct nl_object *obj) { char addrStr[MAX_ADDR_SIZE + 1] = {0}; struct rtnl_addr *addr = (struct rtnl_addr *)obj; string key; string scope = "global"; string family; //响应新地址,获取地址,删除地址三个信息 if ((nlmsg_type != RTM_NEWADDR) && (nlmsg_type != RTM_GETADDR) && (nlmsg_type != RTM_DELADDR)) return; /* Don't sync local routes,不同步local地址信息 */ if (rtnl_addr_get_scope(addr) != RT_SCOPE_UNIVERSE) { scope = "local"; return; } if (rtnl_addr_get_family(addr) == AF_INET) family = IPV4_NAME; else if (rtnl_addr_get_family(addr) == AF_INET6) family = IPV6_NAME; else // Not supported return; //获取接口名字以及地址,组合成key key = LinkCache::getInstance().ifindexToName(rtnl_addr_get_ifindex(addr)); key+= ":"; nl_addr2str(rtnl_addr_get_local(addr), addrStr, MAX_ADDR_SIZE); key+= addrStr; if (nlmsg_type == RTM_DELADDR)//地址删除,删除key { m_intfTable.del(key); return; } //添加key std::vector<FieldValueTuple> fvVector; FieldValueTuple f("family", family); FieldValueTuple s("scope", scope); fvVector.push_back(s); fvVector.push_back(f); m_intfTable.set(key, fvVector); }
int main(int argc, char **argv) { swss::Logger::linkToDbNative("intfsyncd"); DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);//连接APPL_DB IntfSync sync(&db);//实例化netmsg //订阅消息,加入组播组 NetDispatcher::getInstance().registerMessageHandler(RTM_NEWADDR, &sync); NetDispatcher::getInstance().registerMessageHandler(RTM_DELADDR, &sync); while (1) { try { NetLink netlink; Select s; netlink.registerGroup(RTNLGRP_IPV4_IFADDR); netlink.registerGroup(RTNLGRP_IPV6_IFADDR); cout << "Listens to interface messages..." << endl; netlink.dumpRequest(RTM_GETADDR);//打印所有地址 s.addSelectable(&netlink);//加入select事件 while (true) { Selectable *temps; s.select(&temps);//监听select事件 } } catch (const std::exception& e) { cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl; return 0; } } return 1; }