wsong 2020-02-17
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume最早是Cloudera提供的日志收集系统,目前是Apache下的一个孵化项目,Flume支持在日志系统中定制各类数据发送方,用于收集数据。
恒生数据接收中间件---file.txt 哪个端口进行监控 --- 数据监控—接收数据----内存—存储本地硬盘
Flume—对哪个ip 哪个端口进行监控 --- 数据监控—接收数据----内存—存储本地硬盘
Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 Flume提供了从Console(控制台)、RPC(Thrift-RPC)、Text(文件)、Tail(UNIX tail)、Syslog(Syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。
Flume OG架构
Flume逻辑上分三层架构:Agent,Collector,Storage。
Flume OG采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。
FLUM OG 的特点是:
Flume NG架构
Flume NG最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume NG另一个主要的不同点是读入数据和写出数据现在由不同的工作线程处理(称为Runner)。在 Flume NG 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。
FLUME NG 的特点是:
Flume以Agent为最小的独立运行单位。Agent是Flume中产生数据流的地方,一个Agent就是一个JVM。单Agent由Source、Sink和Channel三大组件构成,如下图:
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
² Source
flume有许多类型的Source,见官网用户手册:
http://flume.apache.org/FlumeUserGuide.html#flume-sources
简单的归纳如下:
Source类型 | 说明 |
Avro Source | 支持Avro协议(实际上是Avro RPC),提供一个Avro的接口,需要往设置的地址和端口发送Avro消息,Source就能接收到,如:Log4j Appender通过Avro Source将消息发送到Agent |
Thrift Source | 支持Thrift协议,提供一个Thrift接口,类似Avro |
Exec Source | Source启动的时候会运行一个设置的UNIX命令(比如 cat file),该命令会不断地往标准输出(stdout)输出数据,这些数据就会被打包成Event,进行处理 |
JMS Source | 从JMS系统(消息、主题)中读取数据,类似ActiveMQ |
Spooling Directory Source | 监听某个目录,该目录有新文件出现时,把文件的内容打包成Event,进行处理 |
Netcat Source | 监控某个端口,将流经端口的每一个文本行数据作为Event输入 |
Sequence Generator Source | 序列生成器数据源,生产序列数据 |
Syslog Sources | 读取syslog数据,产生Event,支持UDP和TCP两种协议 |
HTTP Source | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式 |
Legacy Sources | 兼容老的Flume OG中Source(0.9.x版本) |
自定义Source | 使用者通过实现Flume提供的接口来定制满足需求的Source。 |
对于直接读取文件Source, 主要有两种方式:
ü Exec source
可通过写Unix command的方式组织数据,最常用的就是tail -F [file]。
可以实现实时传输,但在flume不运行和脚本错误时,会丢数据,也不支持断点续传功能。因为没有记录上次文件读到的位置,从而没办法知道,下次再读时,从什么地方开始读。特别是在日志文件一直在增加的时候。flume的source挂了。等flume的source再次开启的这段时间内,增加的日志内容,就没办法被source读取到了。不过flume有一个execStream的扩展,可以自己写一个监控日志增加情况,把增加的日志,通过自己写的工具把增加的内容,传送给flume的node。再传送给sink的node。要是能在tail类的source中能支持,在node挂掉这段时间的内容,等下次node开启后在继续传送,那就更完美了。
ü Spooling Directory Source
SpoolSource:是监测配置的目录下新增的文件,并将文件中的数据读取出来,可实现准实时。需要注意两点:
1、拷贝到spool目录下的文件不可以再打开编辑。
2、spool目录下不可包含相应的子目录。在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
注:ExecSource,SpoolSource对比
ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。
² Channel
当前有几个 Channel 可供选择,分别是 Memory Channel, JDBC Channel , File Channel,Psuedo Transaction Channel。比较常见的是前三种 Channel。
v Memory Channel 可以实现高速的吞吐,但是无法保证数据的完整性。
v Memory Recover Channel 在官方文档的建议上已经建义使用File Channel来替换。
v File Channel保证数据的完整性与一致性。在具体配置File Channel时,建议File Channel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
File Channel 是一个持久化的隧道(Channel),它持久化所有的事件,并将其存储到磁盘中。因此,即使 Java 虚拟机当掉,或者操作系统崩溃或重启,再或者事件没有在管道中成功地传递到下一个代理(agent),这一切都不会造成数据丢失。Memory Channel 是一个不稳定的隧道,其原因是由于它在内存中存储所有事件。如果 Java 进程死掉,任何存储在内存的事件将会丢失。另外,内存的空间收到 RAM大小的限制,而 File Channel 这方面是它的优势,只要磁盘空间足够,它就可以将所有事件数据存储到磁盘上。
Flume Channel 支持的类型:
Channel类型 | 说明 |
Memory Channel | Event数据存储在内存中 |
JDBC Channel | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby |
File Channel | Event数据存储在磁盘文件中 |
Spillable Memory Channel | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件(当前试验性的,不建议生产环境使用) |
Pseudo Transaction Channel | 测试用途 |
Custom Channel | 自定义Channel实现 |
² Sink
Sink在设置存储数据时,可以向文件系统中,数据库中,Hadoop中储数据,在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
Flume Sink支持的类型
Sink类型 | 说明 |
HDFS Sink | 数据写入HDFS |
Logger Sink | 数据写入日志文件 |
Avro Sink | 数据被转换成Avro Event,然后发送到配置的RPC端口上 |
Thrift Sink | 数据被转换成Thrift Event,然后发送到配置的RPC端口上 |
IRC Sink | 数据在IRC上进行回放 |
File Roll Sink | 存储数据到本地文件系统 |
Null Sink | 丢弃到所有数据 |
HBase Sink | 数据写入HBase数据库 |
Morphline Solr Sink | 数据发送到Solr搜索服务器(集群) |
ElasticSearch Sink | 数据发送到Elastic Search搜索服务器(集群) |
Kite Dataset Sink | 写数据到Kite Dataset,试验性质的 |
Custom Sink | 自定义Sink实现 |
Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个Agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes。如下图所示:
Flume NG中已经没有Collector的概念了,Collector的作用是将多个Agent的数据汇总后,加载到Storage中。
Storage是存储系统,可以是一个普通File,也可以是HDFS,HIVE,HBase等。
针对于OG版本。
Master是管理协调Agent和Collector的配置等信息,是Flume集群的控制器。
在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。
对于Agent数据流配置就是从哪得到数据,把数据发送到哪个Collector。
对于Collector是接收Agent发过来的数据,把数据发送到指定的目标机器上。
(1) 可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
(2) 可扩展性
Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。
(3) 可管理性
所有agent和colletor由master统一管理,这使得系统便于维护。多master情况,Flume利用ZooKeeper和gossip,保证动态配置数据的一致性。用户可以在master上查看各个数据源或者数据流执行情况,且可以对各个数据源配置和动态加载。Flume提供了web 和shell script command两种形式对数据流进行管理。
(4) 功能可扩展性
用户可以根据需要添加自己的agent,collector或者storage。此外,Flume自带了很多组件,包括各种agent(file, syslog等),collector和storage(file,HDFS等)。
小结:
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:
http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
操作系统版本:CentOS 6.5
Hadoop版本:2.7.1
JDK版本:1.7.0_45 (设置环境变量)
安装Flume版本:apache-flume-1.6.0-bin
下载Flume最新版本,现在服务器上安装的是apache-flume-1.6.0-bin.tar.gz的版本,下载地址是http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz。
将Flume的安装包放到Linux系统中的/home/bigdata目录下。
# > tar -xvf apache-flume-1.6.0-bin.tar.gz
# > mv apache-flume-1.6.0-bin flume
export FLUME_HOME=/home/bigdata/flume
export PATH=$PATH:$FLUME_HOME/bin:
# > vim /etc/profile
# > source /etc/profile
安装完毕后,在控制台运行如下命令
# > flume-ng version
会看到以下输出:
修改 flume-env.sh 配置文件,主要是JAVA_HOME变量设置
Flume的配置文件位置:$FLUME_HOME/conf
# > /home/bigdata/flume/conf
# > cp flume-env.sh.template flume-env.sh
# > vim flume-env.sh
1) Avro
Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。
# > vi /home/bigdata/flume/conf/avro.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
对以上内容解释:
指定名称:a1是我们要启动的Agent名字
a1.sources = r1 命名Agent的sources为r1
a1.sinks = k1 命名Agent的sinks为k1
a1.channels = c1 命名Agent的channels 为c1
# Describe configure the source
a1.sources.r1.type = avro 指定r1的类型为AVRO
a1.sources.r1.bind = 0.0.0.0 将Source与IP地址绑定(这里指本机)
a1.sources.r1.port = 4141 指定通讯端口为4141
# Describe the sink
a1.sinks.k1.type = logger 指定k1的类型为Logger(不产生实体文件,只在控制台显示)
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
指定Channel的类型为Memory
设置Channel的最大存储event数量为1000
每次最大可以source中拿到或者送到sink中的event数量也是100
这里还可以设置Channel的其他属性:
a1.channels.c1.keep-alive=1000 event添加到通道中或者移出的允许时间(秒)
a1.channels.c1.byteCapacity = 800000 event的字节量的限制,只包括eventbody
a1.channels.c1.byteCapacityBufferPercentage = 20
event的缓存比例为20%(800000的20%),即event的最大字节量为800000*120%
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
将source、sink分别与Channel c1绑定
# > flume-ng agent -c . -f /home/bigdata/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
-c:使用配置文件所在目录(这里指默认路径,即$FLUME_HOME/conf)
-f:flume定义组件的配置文件
-n:启动Agent的名称,该名称在组件配置文件中定义
-Dflume.root.logger:flume自身运行状态的日志,按需配置,详细信息,控制台打印
# > echo "hello world" > /home/data/log.00
# > flume-ng avro-client -c . -H hadoop01 -p 4141 -F /home/data/log.00
-H:指定主机
-p:指定端口
-F:制定要发送的文件
注:Flume框架对Hadoop和zookeeper的依赖只是在jar包上,并不要求flume启动时必须将Hadoop和zookeeper服务也启动。
2) Exec
# > vi /home/bigdata/flume/conf/exec_tail.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/data/log_exec_tail
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# > flume-ng agent -c . -f /home/bigdata/flume/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
# > echo "exec tail 1" >> /home/data/log_exec_tail
# > echo "exec tail 2" >> /hadoop/flume/log_exec_tail
# for i in {1..100}
> do echo "flume +" $i >> /home/data/log_exec_tail
> done
3) Spool
Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:
² 拷贝到spool目录下的文件不可以再打开编辑。
² spool目录下不可包含相应的子目录
# > vi /home/bigdata/flume/conf/spool.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/data/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
创建/home/data/logs文件夹
# > mkdir / home/data /logs
# > flume-ng agent -c . -f /home/bigdata/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
# > echo "spool test1" > /home/data/logs/spool_text.log
Spool2—自定义后缀
#a1.sources.r1.fileHeaderKey = QQ.com
a1.sources.r1.fileSuffix = .QQ.com
4)Syslogtcp
Syslogtcp监听TCP的端口做为数据源
# > vi /home/bigdata/flume/conf/syslog_tcp.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# > flume-ng agent -c . -f /home/bigdata/flume/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Rpm –ivh nc-1.84-22.el6.x86_64
# > echo "hello idoall.org syslog" | nc localhost 5140
5)JSONHandler
# > vi /home/bigdata/flume/conf/post_json.conf
添加如下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# > flume-ng agent -c . -f /home/bigdata/flume/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
# > curl -X POST -d ‘[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]‘ http://localhost:8888
6)HDFS sink
# > vi /home/bigdata/flume/conf/hdfs_sink.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:// zookeepertest01:8020/user/flume/syslogtcp
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=10240
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=60
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
补充:
# > flume-ng agent -c . -f /home/bigdata/flume/conf/hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
# > echo "hello idoall flume -> hadoop testing one" | nc localhost 5140
# > hadoop fs -ls /user/flume/syslogtcp
# > hadoop fs -cat /user/flume/syslogtcp/Syslog.1407644509504
#for i in {1..30}; do echo “Flume +”$i |nc localhost 5140;done
7)hdfs sink 按照日期创建
vi conf/hdfsDate.conf
#定义agent名, source、channel、sink的名称
a5.sources = source1
a5.channels = channel1
a5.sinks = sink1
#配置source
a5.sources.source1.type = spooldir
a5.sources.source1.spoolDir = /home/data/beicai
a5.sources.source1.channels = channel1
a5.sources.source1.fileHeader = false
a5.sources.source1.interceptors = i1
a5.sources.source1.interceptors.i1.type = timestamp
#配置sink
a5.sinks.sink1.type = hdfs
a5.sinks.sink1.hdfs.path = hdfs://192.168.10.11:9000/usr/beicai
a5.sinks.sink1.hdfs.fileType = DataStream
a5.sinks.sink1.hdfs.writeFormat = TEXT
a5.sinks.sink1.hdfs.rollInterval = 1
a5.sinks.sink1.channel = channel1
a5.sinks.sink1.hdfs.filePrefix = %Y-%m-%d
#配置channel
a5.channels.channel1.type = memory
#flume-ng agent -n a5 -c conf -f conf/hdfsDate.conf -Dflume.root.logger=DEBUG,console
8)File Roll Sink
# > vi /home/bigdata/flume/conf/file_roll.conf
添加以下内容:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5555
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/data/logs2
a1.sinks.k1.sink.serializer = TEXT
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# > flume-ng agent -c . -f /home/bigdata/flume/conf/file_roll.conf -n a1 -Dflume.root.logger=INFO,console
# > echo "hello idoall.org syslog" | nc localhost 5555
# > echo "hello idoall.org syslog 2" | nc localhost 5555
# > ll /home/data/logs2
9)channels通道类型为文件形式
vi conf/channelsFile.conf
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# For each one of the sources, the type is defined
a1.sources.s1.type = syslogtcp
a1.sources.s1.host = localhost
a1.sources.s1.port = 5180
# Each sink‘s type must be defined
a1.sinks.k1.type = logger
# Each channel‘s type is defined.
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/bigdata/flume/logs/checkpoint
a1.channels.c1.dataDir = /home/bigdata/flume/logs/data
#Bind the source and sinks to channels
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
#flume-ng agent -n a1 -c conf -f conf/ channelsFile.conf -Dflume.root.logger=DEBUG,console