flume增量采集数据

鲁氏汤包王 2019-10-25

对于flume的增量抽取,首先想到的就是常用的那几种方法,监控日志,触发器,版本号,时间戳等等,虽然可以实现,但都对数据源有了一定限制,假如客户的系统内部表没有那些东西,这就是一件很难搞的事了,毕竟对方数据库不能随便动。

这个时候可以采用 $,它可以表示增量列上一次查询的值。,将它加入sql语句中所查询的数据就实现了增量,当然在navicat中使用是不支持这个符号的,flume可能封装了一些方法对$进行了解析,在这方面并没有太多了解。

a1.channels=ch1                         
a1.channels.ch1.type=memory

a1.sources = src-1

a1.sources.src-1.channels=ch1

a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.run.query.delay=60000
a1.sources.sql-source.start.from=0
#所采集数据库的地址和数据库名
a1.sources.src-1.hibernate.connection.url=
#数据库用户名
a1.sources.src-1.hibernate.connection.user =
#数据库密码
a1.sources.src-1.hibernate.connection.password =
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect=org.hibernate.dialect.SQLServerDialect

#驱动类名
a1.sources.src-1.hibernate.connection.driver_class=com.microsoft.sqlserver.jdbc.SQLServerDriver
#通过sql语句进行抽取,当需要实现增量抽取 $ 表示增量列上一次查询的
#值,记录在status文件中,所以查询值中也必须有该值以及需要有一个主键ID。#其他条件可根据业务//情况作更改。
a1.sources.src-1.custom.query=select test1.id,test1.name,test2.address from test1 full join test2 on test1.id=test2.id where test1.id> $ or test2.id>$ 
#status文件的存放路径,当执行flume该文件会在路径下自动生成
a1.sources.src-1.status.file.path=/home/bigdata/
#status文件名
a1.sources.src-1.status.file.name = src-1.ss.status

a1.sources.src-1.batch.size = 6000
a1.sources.src-1.max.rows = 1000

a1.channels.ch1.capacity = 10000
a1.channels.ch1.transactionCapacity = 1000

a1.sinks=k1
#自定义下沉jar包名
a1.sinks.k1.type=MysqlSink
#所下沉到的数据库地址及数据库名
a1.sinks.k1.url=
#下沉到的数据库表名
a1.sinks.k1.tableName=
#数据库用户名
a1.sinks.k1.user=
#数据库密码
a1.sinks.k1.password=
#字段名和上面的sql查询结果要一致
a1.sinks.k1.column_name=id,name,address
a1.sinks.k1.channel=ch1

a1.sinks.k1.batchSize=100

以上是我做过的一个案例实现了flume链表的增量抽取。

相关推荐

wsong / 0评论 2020-04-15