简介:
文档 https://flume.apache.org/FlumeUserGuide.html
hadoop 生态系统中,flume 的职责是收集数据,一般用作收集各种日志数据。
Source -> Channel -> Sink 这是一个基本的工作流程。
Source 定义了数据从哪里来,Channel 是一个数据暂存的位置 ( disk / mem ),Sink 定义将数据流向哪里!
一、flume 安装
shell > yum -y install jdk1..0_111 # java 环境必不可少 shell > java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) -Bit Server VM (build 25.111-b14, mixed mode) shell > cd /usr/local/src
shell > wget http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz shell > tar zxf apache-flume-1.7.-bin.tar.gz -C /usr/local
二、httpd ( 模拟一个日志源 )
shell > yum -y install httpd shell > /etc/init.d/httpd start
三、flume 基本配置
1、修改 flume 环境变量配置文件
shell > cd /usr/local/apache-flume-1.7.-bin
shell > cp conf/flume-env.sh.template conf/flume-env.sh shell > vim conf/flume-env.sh
# 定义 JAVA_HOME
export JAVA_HOME=/usr/java/default
2、修改 flume 配置文件
shell > vim conf/flume-conf.properties
# 定义 sources、channels、sinks
flumeAgent.sources = r_log
flumeAgent.channels = c_memory
flumeAgent.sinks = s_logger # Source
flumeAgent.sources.r_log.type = exec
flumeAgent.sources.r_log.channels = c_memory
flumeAgent.sources.r_log.command = tail -F /var/log/httpd/access_log # Channel
flumeAgent.channels.c_memory.type = memory
flumeAgent.channels.c_memory.capacity =
flumeAgent.channels.c_memory.transactionCapacity = # Sink
flumeAgent.sinks.s_logger.type = logger
flumeAgent.sinks.s_logger.channel = c_memory # 这是一个测试的配置文件,flume 监听 access_log 并输出到 logger # flumeAgent.channels.c_memory.capacity Channel 存储事件的最大数
# flumeAgent.channels.c_memory.transactionCapacity 从 Source 到 Sink 每传输事物中的最大事件数
四、flume 启动
shell > sh bin/flume-ng agent -c conf -f conf/flume-conf.properties -n flumeAgent -Dflume.root.logger=INFO,console # 启动一个 agent,-c 指定配置文件目录,-f 指定配置文件,-n 指定 agent 名称,-D 指定日志级别、输出位置
# 当 access_log 有新记录时,就会有事件输出到终端
五、flume 收集日志写入 hdfs
shell > cd /usr/local/apache-flume-1.7.-bin shell > vim conf/flume-conf.properties
# 定义 sources、channels、sinks
flumeAgent.sources = r_log
flumeAgent.channels = c_file
flumeAgent.sinks = s_hdfs # Source
flumeAgent.sources.r_log.type = taildir
flumeAgent.sources.r_log.channels = c_file
flumeAgent.sources.r_log.filegroups = f1
flumeAgent.sources.r_log.filegroups.f1 = /var/log/httpd/access_log
flumeAgent.sources.r_log.positionFile = /data/flume/taildir_position_log.json
flumeAgent.sources.r_log.skipToEnd = true # Channel
flumeAgent.channels.c_file.type = file
flumeAgent.channels.c_file.checkpointDir = /data/flume/check
flumeAgent.channels.c_file.useDualCheckpoints = true
flumeAgent.channels.c_file.backupCheckpointDir = /data/flume/backup
flumeAgent.channels.c_file.dataDirs = /data/flume/data
flumeAgent.channels.c_file.keep-alive =
flumeAgent.channels.c_file.write-timeout = # Sink
flumeAgent.sinks.s_hdfs.type = hdfs
flumeAgent.sinks.s_hdfs.channel = c_file
flumeAgent.sinks.s_hdfs.hdfs.path = hdfs://master.hadoop/webroot/logs
flumeAgent.sinks.s_hdfs.hdfs.filePrefix = api
flumeAgent.sinks.s_hdfs.hdfs.rollInterval =
flumeAgent.sinks.s_hdfs.hdfs.rollSize =
flumeAgent.sinks.s_hdfs.hdfs.rollCount =
flumeAgent.sinks.s_hdfs.hdfs.batchSize =
flumeAgent.sinks.s_hdfs.hdfs.fileType = DataStream
flumeAgent.sinks.s_hdfs.hdfs.writeFormat = Text
flumeAgent.sinks.s_hdfs.hdfs.threadsPoolSize =
flumeAgent.sinks.s_hdfs.hdfs.round = true
flumeAgent.sinks.s_hdfs.hdfs.roundValue =
flumeAgent.sinks.s_hfds.hdfs.roundUnit = hour
flumeAgent.sinks.s_hdfs.hdfs.useLocalTimeStamp = true
# 注意:上述配置文件中涉及到的所有目录都会自动生成,不需要手动创建 ( 当然也可以手动创建 )
shell > sh bin/flume-ng agent -c conf -f conf/flume-conf.properties -n flumeAgent -Dflume.root.logger=INFO,console
# 启动 Agent
shell > tree /data/flume
/data/flume
├── backup
│ ├── backupComplete
│ ├── checkpoint
│ ├── checkpoint.meta
│ ├── inflightputs
│ └── inflighttakes
├── check
│ ├── checkpoint
│ ├── checkpoint.meta
│ ├── inflightputs
│ ├── inflighttakes
│ └── queueset
├── data
│ ├── log-
│ ├── log-.meta
│ ├── log-
│ └── log-.meta
└── taildir_position_log.json
# 这便是配置中的所需目录及文件
shell > su - hadoop hadoop shell > hdfs dfs -ls /webroot/logs
-rw-r--r-- root supergroup -- : /webroot/logs/api..tmp hadoop shell > hdfs dfs -tail -f /webroot/logs/api..tmp
192.168.1.246 - - [/Apr/::: +] "GET /icons/apache_pb.gif HTTP/1.1" - "http://192.168.1.25/" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"
192.168.1.246 - - [/Apr/::: +] "GET /icons/poweredby.png HTTP/1.1" - "http://192.168.1.25/" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"
# 可以看到数据正被写入 hdfs ( 注意:是从日志文件头部读取 )
报错管理:
1、hdfs 权限拒绝
-- ::, (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:)] HDFS IO error
org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/webroot/logs":hadoop:supergroup:drwxr-xr-x
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:)
at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:)
解决方法:
shell > su - hadoop hadoop shell > vim /usr/local/hadoop-2.8./etc/hadoop/hdfs-site.xml <property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
# 暂时关闭了权限验证,还没找到更好的方法 ( 开放 hdfs / 权限,跟关闭权限验证没啥区别 )