# Flume agent "agent": SQL source (sql-source) -> memory channel (ch1) -> HDFS sink (HDFS).
# One property per line; the properties format does not allow several
# key/value pairs on the same physical line.

# --- Channel ---
agent.channels.ch1.type = memory

# --- Component wiring ---
agent.sources.sql-source.channels = ch1
agent.channels = ch1
agent.sinks = HDFS
agent.sources = sql-source

# --- Source: keedio SQLSource reading from MySQL ---
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
# NOTE(review): the snippet labelled as tested further down in this file uses
# hibernate.connection.url / .user / .password instead of the keys below;
# confirm which key names the installed SQLSource version actually reads.
agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test
agent.sources.sql-source.user = root
# NOTE(review): plain-text database password; consider restricting file permissions.
agent.sources.sql-source.password = 123456
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
# Incremental column and its starting value.
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
# Delay between queries (presumably milliseconds; verify against the source's docs).
agent.sources.sql-source.run.query.delay=5000
# Location of the status file written by the source.
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status

# --- Sink ---
# Alternative for debugging: logger sink. Kept disabled because the same key is
# assigned again below (hdfs), and with java.util.Properties-style loading the
# later assignment wins anyway.
#agent.sinks.HDFS.type = logger
# -----
agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
# Roll files by size only: 268435456 bytes = 256 MiB; per the Flume HDFS sink
# documentation, 0 disables interval-based and count-based rolling.
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
# ------
# Flume agent "f2": SQL source (r1) -> memory channel (c1) -> logger sink (k1).
# One property per line; '#' starts a comment only at the beginning of a line.
f2.sources = r1
f2.channels = c1
f2.sinks = k1

# Use our own custom-defined SQLSource here
f2.sources.r1.type = org.keedio.flume.source.SQLSource
f2.sources.r1.connectionurl = jdbc:mysql://172.16.1.127:3306/test
f2.sources.r1.user = yyj
f2.sources.r1.password = yyj
#f2.sources.r1.driverclass= oracle.jdbc.driver.OracleDriver
# Directory and file name of the source's status file.
f2.sources.r1.filepath = /var/log/sqllog
f2.sources.r1.filename = sqlSource.status
# Earlier query variants, kept for reference:
#f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b where a.COUPON_id = b.id
#f2.sources.r1.customquery = select a.id,a.COUPON_id,b.id from USER_COUPON_CODE_1 a ,COUPON_CODE b where a.COUPON_id = b.id and a.id > $@$
f2.sources.r1.customquery = select * from USER_COUPON_CODE_1
f2.sources.r1.begin = 0
# NOTE(review): "a.id" uses the table alias "a" from the commented-out join
# queries above; the active query defines no alias - confirm this is intended.
f2.sources.r1.autoincrementfield = a.id
f2.sources.r1.batchsize = 1000

# Channel definition
f2.channels.c1.type = memory
f2.channels.c1.capacity = 1000
f2.channels.c1.transactionCapacity = 100

# Sink definition
f2.sinks.k1.type = logger

# Wire the source, channel and sink together
f2.sources.r1.channels = c1
f2.sinks.k1.channel = c1
# 测试过的配置:
# Flume agent "agent": SQL source (mysql) -> memory channel (ch1) -> logger sink (k1).
# One property per line; the properties format does not allow several
# key/value pairs on the same physical line.
agent.sources = mysql
agent.sinks = k1
agent.channels = ch1

# --- Source: keedio SQLSource reading from MySQL via Hibernate connection settings ---
agent.sources.mysql.type = org.keedio.flume.source.SQLSource
agent.sources.mysql.hibernate.connection.url = jdbc:mysql://192.168.1.64:3306/test
agent.sources.mysql.hibernate.connection.user = root
# NOTE(review): plain-text database password; consider restricting file permissions.
agent.sources.mysql.hibernate.connection.password = 123456
agent.sources.mysql.table = wlslog
agent.sources.mysql.columns.to.select = *
agent.sources.mysql.hibernate.connection.autocommit = true
# Incremental column and its starting value.
agent.sources.mysql.incremental.column.name = id
agent.sources.mysql.incremental.value = 0
# Location of the status file written by the source.
agent.sources.mysql.status.file.path = /var/lib/flume
agent.sources.mysql.status.file.name = sql-source.status1

# --- Channel ---
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000

# --- Sink ---
agent.sinks.k1.type=logger

# --- Wiring ---
agent.sources.mysql.channels=ch1
agent.sinks.k1.channel=ch1
