Kafka 2019-11-04
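The previous section explained that Cluster mainly stores Node information, TopicPartition information, and the detailed PartitionInfo for each partition. Metadata, the subject of this section, wraps Cluster and provides a single entry point for queries; it also provides the operations that update the cluster information.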
Reading the Javadoc of Metadata leads to the following conclusions:
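As noted above, Metadata is used in a multi-threaded environment, so it declares almost all of its methods synchronized to guarantee thread safety.
Metadata provides two main methods for updating the cluster information: requestUpdate and awaitUpdate.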
    public synchronized int requestUpdate();
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException;
    /**
     * Request an update of the current cluster metadata info, return the current version before the update
     */
    public synchronized int requestUpdate() {
        // Set the update flag to true
        this.needUpdate = true;
        // Return the current version of the metadata
        return this.version;
    }
needUpdate and version are fields of Metadata: needUpdate marks whether the cluster information needs to be refreshed, and version is the current version of the metadata.
    /**
     * Wait for metadata update until the current version is larger than the last version we know of
     */
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        // A negative wait time is an illegal argument
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");

        // Start time
        long begin = System.currentTimeMillis();
        // Remaining wait time
        long remainingWaitMs = maxWaitMs;

        // Keep looping while the current version has not moved past lastVersion and Metadata is not closed.
        // Throw a TimeoutException once the elapsed time reaches maxWaitMs.
        // Throw a KafkaException if the Metadata instance has been closed.
        while ((this.version <= lastVersion) && !isClosed()) {
            AuthenticationException ex = getAndClearAuthenticationException();
            if (ex != null)
                throw ex;
            if (remainingWaitMs != 0)
                // wait() suspends this thread until another thread notifies it
                wait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }
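requestUpdate() does more than flag that the metadata needs a refresh: it also returns the metadata version at the time of the call, and that value is passed to awaitUpdate as lastVersion. awaitUpdate repeatedly compares lastVersion with the current this.version. As long as this.version is less than or equal to lastVersion, the update has not completed, so the calling thread is suspended with wait(), which also releases the lock on Metadata. (Note that this is wait(), not sleep(): sleep() would not release the lock, the sender thread could then never enter the synchronized methods to update Metadata, and the two threads would deadlock.) The caller stays suspended until the sender thread updates Metadata and wakes it through the monitor's notify mechanism, after which it continues. If the wait times out or Metadata has been closed, an exception is thrown.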
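The handshake described above can be sketched outside Kafka as follows. This is a minimal illustration, not the Kafka implementation: VersionedState and VersionedStateDemo are hypothetical names, update() stands in for whatever the sender thread does after a refresh, and java.util.concurrent.TimeoutException is used in place of Kafka's own exception type.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for Metadata; only the version handshake is modelled.
class VersionedState {
    private int version = 0;
    private boolean needUpdate = false;

    // Flag that a refresh is wanted and return the version seen before it.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Block until the version moves past lastVersion or maxWaitMs elapses.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("maxWaitMs must not be negative");
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (version <= lastVersion) {
            if (remainingWaitMs != 0)
                wait(remainingWaitMs); // releases the monitor while suspended
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("No update after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

    // Assumed updater entry point: bump the version and wake every waiter.
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll();
    }

    synchronized int version() { return version; }

    synchronized boolean updateRequested() { return needUpdate; }
}

class VersionedStateDemo {
    public static void main(String[] args) throws Exception {
        VersionedState state = new VersionedState();
        int before = state.requestUpdate();
        // Plays the role of the sender thread: refresh after a short delay.
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            state.update();
        });
        updater.start();
        state.awaitUpdate(before, 5_000);
        updater.join();
        System.out.println("version " + before + " -> " + state.version());
    }
}
```

The updater can only enter the synchronized update() because wait() gave up the monitor; replacing wait(remainingWaitMs) with Thread.sleep(remainingWaitMs) would keep the monitor held for the whole sleep.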
KafkaProducer's waitOnMetadata shows how the two methods are used together:

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param partition A specific partition expected to exist in metadata, or null if there's no preference
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The cluster containing topic metadata and the amount of time we waited in ms
     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        // Register the topic with Metadata; a new topic also sets the update flag
        metadata.add(topic);
        Cluster cluster = metadata.fetch();
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        // No wait is needed if no partition is specified or the specified partition is within the known range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
        // In case we already have cached metadata for the topic, but the requested partition is greater
        // than expected, issue an update request only once. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        // Keep requesting metadata updates until the partition count for the topic is known
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            metadata.add(topic);
            // Request an update and remember the version at the time of the request
            int version = metadata.requestUpdate();
            // Wake up the sender thread, which fetches the new metadata
            sender.wakeup();
            try {
                // Wait for the metadata update to complete, or throw on timeout
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (cluster.unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            // Update the remaining wait time
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null);

        // If the specified partition is still outside the partition range after the update, throw
        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(
                    String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        }

        return new ClusterAndWaitTime(cluster, elapsed);
    }
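The two partition checks in waitOnMetadata, the fast path before the loop and the range check after it, can be isolated into a small sketch. PartitionCheckSketch and both method names are hypothetical, and IllegalArgumentException stands in for the KafkaException thrown by the real code.

```java
// Hypothetical helper that isolates the two partition conditions.
class PartitionCheckSketch {

    // True when the cached metadata is enough and no update has to be awaited:
    // the partition count is known, and the caller either named no partition
    // or named one inside [0, partitionsCount).
    static boolean cachedMetadataSuffices(Integer partitionsCount, Integer partition) {
        return partitionsCount != null
                && (partition == null || partition < partitionsCount);
    }

    // After a refresh, reject an explicit partition outside [0, partitionsCount).
    // IllegalArgumentException is a stand-in chosen for this sketch.
    static void validatePartition(Integer partition, int partitionsCount) {
        if (partition != null && partition >= partitionsCount) {
            throw new IllegalArgumentException(String.format(
                    "Invalid partition given with record: %d is not in the range [0...%d).",
                    partition, partitionsCount));
        }
    }
}
```

With three known partitions, partition 2 takes the fast path, while partition 3 forces an update and is rejected if the refreshed count is still 3.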
    /**
     * Add the topic to maintain in the metadata. If topic expiry is enabled, expiry time
     * will be reset on the next update.
     */
    public synchronized void add(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
            requestUpdateForNewTopics();
        }
    }
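add does not update the metadata directly. Metadata keeps an internal Map from each topic to its expiry time (the topic expiry mentioned in the Javadoc); the Sender thread refreshes the metadata periodically and handles topic expiry as part of that refresh. add triggers an update only when it adds a topic that was not yet in the Map; if the topic is already present, no update is requested. This is why, in waitOnMetadata above, the call to add is followed by an explicit call to requestUpdate.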
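The difference between adding a new topic and re-adding a known one can be seen in a minimal sketch. TopicRegistrySketch is a hypothetical class, the sentinel value -1L is an assumption made for illustration, and expiry handling is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the topic map kept by Metadata; expiry itself is omitted.
class TopicRegistrySketch {
    // Sentinel meaning "expiry not assigned yet"; the value is an assumption.
    static final long EXPIRY_NEEDS_UPDATE = -1L;

    private final Map<String, Long> topics = new HashMap<>();
    private boolean needUpdate = false;
    private int version = 0;

    // Register a topic; only a previously unknown topic requests an update,
    // because Map.put returns null exactly when the key was absent.
    synchronized void add(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        if (topics.put(topic, EXPIRY_NEEDS_UPDATE) == null)
            needUpdate = true;
    }

    // Explicitly request an update, returning the version before the update.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    // Assumed updater entry point: clear the flag and bump the version.
    synchronized void update() {
        needUpdate = false;
        version++;
    }

    synchronized boolean updateRequested() { return needUpdate; }
}
```

After one update, calling add("orders") a second time leaves updateRequested() false, so a caller that needs fresh data for a known topic has to call requestUpdate() itself.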