Flume 实战(1) -- 初体验

时间:2021-01-01 21:10:18

前言:

  Flume-ng是数据收集/聚合/传输的组件, Flume-ng抛弃了Flume OG原本繁重的zookeeper和Master, Collector, 其整体的架构更加的简洁和明了. 其基础组件就Agent进程, 内部又可以细分为Source, Channel, Sink三个组件, Source是数据的输入源, channel作为消息的管道, 而sink是作为数据流的输出, Source可以配置多个channel, sink和channel一一对应.
  Flume 实战(1) -- 初体验

  *) 初体验Flume-ng
  以CDH5版本的Hadoop为基础, 编辑一个简单的agent用例
  1). flume-ng的安装目录
  which flume-ng
  /usr/bin/flume-ng

  2). flume-ng命令
  [<username>@<hostname> study_flume]# flume-ng help

 Usage: /usr/lib/flume-ng/bin/flume-ng <command> [options]...
  commands:
    help display this help text
    agent run a Flume agent
    avro-client run an avro Flume client
    version show Flume version info

  3). 两个简单Demo
  3.1). 简单日志输出Demo
  以netcat型监听服务作为source, 以日志输出作为sink的Demo
  #) 配置 flume-demo-logger.conf
  mkdir -p conf
  touch conf/flume-demo-logger.conf
  conf/flume-demo-logger.conf的内容如下:

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.type = netcat
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 9999

agent1.sinks.sink1.type = logger

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

  注: 从运维的角度, bind最好是0.0.0.0, port最好小于10000(监听端口最好避免与临时端口争抢), 同时sources.<source_name>.channels是复数形式, sinks.<sink_name>.channel是单数形式, 请注意一对多, 一对一的对应关系.

  #) 启动flume-ng agent
  flume-ng agent -c conf -f conf/flume-demo-logger.conf --name agent1 -Dflume.root.logger=INFO,Console

14/07/03 14:14:34 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:9999]

  表示已经监听在9999端口上了

  #) 启动telnet进行测试
  telnet 127.0.0.1 9999
  Flume 实战(1) -- 初体验
  注: telnet 使用ctrl+']'转换到命令模式, 然后键入quit, 退出

  #) flume-ng agent输出验证
  Flume 实战(1) -- 初体验

  3.2). 日志提取到HDFS(可以假设web访问日志, 自动进入HDFS)
  #) 在hdfs上创建输出目录
  sudo -u hdfs hdfs dfs -mkdir -p /flume/test/data

  #) 配置 flume-demo-hdfs.conf

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /var/log/1.log

agent1.sinks.sink1.type = hdfs 
agent1.sinks.sink1.hdfs.path = hdfs://<namenode_server_ip>:8020/flume/test/data
agent1.sinks.sink1.hdfs.filePrefix = events-
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100
agent1.channels.channel1.transactionCapacity = 100

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

  #) 启动flume-ng agent

  sudo -u hdfs flume-ng agent -c conf -f conf/flume-demo-hdfs.conf --name agent1 -Dflume.root.logger=INFO,Console

  #) 追加日志内容
  Flume 实战(1) -- 初体验

  #) 数据验证
  Flume 实战(1) -- 初体验

  详细的source/channel/sink可参考官网文档:

  http://flume.apache.org/FlumeUserGuide.html

总结:
  是不是很简单? 让我们深入研究源码 Continue!

参考链接:
http://blog.chinaunix.net/uid-790245-id-3869211.html
http://www.kankanews.com/ICkengine/archives/130646.shtml
http://www.ibm.com/developerworks/cn/data/library/bd-1309biginsightsflume/index.html
http://blog.csdn.net/cnbird2008/article/details/18967449