forrestou 2019-06-27
kafka每个topic有多个partition,单个partition内消息有序。Partition在物理存储上由多个segment组成,每个segment内包含两个文件,index文件和log文件。
物理实体 index文件和log文件
逻辑实体 topic > partition > segment
1.partition存储
在kafka文件存储中,同一个Topic下有多个不同的partition,每个partition为一个目录,partition命名规则为topic名称+有序序号,第一个partition序号从0开始,序号最大值为partition数量减1。
每个partition(目录)相当于一个大型文件被平均分配到大小(可配置log.segment.bytes)相等的segment数据文件中,但每个segment file消息数量不一定相等,取决于消息大小,方便快速删除。
2.segment存储
Segment file由两个部分组成,分别是index file和data file,一一对应,成对出现,后缀为.index和.log,分别对应索引文件和数据文件。Segment的文件命名第一个从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。
.index文件是索引文件,每行数据包括两个值,第几条消息+该消息在log文件的物理偏移量。.log文件存储消息的实际数据,每行由offset+message组成。具体如下图所示:
message参数说明:
关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic" 表示本次发布Kafka服务程序协议版本号
1 byte “attributes" 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
Kbyte key 可选
value bytes payload 表示实际消息数据。
以上图中查找offset=36876的message为例,需要通过以下两个步骤:
第一步查找segment file,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0。第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1。同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|log
第二步通过segment file查找message,通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
生产者可与任一broker连接(生产者不会与zookeeper通信),获得topic的partition信息(每个broker都有所有topic信息),找到每个partition的leader所在broker,再与该broker建立连接。发送消息时,通过轮询或者随机选取partition的方式,决定消息被发送到哪一个partition。
kafka的消息发送包括同步和异步两种方式。同步发送可配置acks参数,该参数可配置消息的确认级别。当acks=-1,则要求所有ISR中的replica都确定拿到消息后再返回给生产者成功(leader会先将消息落盘,ISR中的replica拿到后不一定落盘,到内存就算成功);acks=0则直接返回成功(不用leader确认);acks=1,则leader把消息落盘后再返回。异步发送则直接返回发送成功,由后台线程扫描队列长度,达到一定长度或者配置时间再批量发送消息到leader。
a. 创建topic时会往zk注册topic的分区信息
b. 生产者从broker获取topic的所有分区
c. 根据一定的负载均衡算法决定将消息发往哪个分区
d. 最终根据分区所在的leader broker将消息发送到broker
e. 当topic分区变化时,生产者会重新从broker获取新的分区信息
Kafka的消息生产者使用Producer.scala,客户端通过producer.type配置可使用sync和async两种模式。客户端调用Producer.send发送消息。
在同步模式下,首先调用DefaultEventHandler.handle方法,序列化消息,序列化方式是默认的Encoder,可自定义实现(producer配置serializer.class),之后在最大重试次数(默认三次)内尝试发送消息,调用dispatchSerializedData方法,在该方法内选择消息的partition。
如果消息没有key,且是该客户端对应topic下首条消息,则随机选择一个partition,并缓存对应的partition和topic的关系到sendPartitionPerTopicCache,之后该topic下没有key的消息都将发往该分区。sendPartitionPerTopicCache将在对应的配置时间(topic.metadata.refresh.interval.ms,默认为600000)内clear,防止所有消息都发往同一个partition。
如果消息key不为空,则调用默认的分区方法DefaultPartitioner.partition。key hash之后的值再对分区值取模,得到消息对应的分区。可自行实现Partitioner接口,实现自定义的分区策略(producer新增配置partitioner.class)。
消息到达broker后,leader先将该消息落盘。再根据acks参数决定是否返回消息写入成功,如果acks=-1,则需等待ISR中的replica复制消息,全部复制完成后再返回成功,如果等待时间超时,则返回消息发送失败。
如果要严格保证消息不丢失,可给该topic配置两个以上replica,同时生产者的acks设置为-1,每条消息都要求副本确认复制后再返回成功。
消息发送流程图如下:
创建topic——kafka源码探究之一
https://segmentfault.com/a/11...
broker的高可用及高伸缩——kafka源码探究之二
https://segmentfault.com/a/11...