Kafka-消息中间件

guicaizhou 2015-09-14

                        Kafka

       一个高吞吐量的(high-throughput)分布式消息系统

Features

  Fast

    A single Kafka broker can handle hundreds of megabytes(兆) of reads and writes per second   from thousands of clients.

  Scalable(可扩展)

   Kafka is designed to allow a single cluster to serve as the central data backbone for a  large organization.(Kafka 被设计为允许一个单一的集群作为骨干数据中心为一个大的组织服务)。It  can be elastically(弹性地) and transparently(透明地) expanded without downtime. Data  streams are partitioned (分段的)and spread over a cluster of machines to allow data streams  larger than the capability of any single machine and to allow clusters of co-ordinated(协调)  consumers.

 Durable(持久化)

   Messages are persisted on disk and replicated(备份) within the cluster to prevent data    loss. Each broker can handle terabytes(百兆字节) of messages without performance impact.

 Distributed by Design

   Kafka has a modern cluster-centric(cluster-centric ) design that offers strong  durability  and fault-tolerance(故障容错)guarantees.

 以上引自Kafka官网的介绍。

  在工作中有参与开发过一个基于消息中间件的框架,在这个项目中学到的不只是知识上的提升,还有非常多的自由,我可以按照自己的设计思路去完成框架的设计,编码有很大的自由度,优秀的同事和优秀的上司总是让自己的工作充满乐趣。我知道了一个首席架构师的工作状态,感受到他对知识的渴望,对自身能力提升的执着和不安于现状性格,我也知道牛逼的架构师不仅仅是代码写的好技术过硬,也有优秀的沟通技巧,使别人乐于接受他们的意见和指导。

   

背景:

  Kafka是Linkedin于2010年12月份开源的一个分布式的消息发布订阅系统,它主要用于处理活跃的流式数据,大数据量的数据处理上。它的主要目的是

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。顺序追加可以大大提高消息的写入速度,因为顺序写磁盘的效率比随机些内存还要快。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

  以上只是Kafka本身的特点,但是我们为什么要在我们的系统中使用消息中间件呢?

1.解耦:在消息处理的两边的处理提供了接口,可以独立的扩展或修改两边的处理过程,确保遵守同样的接口约束。

2.冗余:消息队列将数据持久化直到其被处理,这样就规避了数据的丢失。

3.扩展性:消息队列解耦了我们的处理过程,所以加大消息的入队和出队的频率是很容易的事情。

4.灵活性&峰值处理能力:在访问量剧增的情况下,应用仍然需要抗住压力,但是为了这种比较少的情况去投入很多的计算资源是得不偿失的。使用消息队列可以是关键组件顶住压力而不至于超负荷的请求完全崩溃,我们的系统仍然可以向正常运行。

5.可恢复:当系统中的一部分组建挂掉之后,重启后仍然可以处理保存在消息队列中的任务。

6.顺序保证:大部分的消息是有顺序的,并且可以保证消息的按顺序处理。Kafka保证一个Partition中的数据是顺序的。

7.异步:其实异步处理也是我们选择消息中间件作为我们核心组件的重要原因,通过消息中间件将请求保存起来,到想处理的时候就可以进行处理。

Kafka的架构:

 
Kafka-消息中间件
Broker

     Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic

     每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

 Partition

      Parition是物理上的概念,每个Topic包含一个或多个Partition.

 Producer

      负责发布消息到指定的Topic,同时Producer也能决定将此消息发送到哪个partition;如果一个Topic有多个partitions时,你需要选择partition是算法,比如基于"round-robin"方式或者通过其他的一些算法等.无论如何选择partition路由算法,我们最直接的目的就是希望消息能够均匀的发送给每个partition,这样可以让consumer消费的消息量也能"均衡".

 Consumer

      消息消费者,向指定的Topic读取消息的客户端,其实上Consumer是直接和其对应的若干个Parition对应,在实际的配置中,Consumer Group中的consumer的数量不能大于其对应Topic中partition的数量。

Consumer Group

       每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。一个partition中的消息只会被group中的一个consumer消费(同一时刻);每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时是顺序的.事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的.

    通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效,那么其消费的partitions将会有其他consumer自动接管.

从上图kafka的拓扑结构图我们可以看到Kafka通过Zookeeper管理其群的配置,其中

1,Zookeeper管理broker与consumer的动态加入与离开,进行负载均衡,使得一个consumer group内的多个consumer的订阅负载均衡。

2,Zookeeper维护消费关系以及每个partition的消费关系。

具体的细节如下:

1,每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。

 2,每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。

 3,zookeeper也会接受来自consumer向其注册的offset.

 Topics/logs

        一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件.任何发布到此partition的消息都会直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它唯一的标记一条消息.kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行"随机读-写",一旦消息写入log日志之后,将不能被修改.

       
Kafka-消息中间件
 

 Kafka和JMS实现(ActiveMQ)不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间.此外,kafka的性能并不会因为日志文件的太多而低下,所以即使保留较多的log文件,也不不会有问题.

         对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中)。

         partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions(备注:基于sharding),来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。

 Distribution

    一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置每个partition需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性.[replicas特性在0.8V才支持]

    基于replicated方案,那么就意味着需要对多个备份进行调度;一个partition可以在多个server上备份,那么其中一个server作为此partiton的leader;leader负责此partition所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定.[备注:kafka中将leader角色权限下放到partition这个层级]

 
Kafka-消息中间件
 

    Messaging

    和一些常规的消息系统相比,kafka仍然是个不错的选择;它具备partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

    Log Aggregation

    kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.

二. 设计原理

    kafka的设计初衷是希望做为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.

    1.Persistence

    kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.

    2.Efficiency

    需要考虑的影响性能点很多,除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).

    其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.

    3. Producer

    Load balancing

    kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信心之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,有producer客户端决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.在producer端的配置文件中,开发者可以指定partition路由的方式.

    Asynchronous send

    将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失.

    4.Consumer

    consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.[备注:offset,消息偏移量,integer值,broker可以根据offset来决定消息的起始位置]

    在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

    其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级.

    这就意味着,kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息时批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

5.Message Delivery Semantics

    对于JMS实现,消息传输担保非常直接:有且只有一次(exactly once).在kafka中稍有不同,对于consumer而言:

    1) at most once: 最多一次,这个和JMS中"非持久化"消息类似.发送一次,无论成败,将不会重发.

    2) at least once: 消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功.

    3) exactly once: 消息只会发送一次.

    at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".

    at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".

    exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的.

    因为"消息消费"和"保存offset"这两个操作的先后时机不同,导致了上述3种情况,通常情况下"at-least-once"是我们搜选.(相比at most once而言,重复接收数据总比丢失数据要好).

 
Kafka-消息中间件
 

 6. Replication

    kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.(备注:不同于其他分布式存储,比如hbase需要"多数派"存活才行)

    kafka判定一个follower存活与否的条件有2个:1) follower需要和zookeeper保持良好的链接 2) 它必须能够及时的跟进leader,不能落后太多.如果同时满足上述2个条件,那么leader就认为此follower是"活跃的".如果一个follower失效(server失效)或者落后太多,leader将会把它从同步列表中移除[备注:如果此replicas落后太多,它将会继续从leader中fetch数据,直到足够up-to-date,然后再次加入到同步列表中;kafka不会更换replicas宿主!因为"同步列表"中replicas需要足够快,这样才能保证producer发布消息时接受到ACK的延迟较小].

    当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower.kafka中leader选举并没有采用"投票多数派"的算法,因为这种算法对于"网络稳定性"/"投票参与者数量"等条件有较高的要求,而且kafka集群的设计,还需要容忍N-1个replicas失效.对于kafka而言,每个partition中所有的replicas信息都可以在zookeeper中获得,那么选举leader将是一件非常简单的事情.选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

    在整几个集群中,只要有一个replicas存活,那么此partition都可以继续接受读写操作.

部分转自http://blog.csdn.net/xiaolang85/article/details/37821209

相关推荐