amwayy 2019-11-20
由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
默认情况下__consumer_offsets有50个分区。
下面是在windows平台上进行演示的,Linux 平台的命令基本一致,只有命令行路径(bin\windows)以及后缀(.bat)有差异。
运行如下命令,可以看到__consumer_offsets topic,从名称上来推测,这个topic是和consumer的位移相关的。
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
进一步查看topic详细信息
bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092
可以看到__consumer_offsets有50个分区。
__consumer_offest不受server.properties中num.partitions和default.replication.factor参数的制约。相反地,它的分区数和备份因子分别由offsets.topic.num.partitions和offsets.topic.replication.factor参数决定。这两个参数的默认值分别是50和1,表示该topic有50个分区,副本因子是1。
consumers 将最后使用的消息位移量(或偏移量 offset id)存储在基于consumer group id的kafka topic:__consumer_offsets中。这样,不同的consumers(显然具有不同的consumer id)能够在上一次消费的消息之后,处理下一条消息,并避免重复的消息处理。
针对Kafka 2.x版本,可以使用如下命令从__consumer_offsets topic中读取消息:
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
下面是输出结果:
可以看到__consumer_offsets topic 中的每一日志项的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
其中test-consumer-group (Group)是在consumer.properties 配置的consumer group id:
kafka-consumer-groups.sh命令可以查看consumer信息。
1. 查看consumer group列表,使用--list参数
bin\windows\kafka-consumer-groups.bat --list --bootstrap-server localhost:9092
2. 查看特定consumer group 详情,使用--group与--describe参数
bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group console-consumer-43322 --describe
其中,current-offset 和 log-end-offset还有 lag ,分别为当前位移量,结束的位移量,落后的位移量。