易伸缩、易扩展!我的Flume 使用学习小结

发孖 2016-06-18

概 述

在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。

Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制;

可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->离线计算(如ODPS、HDFS、HBase)、日志--->Flume--->ElasticSearch等。

Flume架构

Flume主要分为 Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application。数据从源头经过Agent的这几个组件最后到达目的地。一个Flume 服务可同时运行多个Agent,大致架构可参照下图:

易伸缩、易扩展!我的Flume 使用学习小结

对照这个图,作一些说明:

Event 一条日志数据在Flume中对应一个Event对象。不过给他添加了header属性,就是一个Map,放 一些额外信息,可以针对每条Event做特殊处理,比如Channel的选择。这些额外的键值对可以在Event从Source到Channel之间的interceptor(拦截器)中set。

Source 负责日志流入,比如从文件、网络、MQ等数据源流入数据。

Channel 负责数据聚合/暂存,以供Sink消费掉,事务机制主要在这里实现

Sink 负责数据转移到存储,比如从Channel拿到日志后直接存储到ODPS、ElasticSearch等。

拦截器 如果配置了拦截器,则Event从Source 进入Channel前,经过拦截器链做过滤或其他处理;如识别不需要的数据等。

选择器 Flume默认实现有ReplicatingChannelSelector(复制,Event可同时发往多个Channel)和MultiplexingChannelSelector(复用,可根据header中某个字段值,发往不同的Channel)。

下图是个简单的多Channel、Sink情况;Flume还包含一些其他的高级的特性和使用方法,有时间可以继续研究。

易伸缩、易扩展!我的Flume 使用学习小结

Flume实际使用

现在做的埋点数据导入ODPS的情况是,每天夜里1点左右把前一天的日志文件copy到Flume监控的目录,Flume处理新加入的文件。最终数据存储到ODPS。

遇到的问题:

日志数据中包含空行等不正确格式的记录,导致从Channel中take日志记录后保存到ODPS失败;失败的操作被事务回滚,结果是数据流传在这个地方错误循环下去。

日志数据按实际生成的日期为分区保存在OPDS表的分区中,导入日志数据的日期为实际日期的后一天。

在尝试了几种方法后,最后选择自定义了一个拦截器实现

(UaLogFilteringInterceptor),能很好的达到目前的需求,他主要做如下两件事:

过滤掉不需要、不规范的数据,并且把过滤掉的这些数据存储到指定的文件里,每天一个文件(如果有异常记录)。

在每条Event的header中加入qt值(qt值为前一天的日期,格式为yyyyMMdd),每条Event根据该值保持到ODPS的对应表分区中。

这里顺带说下ODPS的Flume插件,他主要根据ODPS的特点自定义实现了一个Sink,在Flume的配置文件中配置使用该Sink,配置好该Sink的各个配置项,主要包含连接ODPS和使用对应表的。

说说Flume的事务

主要用到事务实现有针对MemoryChannel的MemoryTransaction和FileChannel的FileBackedTransaction;

Event从Source PUT到Channel和从Channel Take到Sink后落地,这两个步骤都包裹在事务中;我这里说下MemoryTransaction大致实现。

MemoryTransaction 主要用到了两个双向阻塞队列(LinkedBlockingDeque)putList和takeList作为缓冲区,同时配合使用MemoryChannel中的LinkedBlockingDeque queue;队列的大小通过Flume的配置初始化好;

  • PUT事务

  1. 批量数据循环PUT到putList中

  2. Commit,把putList队列中数据offer到queue队列中,然后释放信号量,清空(clear)putList队列

  3. Rollback,清空(clear)putList队列 这里其实没有做太多事。

Take事务

  1. 检查takeList队列大小是否够用,从queue队列中poll

  2. Event到takeList队列中

  3. Commit,表明被Sink正确消费掉,清空(clear)takeList队列

  4. Rollback,异常出现,则把takeList队列中的Event返还到queue队列顶部

更多深度技术内容,请关注云栖社区微信公众号:yunqiinsight。

相关推荐

wsong / 0评论 2020-04-15