guicaizhou 2020-01-25
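When Kafka is used as the message middleware for consuming data, monitoring consumption progress is important. The main metric to watch is consumer lag.
There are three common ways to monitor Kafka consumption progress: the command-line tool bundled with Kafka, the Kafka Consumer API, and the JMX metrics exposed by Kafka. This post covers the first two.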
Note: internal IPs: 10.12.100.126, 10.12.100.127, 10.12.100.128; external IPs: 47.90.133.76, 47.90.133.77, 47.90.133.78; usernames: server1, server2, server3.
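For consumers that use the Kafka high-level consumer API, the kafka-consumer-groups.sh script bundled with Kafka shows the consumption progress directly. First list the consumer groups, then describe the group of interest: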
    (base) :/opt/kafka/kafka_2.11-0.10.2.2/bin# kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --list
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    consumer
    consumers
    (base) :/opt/kafka/kafka_2.11-0.10.2.2/bin# kafka-consumer-groups.sh --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --describe --group consumers
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST           CLIENT-ID
    test   0          17734           17734           0    consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
    test   1          17736           17736           0    consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
    test   2          17735           17735           0    consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388  /47.90.133.76  consumer-1
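Meaning of the columns:
    GROUP    consumer group
    TOPIC    topic
    PID      partition ID
    OFFSET   number of messages consumed so far (CURRENT-OFFSET in the output above)
    LOGSIZE  total number of messages (LOG-END-OFFSET in the output above)
    LAG      number of messages not yet consumed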
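Note: LAG is measured in messages. A LAG of 0 means the consumer is keeping up with the producer in real time, with no backlog; the larger the LAG, the further the consumer has fallen behind the messages being produced.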
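Because LAG is just LOG-END-OFFSET minus CURRENT-OFFSET for each partition, the table printed by --describe can be totalled or cross-checked with a short script. The following is a minimal sketch, assuming the column layout shown above; `parse_describe_output` is a hypothetical helper name and not part of Kafka, and the sample rows are shortened and altered for illustration (the second row is given a made-up backlog).

```python
def parse_describe_output(text):
    """Return {(topic, partition): lag} parsed from `--describe` output.

    Assumes the table header begins with
    TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET.
    Lines before the header (notes, SLF4J warnings) are ignored.
    """
    lags = {}
    in_table = False
    for line in text.splitlines():
        fields = line.split()
        if fields[:4] == ["TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET"]:
            in_table = True
            continue
        if not in_table or len(fields) < 4:
            continue
        topic, partition, current, log_end = fields[:4]
        # Skip rows whose partition or offsets are not plain numbers.
        if not (partition.isdigit() and current.isdigit() and log_end.isdigit()):
            continue
        lags[(topic, int(partition))] = int(log_end) - int(current)
    return lags


# Illustrative sample only; not real cluster output.
sample = """\
Note: This will only show information about consumers that use the Java consumer API.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 17734 17734 0 consumer-1-dd342b6b /47.90.133.76 consumer-1
test 1 17730 17736 6 consumer-1-dd342b6b /47.90.133.76 consumer-1
"""

lags = parse_describe_output(sample)
total_lag = sum(lags.values())
print(lags)
print("total lag:", total_lag)
```

Recomputing the lag from the two offset columns, rather than reading the LAG column, keeps the script independent of how a given version prints that column.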
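For the Kafka Consumer API, the following Python script (using the kafka-python library) computes the lag as the sum of the topic's latest offsets minus the sum of the group's committed offsets:

    # Requires an older kafka-python release: SimpleClient was removed in kafka-python 2.0.
    from kafka import SimpleClient, KafkaConsumer
    from kafka.common import OffsetRequestPayload, TopicPartition


    def get_topic_offset(brokers, topic):
        """Sum of the latest (log-end) offsets over all partitions of the topic."""
        client = SimpleClient(brokers)
        partitions = client.topic_partitions[topic]
        # time=-1 asks for the latest offset; max_offsets=1 returns one value per partition
        offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
        offsets_responses = client.send_offset_request(offset_requests)
        return sum([r.offsets[0] for r in offsets_responses])


    def get_group_offset(brokers, group_id, topic):
        """Sum of the offsets committed by the group over all partitions of the topic."""
        consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group_id)
        pts = [TopicPartition(topic=topic, partition=i) for i in consumer.partitions_for_topic(topic)]
        # Note: _coordinator is a private attribute of KafkaConsumer
        result = consumer._coordinator.fetch_committed_offsets(pts)
        return sum([r.offset for r in result.values()])


    if __name__ == '__main__':
        topic_offset = get_topic_offset("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "test")
        group_offset = get_group_offset("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "consumers", "test")
        lag = topic_offset - group_offset
        print(topic_offset)  # sum of the topic's offsets
        print(group_offset)  # sum of the offsets consumed by this group for the topic
        print(lag)           # number of messages not yet consumed

    (base) :~# python getKafkaLag.py
    17735
    17735
    0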
Code reference: https://www.jianshu.com/p/e48af92e199d
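The script above sums offsets across partitions, so a backlog on a single partition is not visible on its own. As a possible alternative, the sketch below reports lag per partition using kafka-python's public `KafkaConsumer.end_offsets()` and `KafkaConsumer.committed()` methods. It assumes a kafka-python version that provides `end_offsets()`; `compute_lag` and `fetch_lag` are hypothetical helper names, and treating a partition with no committed offset as unconsumed from offset 0 is an assumption of this sketch, not Kafka's definition.

```python
def compute_lag(end_offsets, committed_offsets):
    """Per-partition lag = log-end offset - committed offset.

    A partition with no committed offset (missing or None) is counted
    from offset 0, which is an assumption of this sketch.
    """
    return {
        partition: end - (committed_offsets.get(partition) or 0)
        for partition, end in end_offsets.items()
    }


def fetch_lag(brokers, group_id, topic):
    """Query the brokers and return {TopicPartition: lag} for one group and topic."""
    # Imported here so compute_lag() works without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # Auto-commit is disabled so this monitoring client never commits offsets itself.
    consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group_id,
                             enable_auto_commit=False)
    try:
        partition_ids = consumer.partitions_for_topic(topic) or ()
        partitions = [TopicPartition(topic, p) for p in partition_ids]
        end_offsets = consumer.end_offsets(partitions)
        committed = {tp: consumer.committed(tp) for tp in partitions}
        return compute_lag(end_offsets, committed)
    finally:
        consumer.close()


# Offline example with made-up offsets (no broker needed):
example = compute_lag({0: 17734, 1: 17736}, {0: 17734, 1: 17730})
print(example)
```

Against a live cluster, the call would look like `fetch_lag("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "consumers", "test")`, reusing the brokers, group and topic from the script above.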