guicaizhou 2015-08-27
TopicConfigManager 类
主要流程为
1.监控zk上的/config/changes节点,得知哪个topic的config发生了变化
2.从zk上的topic的config目录,获取最新config信息
3.更新LogManager里指定topic的所有Log对象(每个topic的每个partition对应一个Log)的配置
/**
 * Begin watching for config changes.
 *
 * Registers the config-change listener on the ZooKeeper change-notification
 * path and then runs one pass over the notifications so that pending changes
 * are picked up at startup.
 */
def startup(): Unit = {
  // Ensure the notification parent path exists before subscribing to it.
  ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath)
  // Watch the children of the config-changes path with ConfigChangeListener.
  zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener)
  // On startup, check whether any topic config needs updating. Per the original
  // note this goes through the same processConfigChanges path as the listener
  // (the listener body is not visible here — confirm).
  processAllConfigChanges()
}
主要方法processConfigChanges
/**
 * Process the given list of config change notifications.
 *
 * Per the original note, changing a topic's config involves:
 *  1. writing the topic config to ZooKeeper, and
 *  2. adding a notification in ZooKeeper that marks which topic's config changed.
 *
 * For every notification whose change id is greater than `lastExecutedChange`,
 * the topic name is read from the notification znode; if this broker's log
 * manager holds logs for that topic, their config is replaced by the defaults
 * combined with the overrides fetched from ZooKeeper. `lastExecutedChange` is
 * advanced for every newer notification, whether or not any log was updated.
 *
 * @param notifications names of the child znodes under ZkUtils.TopicConfigChangesPath
 */
private def processConfigChanges(notifications: Seq[String]): Unit = {
  if (notifications.nonEmpty) {
    info("Processing config change notification(s)...")
    val now = time.milliseconds
    val logs = logManager.logsByTopicPartition.toBuffer
    // Group by topic: topic -> all Log instances belonging to that topic.
    val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2))
    for (notification <- notifications) {
      val changeId = changeNumber(notification)
      // Only handle notifications newer than the last one already executed.
      if (changeId > lastExecutedChange) {
        val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification
        // The second tuple element returned alongside the data is not needed here.
        val (jsonOpt, _) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
        jsonOpt.foreach { json =>
          // hacky way to dequote
          // NOTE(review): throws if the payload is shorter than two characters — confirm the writer always quotes.
          val topic = json.substring(1, json.length - 1)
          if (logsByTopic.contains(topic)) {
            /* combine the default properties with the overrides in zk to create the new LogConfig */
            val props = new Properties(logManager.defaultConfig.toProps)
            props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic))
            val logConfig = LogConfig.fromProps(props)
            // Apply the new config to every log of this topic currently held by the log manager.
            for (log <- logsByTopic(topic))
              log.config = logConfig
            info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
            purgeObsoleteNotifications(now, notifications)
          }
        }
        lastExecutedChange = changeId
      }
    }
  }
}