1. Flume: real-time monitoring of files on a server
1. Download the plugin

    git clone https://github.com/keedio/flume-ftp-source.git

2. Build it

    mvn clean package -DskipTests

3. Copy the generated jar to $FLUME_HOME/lib/

    cp flume-ftp-source-2.2.0.jar $FLUME_HOME/lib/

4. Download the dependency jars

    wget http://central.maven.org/maven2/commons-net/commons-net/3.3/commons-net-3.3.jar
    wget http://central.maven.org/maven2/com/jcraft/jsch/0.1.54/jsch-0.1.54.jar

5. Copy commons-net-3.3.jar and jsch-0.1.54.jar to $FLUME_HOME/lib/

    cp commons-net-3.3.jar $FLUME_HOME/lib/
    cp jsch-0.1.54.jar $FLUME_HOME/lib/

6. Create the conf file

    touch flume-ng-ftp-source-FTP.conf

7. Add the following content (Flume collects the files from the FTP server and writes them to local files)

    ## Sources Definition for agent "agent"
    # ACTIVE LIST
    agent.sources = ftp1
    agent.sinks = k1
    agent.channels = ch1

    ##### SOURCE IS ftp server
    # Type of source for ftp sources
    agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
    agent.sources.ftp1.client.source = ftp

    # Connection properties for ftp server
    agent.sources.ftp1.name.server = 192.168.1.64
    agent.sources.ftp1.port = 21
    agent.sources.ftp1.user = test
    agent.sources.ftp1.password = 12345

    # Process files in
    agent.sources.ftp1.working.directory = /
    # Process files matching (java regex for ftp-ftps)
    agent.sources.ftp1.filter.pattern = .+\\.txt

    # Keep file track status in folder
    agent.sources.ftp1.folder = /root/download
    # File track status name
    agent.sources.ftp1.file.name = ftp1-status-file.ser

    # Discover delay: the directory is explored every configured number of milliseconds
    agent.sources.ftp1.run.discover.delay = 5000

    # Process by lines
    agent.sources.ftp1.flushlines = true

    # Discover and process files under user's home directory
    agent.sources.ftp1.search.recursive = true
    # Do not process file while it is being written.
    agent.sources.ftp1.processInUse = false
    # If file must not be processed while it is being written, wait timeout.
    agent.sources.ftp1.processInUseTimeout = 30

    agent.sinks.k1.type = file_roll
    agent.sinks.k1.sink.directory = /var/log/flume-ftp
    agent.sinks.k1.sink.rollInterval = 7200

    agent.channels.ch1.type = memory
    agent.channels.ch1.capacity = 10000
    agent.channels.ch1.transactionCapacity = 1000

    agent.sources.ftp1.channels = ch1
    agent.sinks.k1.channel = ch1

8. Run flume-ng

    ./bin/flume-ng agent -c conf --conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
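
Before starting the agent, it can help to confirm that all three jars from the steps above actually landed in the lib directory. The following is a minimal sketch, not part of the plugin: the function name missing_flume_jars is hypothetical, and the jar file names are assumed to match the versions used in this walkthrough.

```shell
#!/bin/sh
# Hypothetical helper (not part of Flume or the plugin):
# print the name of every required jar that is missing from the given lib directory.
missing_flume_jars() {
    lib_dir="$1"
    # Jar names assume the versions used in this walkthrough.
    for jar in flume-ftp-source-2.2.0.jar commons-net-3.3.jar jsch-0.1.54.jar; do
        [ -f "$lib_dir/$jar" ] || echo "$jar"
    done
}
```

Calling missing_flume_jars "$FLUME_HOME/lib" prints nothing when all three jars are present; any printed name is a jar that still needs to be copied.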
Note: if you use the Flume that ships with CDH, copy commons-net-3.3.jar, jsch-0.1.54.jar, and flume-ftp-source-2.2.0.jar to the following directory instead:

    cp ./* /opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/flume-ng/lib
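
The cp ./* form copies every file in the current directory, so it is only safe when that directory holds nothing but the three jars. As an alternative, here is a small sketch that copies only the named jars; the function name copy_ftp_source_jars is hypothetical, and the jar names are assumed to match the versions used above.

```shell
#!/bin/sh
# Hypothetical helper: copy only the three plugin jars from src_dir into dest_dir,
# stopping at the first jar that cannot be copied.
copy_ftp_source_jars() {
    src_dir="$1"
    dest_dir="$2"
    for jar in flume-ftp-source-2.2.0.jar commons-net-3.3.jar jsch-0.1.54.jar; do
        cp "$src_dir/$jar" "$dest_dir/" || return 1
    done
}
```

For the CDH layout above, the call would be copy_ftp_source_jars . /opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/flume-ng/lib, run from the directory containing the jars.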
To send the data collected by Flume to Kafka for consumption instead, use a configuration like the following.
2. ftp-flume-kafka.conf
    ## Sources Definition for agent "agent"
    # ACTIVE LIST
    agent.sources = ftp1
    agent.sinks = k1
    agent.channels = ch1

    ##### SOURCE IS ftp server
    # Type of source for ftp sources
    agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
    agent.sources.ftp1.client.source = ftp

    # Connection properties for ftp server
    agent.sources.ftp1.name.server = 192.168.1.64
    agent.sources.ftp1.port = 21
    agent.sources.ftp1.user = test
    agent.sources.ftp1.password = 12345

    # Process files in
    agent.sources.ftp1.working.directory = /
    # Process files matching (java regex for ftp-ftps)
    agent.sources.ftp1.filter.pattern = .+\\.txt

    # Keep file track status in folder
    agent.sources.ftp1.folder = /root/download
    # File track status name
    agent.sources.ftp1.file.name = ftp1-status-file.ser

    # Discover delay: the directory is explored every configured number of milliseconds
    agent.sources.ftp1.run.discover.delay = 5000

    # Process by lines
    agent.sources.ftp1.flushlines = true

    # Discover and process files under user's home directory
    agent.sources.ftp1.search.recursive = true
    # Do not process file while it is being written.
    agent.sources.ftp1.processInUse = false
    # If file must not be processed while it is being written, wait timeout.
    agent.sources.ftp1.processInUseTimeout = 30

    # Kafka sink replaces the file_roll sink.
    # Every key must start with the agent name ("agent") and the sink name ("k1").
    #agent.sinks.k1.type = file_roll
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.kafka.topic = test
    agent.sinks.k1.brokerList = 192.168.1.64:9092
    agent.sinks.k1.kafka.flumeBatchSize = 20
    agent.sinks.k1.kafka.producer.acks = 1
    #agent.sinks.k1.sink.directory = /var/log/flume-ftp
    #agent.sinks.k1.sink.rollInterval = 7200

    agent.channels.ch1.type = memory
    agent.channels.ch1.capacity = 10000
    agent.channels.ch1.transactionCapacity = 1000

    agent.sources.ftp1.channels = ch1
    agent.sinks.k1.channel = ch1
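
Flume only loads properties whose keys begin with the agent name passed via --name, so a key written with a different prefix (for example a1. in an agent named agent) is silently ignored. The sketch below is one way to spot such keys in a conf file before starting the agent. The function name foreign_agent_keys is hypothetical, and the check assumes one property per line.

```shell
#!/bin/sh
# Hypothetical helper: list property lines whose key does not start with "<agent_name>.".
foreign_agent_keys() {
    conf_file="$1"
    agent_name="$2"
    # Drop comments and blank lines, then drop lines that carry the expected prefix.
    # "|| true" keeps the exit status at 0 when no foreign keys are found.
    grep -v -e '^[[:space:]]*#' -e '^[[:space:]]*$' "$conf_file" \
        | grep -v "^[[:space:]]*${agent_name}\." || true
}
```

For example, foreign_agent_keys conf/ftp-flume-kafka.conf agent prints nothing when every property belongs to the agent named agent; the conf path here is only an illustration.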
