操作步骤
- 在ClickHouse集群中新建Kafka消费表。
-
CREATE TABLE default.kafka_src_table ON CLUSTER default
( //定义表结构的字段
id Int32,
age Int32,
msg String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = '*',
kafka_topic_list = 'test',
kafka_group_name = 'test',
kafka_format = 'JSONEachRow';
参数说明
- kafka_broker_list:对应的Kafka集群地址
- kafka_topic_list:对应消费的topic。
- kafka_group_name:消费topic的group,需要先在Kafka中创建
-
- kafka_format:ClickHouse可以处理的数据类型。
- JSONEachRow表示每行一条数据的json格式。一般如果是json格式的话,设置JSONEachRow即可。
- 如果需要输入嵌套的json,请设置input_format_import_nested_json=1。
- 关于ClickHouse支持的各种格式,可以参考官网:Formats for Input and Output Data。
更多属性设置,可以参考:ClickHouse集成kafka。
- 创建ClickHouse目的表。
- 创建本地表。
create table default.kafka_table_local ON CLUSTER default (
id Int32,
age UInt32,
msg String
) ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/kafka_sink_table/{shard}',
'{replica}')
ORDER BY (id);
- 创建分布式表。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
ENGINE = Distributed(default, default, kafka_table_local, id);
- 创建view把Kafka消费表消费到的数据导入ClickHouse目的表。
CREATE MATERIALIZED VIEW consumer TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
说明 Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。