ErixHao 2020-05-20
导入pom依赖 <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> </dependencies> 编写代码 package com.baway.flume; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { //前缀参数,从配置文件中获取 private String prefix; //后缀参数,从配置文件中获取 private String suffix; //数据生成延迟时间参数 private Long delay; //数据生成条数参数 private int n; public Status process() throws EventDeliveryException { ChannelProcessor channelProcessor = getChannelProcessor(); Status status; try { for(int i = 0; i < n; i++){ Event event = new SimpleEvent(); event.setBody((prefix + i + suffix).getBytes()); event.setHeaders(new HashMap<String, String>()); channelProcessor.processEvent(event); Thread.sleep(delay); } status = Status.READY; } catch (Exception e){ status = Status.BACKOFF; } return status; } //设置每次回滚等待增加的时间 public long getBackOffSleepIncrement() { return 0; } //设置回滚等待时间上限 public long getMaxBackOffSleepInterval() { return 0; } public void configure(Context context) { prefix = context.getString("prefix", "Default"); suffix = context.getString("suffix", "SDfault"); delay = context.getLong("delay",2000L); n = context.getInteger("count", 5); } } 5)测试 1.打包 将写好的代码打包,并放到flume的lib目录(/opt/module/flume)下。 2.配置文件 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.baway.flume.MySource a1.sources.r1.delay = 1000 a1.sources.r1.prefix = baway a1.sources.r1.count = 10 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 3.开启任务 [ flume]$ pwd /opt/module/flume [ flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console 4.结果展示
package com.bawei; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable { //创建Logger对象 private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class); private String prefix; private String suffix; @Override public Status process() throws EventDeliveryException { //声明返回值状态信息 Status status; //获取当前Sink绑定的Channel Channel ch = getChannel(); //获取事务 Transaction txn = ch.getTransaction(); //声明事件 Event event; //开启事务 txn.begin(); //读取Channel中的事件,直到读取到事件结束循环 while (true) { event = ch.take(); if (event != null) { break; } } try { //处理事件(打印) LOG.info(prefix + new String(event.getBody()) + suffix); //事务提交 txn.commit(); status = Status.READY; } catch (Exception e) { //遇到异常,事务回滚 txn.rollback(); status = Status.BACKOFF; } finally { //关闭事务 txn.close(); } return status; } @Override public void configure(Context context) { //读取配置文件内容,有默认值 prefix = context.getString("prefix", "hello:"); //读取配置文件内容,无默认值 suffix = context.getString("suffix"); } } 4)测试 1.打包 将写好的代码打包,并放到flume的lib目录(/opt/module/flume)下。 2.配置文件 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = com.bawei.MySink #a1.sinks.k1.prefix = bawei: a1.sinks.k1.suffix = :bawei # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 3.开启任务 [ flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console [ ~]$ nc localhost 44444 hello OK Flume OK