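0x01 Background
Persist the JSON data in Kafka into a Hive table so that it can be queried later.
(I read a lot of blog posts on this, ran into all kinds of bugs, and took plenty of detours. The lesson learned: read the official Flume documentation carefully!)
Sample data in Kafka: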
>{"id":1,"name":"snowty","age":25}
Sample Hive table:
hive> desc hivetable;
OK
id int
name string
age int
Time taken: 0.162 seconds, Fetched: 3 row(s)
0x02 Environment setup
0x03 Kafka2Hive
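1. Hive configuration
The Flume Hive sink writes events using Hive transactions, so the table must be created as a bucketed, transactional table, and Hive itself has to be configured to support that.
- Edit hive-site.xml: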
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
<description>
Set to org.apache.hadoop.hive.ql.lockmgr.DbTxnManager as part of turning on Hive
transactions, which also requires appropriate settings for hive.compactor.initiator.on,
hive.compactor.worker.threads, hive.support.concurrency (true),
and hive.exec.dynamic.partition.mode (nonstrict).
The default DummyTxnManager replicates pre-Hive-0.13 behavior and provides
no transactions.
</description>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
<description>
Whether Hive supports concurrency control or not.
A ZooKeeper instance must be up and running when using zookeeper Hive lock manager
</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
- Run:
$ hive --service metastore &   # start the metastore service first
$ hive
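- Create the database and the table; the table has three columns: id, name and age. Switch to the new database first, otherwise the table ends up in `default` instead of `hivedatabase`:
hive> create database hivedatabase;
hive> use hivedatabase;
hive> create table hivetable(id int,name string,age int) clustered by(id) into 2 buckets stored as orc tblproperties('transactional'='true');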
- Copy `/opt/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.1.jar` into Flume's lib directory:
(Without it Flume fails with dependency errors, which had me stuck for quite a while.)
$ sudo cp /opt/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.1.jar /opt/flume/lib/
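2. Flume agent configuration file (see the official documentation for details; ten minutes or so of reading makes everything clear)
As the figure shows, three components need to be configured in Flume:
- source: reads from the data source (here Kafka, i.e. a Kafka source) and passes the data into the channel
- channel: the pipe that carries the data from the source to the sink
- sink: reads data from the channel and stores it in a database (here Hive, i.e. a Hive sink)
Flume is driven by a configuration file, so create a `kafka2hive.conf` file under `/opt/flume/conf`.
1). Declare the source, channel and sink: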
a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
2). Configure the Kafka source. According to the official documentation, the Kafka address and the topic are required:
For example:
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.kafka.topics=test
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
3). Configure the Hive sink:
For example:
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=hivedatabase
a.sinks.hive_sink.hive.table=hivetable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name,age
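The Flume documentation describes the JSON serializer as mapping JSON object names directly to Hive columns of the same name, so it can help to check that a record carries the expected keys before sending it. Below is a rough sketch of such a check in plain shell; `json_keys` and `matches_columns` are hypothetical helper names, and the text matching only handles flat records like the sample above (it is not a real JSON parser).

```shell
# json_keys RECORD: print the keys of a flat JSON record, sorted and
# comma separated. Crude text matching, assumed good enough for flat records.
json_keys() {
  printf '%s\n' "$1" | grep -o '"[^"]*"[[:space:]]*:' | tr -d '": ' | sort | paste -sd, -
}

# matches_columns RECORD COLUMNS: succeed if the record's keys are exactly
# the comma-separated column list (order does not matter).
matches_columns() {
  local want
  want=$(printf '%s\n' "$2" | tr ',' '\n' | sort | paste -sd, -)
  [ "$(json_keys "$1")" = "$want" ]
}

# Example: compare the sample record with the columns of hivetable.
if matches_columns '{"id":1,"name":"snowty","age":25}' 'id,name,age'; then
  echo "record keys match the table columns"
fi
```

This check is stricter than the sink itself needs to be; it is only meant as a quick pre-flight test for typos in key names.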
4). The complete kafka2hive.conf file:
a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink
# Kafka source configuration
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.batchSize=10
a.sources.source_from_kafka.kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.kafka.topics=test
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
# Hive sink configuration
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=hivedatabase
a.sinks.hive_sink.hive.table=hivetable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name,age
# channel configuration
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1000
a.channels.mem_channel.transactionCapacity=100
# wiring between source, channel and sink
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel
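A common slip in a file like this is a source or sink pointing at a channel name that was never declared. The following is a minimal sketch of a wiring check, assuming the simple `key=value` layout used above; `check_wiring` is a hypothetical helper, not part of Flume.

```shell
# check_wiring CONF [AGENT]: verify that every channel referenced by a source
# ("<agent>.sources.<name>.channels") or a sink ("<agent>.sinks.<name>.channel")
# appears in the "<agent>.channels" declaration. AGENT defaults to "a".
check_wiring() {
  local conf="$1" agent="${2:-a}"
  local declared used ch status=0
  # Channel names declared for the agent (space separated).
  declared=$(sed -n "s/^${agent}\.channels[[:space:]]*=[[:space:]]*//p" "$conf")
  # Channel names referenced by sources and sinks.
  used=$(sed -n -E "s/^${agent}\.(sources|sinks)\.[^.]+\.channels?[[:space:]]*=[[:space:]]*//p" "$conf")
  for ch in $used; do
    case " $declared " in
      *" $ch "*) ;;                                  # declared, fine
      *) echo "undeclared channel: $ch"; status=1 ;;
    esac
  done
  return $status
}

# Assumed usage against the file from this post:
#   check_wiring /opt/flume/conf/kafka2hive.conf a && echo "wiring looks consistent"
```

The check only looks at names; it does not validate any other property of the source, channel or sink.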
3. Run
- Start Flume
$ flume-ng agent -n a -c /opt/flume/conf -f /opt/flume/conf/kafka2hive.conf -Dflume.root.logger=INFO,console
- Send JSON data to the Kafka topic:
>{"id":1,"name":"snowty","age":25}
- Check the Hive table:
hive> select * from hivedatabase.hivetable;
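For the step that sends JSON to the topic, typing records by hand gets tedious once more than one batch is needed (batchSize is 10 in the configuration above). Here is a small sketch that generates sample records with the same keys as the table; `gen_records` is a hypothetical helper and the names and ages it produces are made up.

```shell
# gen_records N: print N sample JSON records, one per line, using the same
# keys as the Hive columns (id, name, age). Values are synthetic.
gen_records() {
  local n="$1" i=1
  while [ "$i" -le "$n" ]; do
    printf '{"id":%d,"name":"user%d","age":%d}\n' "$i" "$i" $((20 + i % 30))
    i=$((i + 1))
  done
}

# Assumed usage: pipe the records into the console producer shipped with Kafka.
# The script name and the broker flag (--broker-list on older releases,
# --bootstrap-server on newer ones) depend on the Kafka version installed:
#   gen_records 20 | kafka-console-producer.sh --broker-list localhost:9092 --topic test
```

Running the query above again after the sink has committed a batch should then show the generated rows alongside the hand-typed one.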