Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume主要由3个重要的组件构成:
Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
Flume逻辑上分三层架构:agent,collector,storage
agent用于采集数据,agent是flume中产生数据流的地方,同时,agent会将产生的数据流传输到collector。
collector的作用是将多个agent的数据汇总后,加载到storage中。
storage是存储系统,可以是一个普通file,也可以是HDFS,HIVE,HBase等。
Flume的架构主要有一下几个核心概念:
Event:一个数据单元,带有一个可选的消息头
Flow:Event从源点到达目的点的迁移的抽象
Client:操作位于源点处的Event,将其发送到Flume Agent
Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
Source:用来消费传递到该组件的Event
Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)
关于Flume更多内容,可以参考网络文献:Flume的原理和使用
一:安装flume
flume下载地址: flume下载官网
1.解压安装包
- sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local # 将apache-flume-1.7.0-bin.tar.gz解压到/usr/local目录下,这里一定要加上-C否则会出现归档找不到的错误
- sudo mv ./apache-flume-1.7.0-bin ./flume #将解压的文件修改名字为flume,简化操作
- sudo chown -R hadoop:hadoop ./flume #把/usr/local/flume目录的权限赋予当前登录Linux系统的用户,这里假设是hadoop用户
Shell
2.配置环境变量
然后在首行加入如下代码:
- export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64;
- export FLUME_HOME=/usr/local/flume
- export FLUME_CONF_DIR=$FLUME_HOME/conf
- export PATH=$PATH:$FLUME_HOME/bin
Shell
注意, 上面的JAVA_HOME,如果以前已经在.bashrc文件中设置过,就不要重复添加了,使用以前的设置即可。
比如,以前设置得JAVA_HOME可能是“export JAVA_HOME=/usr/lib/jvm/default-java”,则使用原来的设置即可。
接下来使环境变量生效:
修改 flume-env.sh 配置文件:
- cd /usr/local/flume/conf
- sudo cp ./flume-env.sh.template ./flume-env.sh
- sudo vim ./flume-env.sh
Shell
打开flume-env.sh文件以后,在文件的最开始位置增加一行内容,用于设置JAVA_HOME变量:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64;
注意,你的JAVA_HOME可能与上面的设置不一致,一定要根据你之前已经安装的Java路径来设置,比如,有的机器可能是:
export JAVA_HOME=/usr/lib/jvm/default-java
然后,保存flume-env.sh文件,并退出vim编辑器。
3.查看flume版本信息
- cd /usr/local/flume
- ./bin/flume-ng version #查看flume版本信息;
Shell
如果安装成功,出现如下图片
注意:如果系统里安装了hbase,会出现错误: 找不到或无法加载主类 org.apache.flume.tools.GetJavaProperty。如果没有安装hbase,这一步可以略过。
- cd /usr/local/hbase/conf
- sudo vim hbase-env.sh
Shell
- #1、将hbase的hbase.env.sh的这一行配置注释掉,即在export前加一个#
- #export HBASE_CLASSPATH=/home/hadoop/hbase/conf
- #2、或者将HBASE_CLASSPATH改为JAVA_CLASSPATH,配置如下
- export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
- #笔者用的是第一种方法
Shell
二:测试flume
1.案例1:Avro source
Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。
a) 创建agent配置文件
- cd /usr/local/flume
- sudo vim ./conf/avro.conf #在conf目录下编辑一个avro.conf空文件
Shell
然后,我们在avro.conf写入以下内容
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # Describe/configure the source
- a1.sources.r1.type = avro
- a1.sources.r1.channels = c1
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 4141
- #注意这个端口名,在后面的教程中会用得到
-
- # Describe the sink
- a1.sinks.k1.type = logger
-
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
Shell
上面Avro Source参数说明如下:
Avro Source的别名是avro,也可以使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro。
bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。
port表示绑定的端口。a1.sources.r1.port = 4141,表示绑定的端口是4141。
a1.sinks.k1.type = logger,表示sinks的类型是logger。
b) 启动flume agent a1
- /usr/local/flume/bin/flume-ng agent -c . -f /usr/local/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console #启动日志控制台
Shell
这里我们把这个窗口称为agent窗口。
c) 创建指定文件
先打开另外一个终端,在/usr/local/flume下写入一个文件log.00,内容为hello,world:
- cd /usr/local/flume
- sudo sh -c 'echo "hello world" > /usr/local/flume/log.00'
Shell
我们再打开另外一个终端,执行:
- cd /usr/local/flume
- bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume/log.00 #4141是avro.conf文件里的端口名
Shell
此时我们可以看到第一个终端(agent窗口)下的显示,也就是在日志控制台,就会把log.00文件的内容打印出来:
avro source执行成功!案例一over!
2.案例2:netcatsource
a) 创建agent配置文件
- cd /usr/local/flume
- sudo vim ./conf/example.conf #在conf目录创建example.conf
Shell
在example.conf里写入以下内容:
- #example.conf: A single-node Flume configuration
-
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = localhost
- a1.sources.r1.port = 44444
- #同上,记住该端口名
-
- # Describe the sink
- a1.sinks.k1.type = logger
-
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
Shell
b)启动flume agent (即打开日志控制台):
- /usr/local/flume/bin/flume-ng agent --conf ./conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console
Shell
如图:
再打开一个终端,输入命令:telnet localhost 44444
- telnet localhost 44444
- #前面编辑conf文件的端口名
Shell
然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如我们输入”hello,world”,得出
第一个终端的日志控制台显示:
netcatsource运行成功!
这里补充一点,flume只能传递英文和字符,不能用中文,我们先可以在第二个终端输入“中国”两个字:
第一个终端的日志控制台显示: