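Official documentation for the parameters: http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
Note on the file format: hdfs.fileType defaults to SequenceFile, which is a Hadoop file format. Setting fileType=DataStream writes the events as plain data that can be read directly. (I have not yet worked out how to use SequenceFile.)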
Configuration file:
hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /usr/local/hadoop/apache-flume-1.6.0-bin/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node4:9000/user/flume/logs/%Y-%m-%d-%H
a1.sinks.k1.hdfs.filePrefix = Syslog
# Set a file suffix
#a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# File size that triggers a roll, in bytes (0: never roll based on file size)
a1.sinks.k1.hdfs.rollSize = 128000000
# Number of events written to a file before it is rolled (0 = never roll based on number of events)
a1.sinks.k1.hdfs.rollCount = 0
# File format: SequenceFile by default; DataStream and CompressedStream are also available.
# DataStream output can be read directly.
a1.sinks.k1.hdfs.fileType = DataStream
# Format for sequence file records: "Text" or "Writable"
a1.sinks.k1.hdfs.writeFormat = Text
# Use the local time (instead of the timestamp in the event header) when replacing the escape sequences
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
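The interaction between hdfs.round / hdfs.roundValue / hdfs.roundUnit and the escape sequences in hdfs.path can be checked with a small sketch. It assumes bash and GNU date, and bucket_dir is a hypothetical helper that only imitates rounding down to 10 minutes; it is not part of Flume. Because the path above stops at %H, rounding down by at most 10 minutes never changes the directory name; the rounding would only become visible if the path also contained %M.

```shell
#!/usr/bin/env bash
# Sketch, assuming bash and GNU date. bucket_dir is a hypothetical helper
# that imitates round=true, roundValue=10, roundUnit=minute for one timestamp.
bucket_dir() {
    local ts=$1 pattern=$2
    local min sec rounded
    min=$(date -d "@$ts" +%M)
    sec=$(date -d "@$ts" +%S)
    # Round down to the previous multiple of 10 minutes.
    # 10# forces base 10 so that values such as "08" are not parsed as octal.
    rounded=$(( ts - (10#$min % 10) * 60 - 10#$sec ))
    date -d "@$rounded" "+$pattern"
}

# bucket_dir "$(date +%s)" '%Y-%m-%d-%H'     # pattern used in hdfs.conf
# bucket_dir "$(date +%s)" '%Y-%m-%d-%H%M'   # pattern that exposes the rounding
```

Comparing the two commented calls shows that only the second pattern produces a different name for each 10-minute window.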
Start Hadoop.
Start Flume:
./flume-ng agent -c . -f /usr/local/hadoop/apache-flume-1.6.0-bin/conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Generate log files in the monitored directory:
for i in {1000..2000}; do echo "test line $i" >> /usr/local/hadoop/apache-flume-1.6.0-bin/logs/spool_text$i.log; done;
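The spooling directory source expects a file to be complete and unchanged once it appears in the monitored directory. The one-line files above are small enough that this is unlikely to matter, but for larger files a common precaution is to write each file elsewhere and then move it in. The following is a minimal sketch of that idea; generate_spool_files and the staging directory are hypothetical, and the staging directory is assumed to be on the same filesystem as the spool directory so that mv is a plain rename.

```shell
#!/usr/bin/env bash
# Sketch: create test files in a staging directory, then move each finished
# file into the spooling directory. The function name and the staging
# directory are hypothetical, not part of the original setup.
generate_spool_files() {
    local spool_dir=$1 staging_dir=$2 first=$3 last=$4
    local i
    mkdir -p "$spool_dir" "$staging_dir"
    for ((i = first; i <= last; i++)); do
        # Write the complete file outside the monitored directory first.
        echo "test line $i" > "$staging_dir/spool_text$i.log"
        # On the same filesystem mv is a rename, so the source only ever
        # sees complete files.
        mv "$staging_dir/spool_text$i.log" "$spool_dir/"
    done
}

# Example call (spool path from hdfs.conf, staging path assumed):
# generate_spool_files /usr/local/hadoop/apache-flume-1.6.0-bin/logs \
#     /usr/local/hadoop/apache-flume-1.6.0-bin/logs-staging 1000 2000
```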
Check the result in HDFS: http://node4:50070
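The output can also be inspected from the command line with hdfs dfs. The sketch below assumes the hdfs client is on the PATH and that hdfs.inUseSuffix is left at its default of .tmp; finished_files is a hypothetical helper that filters the listing down to files the sink has already closed, and it assumes paths without spaces.

```shell
#!/usr/bin/env bash
# Sketch: keep only finished files from `hdfs dfs -ls -R` output.
# finished_files is a hypothetical helper, not an HDFS or Flume command.
finished_files() {
    # Regular files have a permission string starting with "-";
    # files still being written carry the in-use suffix ".tmp".
    awk '$1 ~ /^-/ && $NF !~ /\.tmp$/ { print $NF }'
}

# Usage on the cluster (path taken from hdfs.conf):
# hdfs dfs -ls -R /user/flume/logs | finished_files
# hdfs dfs -cat "$(hdfs dfs -ls -R /user/flume/logs | finished_files | head -n 1)"
```

With fileType = DataStream, the second command should print the test lines as plain text.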