Flume server environment
CentOS 7.2
JDK 1.8
Flume 1.8
Hadoop platform environment
CentOS 7.2
Ambari 2.6.1
HDP 2.6.4
JDK 1.8
I. FTP installation and configuration
Reference: https://blog.****.net/qq_39160721/article/details/80250975
II. Flume installation and configuration
Reference: https://blog.****.net/qq_39160721/article/details/80255194
III. Create the Flume configuration file
Create the flumeftp.conf configuration file:
# Name the components on this agent
ftpagent.sources= busS metroS busGPSS
ftpagent.sinks= busK metroK busGPSK
ftpagent.channels= busC metroC busGPSC
# Describe/configure the sources
#bus info
ftpagent.sources.busS.channels= busC
ftpagent.sources.busS.type= spooldir
ftpagent.sources.busS.spoolDir= /usr/local/ftpdir/busInfo
ftpagent.sources.busS.fileHeader= true
#metroinfo
ftpagent.sources.metroS.channels= metroC
ftpagent.sources.metroS.type= spooldir
ftpagent.sources.metroS.spoolDir= /usr/local/ftpdir/metroinfo
ftpagent.sources.metroS.fileHeader= true
#bus GPS
ftpagent.sources.busGPSS.channels= busGPSC
ftpagent.sources.busGPSS.type= spooldir
ftpagent.sources.busGPSS.spoolDir= /usr/local/ftpdir/busGPS
ftpagent.sources.busGPSS.fileHeader= true
# Configure the channels: busC and metroC buffer events in memory, busGPSC buffers events in files
#bus info
ftpagent.channels.busC.type= memory
ftpagent.channels.busC.capacity=1000
ftpagent.channels.busC.transactionCapacity=100
#metroinfo
ftpagent.channels.metroC.type= memory
ftpagent.channels.metroC.capacity=1000
ftpagent.channels.metroC.transactionCapacity=100
#bus GPS
ftpagent.channels.busGPSC.type= file
ftpagent.channels.busGPSC.checkpointDir= /usr/local/flumeftp/checkpoint
ftpagent.channels.busGPSC.dataDirs= /usr/local/flumeftp/data
# Describe the sinks
#bus info
ftpagent.sinks.busK.channel= busC
ftpagent.sinks.busK.type= hdfs
ftpagent.sinks.busK.hdfs.path= hdfs://10.250.11.52:8020/source/flume/ftp/busInfo
ftpagent.sinks.busK.hdfs.writeFormat= Text
ftpagent.sinks.busK.hdfs.fileType= DataStream
ftpagent.sinks.busK.hdfs.rollInterval= 10
ftpagent.sinks.busK.hdfs.rollSize= 0
ftpagent.sinks.busK.hdfs.rollCount= 0
ftpagent.sinks.busK.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S
ftpagent.sinks.busK.hdfs.useLocalTimeStamp= true
#metroinfo
ftpagent.sinks.metroK.channel= metroC
ftpagent.sinks.metroK.type= hdfs
ftpagent.sinks.metroK.hdfs.path= hdfs://10.250.11.52:8020/source/flume/ftp/metroinfo
ftpagent.sinks.metroK.hdfs.writeFormat= Text
ftpagent.sinks.metroK.hdfs.fileType= DataStream
ftpagent.sinks.metroK.hdfs.rollInterval= 10
ftpagent.sinks.metroK.hdfs.rollSize= 0
ftpagent.sinks.metroK.hdfs.rollCount= 0
ftpagent.sinks.metroK.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S
ftpagent.sinks.metroK.hdfs.useLocalTimeStamp= true
#bus GPS
ftpagent.sinks.busGPSK.channel= busGPSC
ftpagent.sinks.busGPSK.type= hdfs
ftpagent.sinks.busGPSK.hdfs.path= hdfs://10.250.11.52:8020/source/flume/ftp/busGPS
ftpagent.sinks.busGPSK.hdfs.writeFormat= Text
ftpagent.sinks.busGPSK.hdfs.fileType= DataStream
ftpagent.sinks.busGPSK.hdfs.rollInterval= 10
ftpagent.sinks.busGPSK.hdfs.rollSize= 0
ftpagent.sinks.busGPSK.hdfs.rollCount= 0
ftpagent.sinks.busGPSK.hdfs.filePrefix= %Y-%m-%d-%H-%M-%S
ftpagent.sinks.busGPSK.hdfs.useLocalTimeStamp= true
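With three sources, channels, and sinks wired together by name, a typo in one channel name is easy to miss. Below is a minimal sketch of a wiring check, assuming the configuration is kept in the simple `key= value` layout shown above. `check_channel_wiring` is a hypothetical helper name, not part of Flume.

```shell
#!/usr/bin/env bash
# check_channel_wiring CONF AGENT
# Hypothetical helper: prints every channel name that a source or sink refers
# to but that is missing from the "<AGENT>.channels=" declaration line.
check_channel_wiring() {
    local conf="$1" agent="$2" declared name
    # Space-separated channel names from the declaration line.
    declared=$(sed -n "s/^${agent}\.channels[[:space:]]*=[[:space:]]*//p" "$conf" |
        tr -s '[:space:]' ' ')
    # Channel names used by sources (".channels") and sinks (".channel").
    sed -n -E "s/^${agent}\.(sources|sinks)\.[^.]+\.channels?[[:space:]]*=[[:space:]]*//p" "$conf" |
        tr -s '[:space:]' '\n' | sort -u |
        while IFS= read -r name; do
            [ -n "$name" ] || continue
            case " $declared " in
                *" $name "*) ;;                            # declared, nothing to report
                *) echo "undeclared channel: $name" ;;
            esac
        done
}

# Usage (path taken from the flume-ng command in the testing section):
#   check_channel_wiring /usr/local/flume/conf/flumeftp.conf ftpagent
```

No output means every referenced channel is declared.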
IV. Testing
1) Upload files via FTP
2) Run Flume
>flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/flumeftp.conf --name ftpagent -Dflume.root.logger=INFO,console
3) Errors encountered and their fixes
- java.lang.IllegalStateException: Directory does not exist: /usr/local/ftpdir/busInfo
Fix: paths are case-sensitive. Change /usr/local/ftpdir/busInfo in the configuration to /usr/local/ftpdir/businfo.
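This kind of mismatch can be caught before starting the agent by checking every spoolDir value against the file system. Below is a minimal sketch, assuming the `key= value` layout used in flumeftp.conf. `check_spool_dirs` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# check_spool_dirs CONF
# Hypothetical helper: prints each spoolDir value in CONF that is not an
# existing directory (the comparison is case-sensitive on Linux).
check_spool_dirs() {
    local conf="$1" dir
    # Strip trailing whitespace, then keep only the value of non-comment spoolDir lines.
    sed -n -e 's/[[:space:]]*$//' \
           -e 's/^[^#]*\.spoolDir[[:space:]]*=[[:space:]]*//p' "$conf" |
        while IFS= read -r dir; do
            [ -d "$dir" ] || echo "missing: $dir"
        done
}

# Usage:
#   check_spool_dirs /usr/local/flume/conf/flumeftp.conf
```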
- java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
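These notes record no fix for this error. The missing class ships in the hadoop-common jar, so the error usually means the Hadoop client libraries are not on Flume's classpath. As a first diagnostic, the sketch below lists the Hadoop jars in a directory. `list_hadoop_jars` is a hypothetical helper name, and /usr/local/flume/lib is an assumption inferred from the --conf path used above.

```shell
#!/usr/bin/env bash
# list_hadoop_jars DIR
# Hypothetical helper: prints the hadoop-*.jar files found directly in DIR.
# Empty output suggests the HDFS sink has no Hadoop classes available there.
list_hadoop_jars() {
    local dir="$1"
    find "$dir" -maxdepth 1 -type f -name 'hadoop-*.jar' | sort
}

# Usage (the lib path is an assumption, adjust it to the actual install):
#   list_hadoop_jars /usr/local/flume/lib
```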
- Permission denied: user=root, access=WRITE, inode="/source/flume/ftp/busInfo/2018-05-09-15-56-47.1525852607458.tmp":hdfs:hdfs:drwxr-xr-x
Fix: open up the permissions on the HDFS directory: sudo -u hdfs hadoop fs -chmod -R 1777 /source/flume/ftp
- The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
Fix: in flumeftp.conf, raise the memory channel's capacity and transactionCapacity (originally 1000 and 100) to 10000:
ftpagent.channels.busC.capacity=10000
ftpagent.channels.busC.transactionCapacity=10000
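When changing these values, keep in mind that a memory channel's transactionCapacity must not exceed its capacity, so both values have to be raised together. Below is a minimal sketch that flags channels violating this rule. `check_memory_capacity` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# check_memory_capacity CONF
# Hypothetical helper: prints channels whose transactionCapacity exceeds capacity.
check_memory_capacity() {
    awk -F= '
        /^[ \t]*#/ { next }                       # skip comment lines
        {
            key = $1
            gsub(/[ \t]/, "", key)                # drop whitespace around the key
            n = split(key, p, /\./)               # agent.channels.<name>.<property>
        }
        n == 4 && p[2] == "channels" && p[4] == "capacity"            { cap[p[3]] = $2 + 0 }
        n == 4 && p[2] == "channels" && p[4] == "transactionCapacity" { txn[p[3]] = $2 + 0 }
        END {
            for (c in txn)
                if ((c in cap) && txn[c] > cap[c])
                    printf "%s: transactionCapacity %d > capacity %d\n", c, txn[c], cap[c]
        }
    ' "$1"
}

# Usage:
#   check_memory_capacity /usr/local/flume/conf/flumeftp.conf
```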
4) Check the results
The collected files now appear in HDFS on the Hadoop platform.
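The result can also be checked from the command line by listing the three HDFS target directories from the sink configuration. Below is a minimal sketch, assuming the hadoop client is on the PATH of a user allowed to read /source/flume/ftp. `list_flume_output` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# list_flume_output
# Hypothetical helper: lists the files written by the three HDFS sinks.
# The directory names come from the hdfs.path values in flumeftp.conf.
list_flume_output() {
    local dir
    for dir in busInfo metroinfo busGPS; do
        hadoop fs -ls "/source/flume/ftp/${dir}" || return 1
    done
}

# Usage:
#   list_flume_output
```

Files still being written carry a .tmp suffix, as in the permission error above. With rollInterval= 10 they are closed and renamed roughly every 10 seconds.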