In the previous article, "Installing Flume on Linux and Integrating It with Kafka", we covered installing Flume and feeding data into Kafka. In this article we integrate Kafka, Flume, and Hadoop, using Flume to deliver Kafka data into HDFS files.
1. Prerequisites
1.1 Hadoop installation
Version: Hadoop 2.6.5
1.2 ZooKeeper installation
Version: zookeeper-3.4.9
1.3 Kafka installation
Version: kafka_2.10-0.10.1.0
1.4 Flume installation
Version: apache-flume-1.7.0
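Before wiring the components together, it is worth confirming that each one responds; a minimal sanity check, assuming the default install directories from the earlier articles:
hadoop version
bin/zkServer.sh status     # from the ZooKeeper install directory
bin/flume-ng version       # from the Flume install directory
Kafka has no version command of its own; its version is visible in the names of the jars under its libs directory.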
2. Configuring flume-conf
# Name of the source
agent.sources = kafkaSource
# Name of the channel; naming it after its type is recommended
agent.channels = memoryChannel
# Name of the sink; naming it after its destination is recommended
agent.sinks = hdfsSink
# Channel the source writes to
agent.sources.kafkaSource.channels = memoryChannel
# Channel the sink reads from; note the property here is "channel" (singular)
agent.sinks.hdfsSink.channel = memoryChannel
#-------- kafkaSource settings -----------------
# Source type
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# ZooKeeper ensemble used by Kafka
agent.sources.kafkaSource.zookeeperConnect = 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181
# Kafka topic to consume
agent.sources.kafkaSource.topic = flumeTest2
# Consumer group id
agent.sources.kafkaSource.groupId = flume
# Consumer timeout; any other Kafka consumer option can be set in the same way.
# Note that properties whose names start with kafka. are passed to the Kafka consumer.
#agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100
#------- memoryChannel settings -------------------------
# Channel type
agent.channels.memoryChannel.type = memory
# Maximum number of events the channel can hold
agent.channels.memoryChannel.capacity = 1000
# Maximum number of events per transaction
agent.channels.memoryChannel.transactionCapacity = 100
#--------- hdfsSink settings ------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://Master:9000/usr/hive/warehouse/test
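Note that Flume 1.7 moved the Kafka source to the new Kafka consumer API; zookeeperConnect, topic, and groupId still work but are deprecated in favor of bootstrap-server style properties. A minimal sketch of the equivalent source settings in the newer style, assuming the broker addresses used later in this article:
agent.sources.kafkaSource.kafka.bootstrap.servers = 192.168.32.128:9092,192.168.32.131:9092,192.168.32.132:9092
agent.sources.kafkaSource.kafka.topics = flumeTest2
agent.sources.kafkaSource.kafka.consumer.group.id = flume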
3. Kafka, Flume, and Hadoop integration test
3.1 Hadoop operations
3.1.1 Start DFS and YARN
sbin/start-all.sh
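Once the scripts finish, jps on the master node should list the HDFS and YARN daemons; the exact set depends on your cluster layout:
jps
# expect roughly: NameNode, SecondaryNameNode, ResourceManager on the master,
# DataNode and NodeManager on the worker nodes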
3.1.2 Create the /usr/hive/warehouse/test directory and grant permissions
hadoop fs -mkdir -p /usr/hive/warehouse/test
hadoop fs -chmod g+w /usr/hive/warehouse/test
3.1.3 Verify the directory was created
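A simple listing of the parent directory confirms it exists:
hadoop fs -ls /usr/hive/warehouse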
3.2 Kafka operations
3.2.1 Create the flumeTest2 topic
bin/kafka-topics.sh --create --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --replication-factor 3 --partitions 3 --topic flumeTest2
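To double-check the partition and replica assignment, describe the topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --topic flumeTest2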
3.2.2 Kafka console producer
bin/kafka-console-producer.sh --broker-list 192.168.32.128:9092,192.168.32.131:9092,192.168.32.132:9092 --topic flumeTest2
3.2.3 Kafka console consumer
bin/kafka-console-consumer.sh --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --topic flumeTest2 --from-beginning
3.3 Flume operations
bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
3.4 Test results
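Messages typed into the console producer should now appear under the sink path in HDFS. With this configuration the output files use Flume's defaults, i.e. a FlumeData prefix and SequenceFile format, so their content is not readable as plain text; the file name below is illustrative:
hadoop fs -ls /usr/hive/warehouse/test
hadoop fs -cat /usr/hive/warehouse/test/FlumeData.1490000000000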
4. Integration with plain-text output
With the configuration in the previous section, the files written to HDFS look garbled, much like binary output, because, as noted above, the HDFS sink defaults to SequenceFile format. This section implements the Flume, Kafka, and Hadoop integration in a different way so that events are written as plain text.
4.1 flume-conf configuration
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181
tier1.sources.source1.topic = flumeTest2
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
# The timestamp interceptor adds a timestamp header so the %y-%m-%d escapes in hdfs.path can resolve
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 1000
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = hdfs
# Partition the output by topic and date
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
# Roll files every 5 seconds; disable size- and count-based rolling
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
# DataStream writes the event body as plain text instead of a SequenceFile
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
4.2 Start Flume
bin/flume-ng agent -n tier1 -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
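After producing a few more messages to flumeTest2, the events should appear as plain text under the topic- and date-partitioned path; the date directory in the second command is illustrative:
hadoop fs -ls /tmp/kafka/flumeTest2/
hadoop fs -cat /tmp/kafka/flumeTest2/17-04-01/FlumeData.1490000000000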