Installation
1. Download and unpack
    tar zxvf apache-flume-1.8.0-bin.tar.gz -C /soft
2. Create Flume's environment file
    cp flume-env.sh.template flume-env.sh
3. Set the JAVA_HOME variable in flume-env.sh
    export JAVA_HOME=/soft/jdk
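To confirm the install, a minimal check (assuming the unpacked directory is reachable as /soft/flume, e.g. via a symlink):
    $>cd /soft/flume
    $>bin/flume-ng version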
flume
A framework for collecting, moving, and aggregating log data. Event-based.
agent
source   // receives data; the producer
         //   put()
         //   NetcatSource
         //   ExecSource: real-time collection, e.g. tail -F xxx.txt
         //   spooldir
         //   seq
         //   Stress
         //   AvroSource
channel  // stages data; a buffer
         //   non-durable: MemoryChannel
         //   durable:     FileChannel, on disk
         //   SpillableMemoryChannel: memory + FileChannel capacity
sink     // outputs data; the consumer
         //   take()s data from the channel and write()s it to the destination
         //   HdfsSink
         //   HbaseSink
         //   AvroSink
1. Create a configuration file: hello.conf
1. Create the configuration file [/soft/flume/conf/hello.conf]
    # declare the three components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # define the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    # define the sink
    a1.sinks.k1.type = logger
    # define the channel
    a1.channels.c1.type = memory
    # bind them together
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
2. Run
    a) Start the Flume agent
        $>bin/flume-ng agent -f ../conf/hello.conf -n a1 -Dflume.root.logger=INFO,console
    b) Start an nc client
        $>nc localhost 8888
        $nc>hello world
    c) "hello world" is printed in the Flume terminal.
2. Collect logs in real time and print them to the console
Real-time log collection.
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = exec
    # watch the file for new content and forward each new line as an event
    a1.sources.r1.command = tail -F /home/centos/test.txt
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
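A sketch of running this agent; the config file name [exec_r.conf] is an assumption:
    $>bin/flume-ng agent -f ../conf/exec_r.conf -n a1 -Dflume.root.logger=INFO,console
    # in another terminal, append a line and watch it appear on the Flume console
    $>echo "hello exec" >> /home/centos/test.txt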
3. Batch collection
Monitors a directory of static files. After a file is collected, it is renamed with the .COMPLETED suffix. See the verification sketch after step c).
a) Configuration file [spooldir_r.conf]
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/centos/spool
    a1.sources.r1.fileHeader = true
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
b) Create the directory
    $>mkdir ~/spool
c) Start Flume
    $>bin/flume-ng agent -f ../conf/spooldir_r.conf -n a1 -Dflume.root.logger=INFO,console
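d) To verify, drop a file into the spool directory and watch the rename (a sketch; the test file name is arbitrary):
    $>echo "line1" > ~/1.txt
    $>cp ~/1.txt ~/spool/
    $>ls ~/spool        # after processing: 1.txt.COMPLETED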
4.flume + Kafka
Data collected by Flume is sent to a Kafka message queue.
Configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
# -F alone starts from the last 10 lines; -c +0 collects from the beginning of
# the file. -F keeps following new output; without it the process stops.
a1.sources.r1.command = tail -F -c +0 /home/centos01/callLog/callLog.log
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = calllog
#a1.sinks.k1.kafka.bootstrap.servers = sa0:9092 sa1:9092 sa2:9092
#a1.sinks.k1.brokerList = sa0:9092 sa1:9092 sa2:9092
a1.sinks.k1.brokerList = sa0:9092
#a1.sinks.k1.kafka.bootstrap.servers = 192.168.0.121:9092 192.168.0.122:9092 192.168.0.123:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
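The calllog topic should exist before the agent starts, and a console consumer can confirm delivery. A sketch, assuming Kafka's CLI scripts are on the PATH and ZooKeeper runs at sa0:2181 (the ZooKeeper address is an assumption; broker hosts follow the config above):
    $>kafka-topics.sh --create --zookeeper sa0:2181 --replication-factor 1 --partitions 1 --topic calllog
    $>kafka-console-consumer.sh --bootstrap-server sa0:9092 --topic calllog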
5.hbase + flume
Store the data collected by Flume in HBase.
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hbase
a1.sinks.k1.table = ns1:t12
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
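The target table must exist before the sink starts; a minimal sketch in the HBase shell (namespace, table, and column family names follow the config above):
    $>hbase shell
    hbase>create_namespace 'ns1'
    hbase>create 'ns1:t12', 'f1'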
6.flume + hdfs
1. hdfs
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    # Whether to round down the timestamp used in the directory path; this
    # controls how often a new directory is created. With these settings a
    # new directory is used every 10 seconds, e.g.
    #   17-12-12/10/30/20 --> 17-12-12/10/30/30
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = second
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # When to roll a new file: every 10 seconds, 10 bytes, or 3 events,
    # whichever comes first.
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 10
    a1.sinks.k1.hdfs.rollCount = 3
    a1.channels.c1.type = memory
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
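A sketch of running the agent and inspecting the output; the config file name [hdfs_k.conf] is an assumption, and HDFS must be running:
    $>bin/flume-ng agent -f ../conf/hdfs_k.conf -n a1 -Dflume.root.logger=INFO,console
    $>nc localhost 8888                  # send a few lines of test data
    $>hdfs dfs -ls -R /flume/events      # rolled files appear under timestamped dirs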
flume + ftp
flume-ng-ftp-source-FTP.conf
### www.keedio.com
# Example file: protocol is ftp, process by lines, and sink to file_roll
# (for testing purposes).
#
# Sources definition for agent "agent"
# ACTIVE LIST
agent.sources = ftp1
agent.sinks = k1
agent.channels = ch1

##### SOURCE IS ftp server
# Type of source for ftp sources
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp

# Connection properties for ftp server
agent.sources.ftp1.name.server = 192.168.0.4
agent.sources.ftp1.port = 21
agent.sources.ftp1.user = mortadelo
agent.sources.ftp1.password = secret

# Process files in
agent.sources.ftp1.working.directory = /subdirA/subdirAA
# Process files that match (Java regex for ftp-ftps)
agent.sources.ftp1.filter.pattern = .+\\.csv
# Keep file-tracking status in this folder
agent.sources.ftp1.folder = /var/log/flume-ftp
# File-tracking status file name
agent.sources.ftp1.file.name = ftp1-status-file.ser
# Discover delay: the directory is explored every configured millisecond interval
agent.sources.ftp1.run.discover.delay = 5000
# Process by lines
agent.sources.ftp1.flushlines = true
# Discover and process files under the user's home directory
agent.sources.ftp1.search.recursive = true
# Do not process a file while it is being written
agent.sources.ftp1.processInUse = false
# If a file must not be processed while being written, wait this timeout
agent.sources.ftp1.processInUseTimeout = 30

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000

agent.sources.ftp1.channels = ch1
agent.sinks.k1.channel = ch1
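The keedio FTP source is a third-party plugin, so its jar must be on Flume's classpath (e.g. under plugins.d/) before the agent starts. A launch sketch, assuming the config above is saved as conf/flume-ng-ftp-source-FTP.conf:
    $>bin/flume-ng agent --conf conf -f conf/flume-ng-ftp-source-FTP.conf -n agent -Dflume.root.logger=INFO,console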
flume-ng-ftp-source-FTPS.conf
# www.keedio.com
# Example configuration file for FTP SECURE
#
# Sources definition for agent "agent"
# ACTIVE LIST
agent.sources = ftps1
agent.sinks = k1
agent.channels = ch1

##### SOURCE IS ftps server
# Type of source for ftp sources
agent.sources.ftps1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftps1.client.source = ftps
#agent.sources.ftp1.type = org.apache.flume.source.FTPSource

# Connection properties for ftps server
agent.sources.ftps1.name.server = 192.168.0.2
agent.sources.ftps1.port = 21
agent.sources.ftps1.user = mortadelo
agent.sources.ftps1.password = secret
agent.sources.ftps1.folder = /var/log/flume-ftp
agent.sources.ftps1.file.name = ftps1-status-file.ser

## secure
agent.sources.ftps1.security.enabled = true
agent.sources.ftps1.security.cipher = TLS
agent.sources.ftps1.security.certificate.enabled = false

# Discover delay: the directory is explored every configured millisecond interval
agent.sources.ftps1.run.discover.delay = 5000
## process by chunks of bytes
agent.sources.ftps1.flushlines = false

# Source will write events to a file_roll sink (testing purposes)
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000

# Bind source and sink to the channel; the agent will not start without these
agent.sources.ftps1.channels = ch1
agent.sinks.k1.channel = ch1
flume-ng-ftp-source-SFTP.conf
# www.keedio.com
# Example configuration for SFTP
#
# Sources definition for agent "agent"
# ACTIVE LIST
agent.sources = sftp1
agent.sinks = k1
agent.channels = ch1

##### SOURCE IS sftp server
# Type of source for sftp sources
agent.sources.sftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.sftp1.client.source = sftp
#agent.sources.sftp1.type = org.apache.flume.source.SFTPSource

# Connection properties for sftp server
agent.sources.sftp1.name.server = 192.168.0.2
agent.sources.sftp1.port = 22
agent.sources.sftp1.user = filemon
agent.sources.sftp1.password = secret

# Process files in
agent.sources.sftp1.working.directory = /home/filemon/subdirA
# Process files that match (Java regex for sftp)
agent.sources.sftp1.filter.pattern = .+\\.csv
# Keep file-tracking status in this folder
agent.sources.sftp1.folder = /var/log/flume-ftp
# File-tracking status file name
agent.sources.sftp1.file.name = sftp1-status-file.ser

## root is launching the flume binary
agent.sources.sftp1.knownHosts = /root/.ssh/known_hosts
## for testing purposes only; default is yes
agent.sources.sftp1.strictHostKeyChecking = no

# Discover delay: the directory is explored every configured millisecond interval
agent.sources.sftp1.run.discover.delay = 5000
# Process by lines
agent.sources.sftp1.flushlines = true
# Whether a recursive search should be conducted on the working directory
agent.sources.sftp1.search.recursive = false
# Whether files that are currently being written to should be skipped
agent.sources.sftp1.search.processInUse = false
# Seconds used to decide whether a file is still being written to
agent.sources.sftp1.search.processInUseTimeout = 30
# If source files are compressed, they can be decompressed on the fly.
# The existence of this property implies that source files are compressed.
agent.sources.sftp1.compressed = gzip

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000

agent.sources.sftp1.channels = ch1
agent.sinks.k1.channel = ch1
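The knownHosts file must already contain the SFTP server's host key. One way to pre-populate it is ssh-keyscan (a sketch; the key type is an assumption, host and paths follow the config above):
    $>ssh-keyscan -t rsa 192.168.0.2 >> /root/.ssh/known_hosts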
