鲁氏汤包王 2019-10-25
对于flume的增量抽取,首先想到的就是常用的那几种方法,监控日志,触发器,版本号,时间戳等等,虽然可以实现,但都对数据源有了一定限制,假如客户的系统内部表没有那些东西,这就是一件很难搞的事了,毕竟对方数据库不能随便动。
这个时候可以采用 $,它可以表示增量列上一次查询的值。,将它加入sql语句中所查询的数据就实现了增量,当然在navicat中使用是不支持这个符号的,flume可能封装了一些方法对$进行了解析,在这方面并没有太多了解。
a1.channels=ch1 a1.channels.ch1.type=memory a1.sources = src-1 a1.sources.src-1.channels=ch1 a1.sources.src-1.type = org.keedio.flume.source.SQLSource a1.sources.src-1.run.query.delay=60000 a1.sources.sql-source.start.from=0 #所采集数据库的地址和数据库名 a1.sources.src-1.hibernate.connection.url= #数据库用户名 a1.sources.src-1.hibernate.connection.user = #数据库密码 a1.sources.src-1.hibernate.connection.password = a1.sources.src-1.hibernate.connection.autocommit = true a1.sources.src-1.hibernate.dialect=org.hibernate.dialect.SQLServerDialect #驱动类名 a1.sources.src-1.hibernate.connection.driver_class=com.microsoft.sqlserver.jdbc.SQLServerDriver #通过sql语句进行抽取,当需要实现增量抽取 $ 表示增量列上一次查询的 #值,记录在status文件中,所以查询值中也必须有该值以及需要有一个主键ID。#其他条件可根据业务//情况作更改。 a1.sources.src-1.custom.query=select test1.id,test1.name,test2.address from test1 full join test2 on test1.id=test2.id where test1.id> $ or test2.id>$ #status文件的存放路径,当执行flume该文件会在路径下自动生成 a1.sources.src-1.status.file.path=/home/bigdata/ #status文件名 a1.sources.src-1.status.file.name = src-1.ss.status a1.sources.src-1.batch.size = 6000 a1.sources.src-1.max.rows = 1000 a1.channels.ch1.capacity = 10000 a1.channels.ch1.transactionCapacity = 1000 a1.sinks=k1 #自定义下沉jar包名 a1.sinks.k1.type=MysqlSink #所下沉到的数据库地址及数据库名 a1.sinks.k1.url= #下沉到的数据库表名 a1.sinks.k1.tableName= #数据库用户名 a1.sinks.k1.user= #数据库密码 a1.sinks.k1.password= #字段名和上面的sql查询结果要一致 a1.sinks.k1.column_name=id,name,address a1.sinks.k1.channel=ch1 a1.sinks.k1.batchSize=100
以上是我做过的一个案例实现了flume链表的增量抽取。