linux集成 kafka数据通过flume发送到hadoop

时间:2022-10-22 23:40:10
上一篇文章《 linux安装flume和集成kafka测试,我们介绍了flume安装和集成数据到kafka,本篇文章我们将集成kafka,flume,hadoop,通过flume发送kafka数据到hadoop的hdfs文件。

一.前期准备

1.1 hadoop安装

版本:Hadoop 2.6.5

1.2 zookeeper安装

版本:zookeeper-3.4.9

1.3 kafka安装

版本:kafka_2.10-0.10.1.0

1.4 flume安装

版本:apache-flume-1.7.0

二.配置flume-conf

#source的名字
agent.sources = kafkaSource
# channels的名字,建议按照type来命名
agent.channels = memoryChannel
# sink的名字,建议按照目标来命名
agent.sinks = hdfsSink

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = memoryChannel
# 指定sink需要使用的channel的名字,注意这里是channel
agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource相关配置-----------------
# 定义消息源类型
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
agent.sources.kafkaSource.zookeeperConnect = 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181
# 配置消费的kafka topic
agent.sources.kafkaSource.topic=flumeTest2
# 配置消费者组的id
agent.sources.kafkaSource.groupId = flume
# 消费超时时间,参照如下写法可以配置其他所有kafka的consumer选项。注意格式从kafka.xxx开始是consumer的配置属性
#agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100

#------- memoryChannel相关配置-------------------------
# channel类型
agent.channels.memoryChannel.type = memory
# channel存储的事件容量
agent.channels.memoryChannel.capacity=1000
# 事务容量
agent.channels.memoryChannel.transactionCapacity=100

#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://Master:9000/usr/hive/warehouse/test

三.kafka,flume,hadoop集成测试

3.1 hadoop操作

3.1.1 启动dfs+yarn

sbin/start-all.sh

3.1.2 新建/usr/hive/warehouse/test文件并赋予权限

hadoop fs -mkdir /usr/hive/warehouse/test
hadoop dfs -chmod g+w /usr/hive/warehouse/test

3.1.3 查看创建文件

linux集成 kafka数据通过flume发送到hadoop

3.2 kafka操作

3.2.1 创建主题flumeTest2

bin/kafka-topics.sh --create --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --replication-factor 3 --partitions 3 --topic flumeTest2

3.2.2 kafka生产者

bin/kafka-console-producer.sh --broker-list 192.168.32.128:9092,192.168.32.131:9092,192.168.32.132:9092 --topic flumeTest2

3.2.3 kafka消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --topic flumeTest2 --from-beginning

3.3 flume操作

bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console

linux集成 kafka数据通过flume发送到hadoop

3.4 测试结果

linux集成 kafka数据通过flume发送到hadoop

四. 字符串输出实现集成

上节中实现集成的方式,输出的结果有乱码,类似二进制输出
linux集成 kafka数据通过flume发送到hadoop
本节将换一种方式实现flume,kafka,hadoop集成。

4.1 flume-conf 配置

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181
tier1.sources.source1.topic = flumeTest2
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 1000

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

4.2 启动flume

bin/flume-ng agent -n tier1 -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console

4.3 测试结果

linux集成 kafka数据通过flume发送到hadoop
linux集成 kafka数据通过flume发送到hadoop