Flume SinkProcessor

xiaoxiaojavacsdn 2020-06-08

SinkProcessor共 有 三 种 类 型 , 分 别 是DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor。DefaultSinkProcessor 对 应 的 是 单 个 的 Sink , LoadBalancingSinkProcessor 和
FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。
 
一 FailoverSinkProcessor 可以实现故障转移
需求:flume1采集端口数据,发送给flume2或flume3。当flume2或3挂掉后,发送给另一台flume。
flume1配置:NetCat Source -> Memory Channel -> Avro Sink
# 给三大组件取名
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

#配置 NetCat Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置 Memory Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置 Avro Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# 配置 sink groups
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# 最大退避时间(期间不重试)
a1.sinkgroups.g1.processor.maxpenalty = 10000

# 配置三大组件的绑定关系
a1.sources.r1.channels = c1
# c1的数据发给k1或k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

flume2配置:Avro Source -> Memory Channel -> Logger Sink

# 给三大组件取名
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# 配置 Avro Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# 配置 Memory Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 配置 Logger Sink
a2.sinks.k1.type = logger

# 配置三大组件的绑定关系
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3配置:Avro Source -> Memory Channel -> Logger Sink

# 给三大组件取名
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# 配置 Avro Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# 配置 Memory Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 配置 Logger Sink
a3.sinks.k1.type = logger

# 配置三大组件的绑定关系
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

测试略

 

相关推荐

wsong / 0评论 2020-04-15