Doris 基本操作

时间:2023-01-15 20:02:42

一、创建用户和数据库

创建用户

CREATE USER ‘test’ IDENTIFIED BY ‘123456’;

后续登录就可以直接使用命令登录

mysql -h 192.168.1.101 -P9030 -utest -p12345

创建数据库并赋予权限

初始可以通过 root 或 admin 用户创建数据库

create database test_db;

查看数据库

show databases;

授权

grant all on test_db to test;

注意事项:

可以使用 help command 查看语法帮助,不清楚命令全名的话可以使用 ‘help 某一字段’进行模糊查询

information_schema 数据库是为了兼容 MySQL 协议而存在的

二、建表

1、基本概念

Doris 数据都以表 Table 的形式进行逻辑上的描述,一张表包括 行 Row 和 列 Column,Row 就是用户一行数据

从表的角度来看,一张 Table 会拆成多个 Tablet,Tablet 会存成多副本,存储在不同的 BE 中,BE 节点上物理数据的可靠性通过多副本来实现,默认 3 副本

Tablet 和 Partition

Doris 存储引擎中,用户数据被水平划分为若干数据分片 Tablet,也称为数据分桶,每个 Tablet 包含若干行数据,和其他 Tablet 没有交集,物理上独立存储

多个 Tablet 在逻辑上归属于不同的分区 Partition,一个 Tablet 只属于一个 Partition,一个 Partition 包含若干个 Tablet,若干个 Partition 组成一个 Table

Tablet 是数据移动、复制等操作的最小物理存储单元,Partition 可以视为逻辑上最小的管理单元,数据导入和删除,都可以针对一个 Partition 进行

Doris 存储引擎规则

用户数据首先被划分成若干个分区 Partition,划分对规则通常是按照用户指定的分区进行范围划分,比如按时间划分

在每个分区内,数据被进一步按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶,每个分桶就是一个 Tablet,也是最小数据划分逻辑单元

Partition 可以视为逻辑上最小的管理单元,数据的导入与删除,都可以针对一个 Partition 进行

Tablet 直接的数据是没有交集的,独立存储,Tablet 也是数据移动、复制操作的最小物理存储单元

2、创建表

首先需要切换数据库(use test_db)

使用帮助命令可以查看很多案例 help create table

数据类型

tinyint、smallint、int、bigint、largeint、float、double、decimal(precision,scale)、date、datetime、char(length)、varchar(length)、hll、bitmap、agg_type

建表方式

单分区

即数据不分区,只做 HASH 分布,也就是分桶

复合分区

第一级为 Partition,即分区,用户可以指定某一纬度列作为分区列(整型、时间类型),并指定分区取值范围

第二级为 Distribution,即分桶,用户指定一个或多个纬度列以及桶数对数据进行 HASH 分布

使用复合分区的场景

有时间纬度或类似带有有序值的纬度:可以以这类纬度列作为分区列,分区粒度可以根据导入频次、分区数量等进行评估

历史数据删除需求:可以通过删除历史分区来达到目的,也可以通过在指定分区内发送 DELETE 语句进行数据删除

解决数据倾斜问题:每个分区可以单独指定分桶数量,如按天分区,当天的数据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择区分度大的列

3、数据导入

相关文档:​https://doris.apache.org/zh-CN/docs/data-operate/import/load-manual

所有导入方式都支持 CSV 数据格式,Broker load 还支持 parquet 和 orc 数据格式

Stream load

用户通过 HTTP 协议提交请求并携带原始数据创建导入,主要用于快速将本地文件或数据流中的数据导入到 Doris,导入命令同步返回导入结果

curl --location-trusted -u root:123456 -H “label:table1_20221121” -H “column_separator:,” -T table1_data ​​http://hybrid01:8030/api/test_db/table1/_stream_load​​

Doris 基本操作

Insert

类似 MySQL 中的 Insert 语句,Doris 提供 inssert into tbl SELECT …; 的方式从 Doris 表中读取数据并导入到另一张表或 insert into tbl values(…) 插入

insert into table1 values(1,1,‘user1’,10);

Broker load

通过 Broker 进程访问并读取外部数据源导入到 Doris,用户通过 MySQL 协议提交导入任务后,异步执行,show load 命令查看导入结果

具体语法案例见代码

Multi load

用户通过 HTTP 协议提交多个导入作业,Multi Load 可以保证多个导入作业的原子生效

Routine load

通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断从数据源中读取数据并导入到 Doris 中

目前仅支持 Kafka 进行导入,支持无认证、SSL 认证的 Kafka 集群,支持的格式为 csv 文本格式,每个 message 为一行,行尾不包括换行符

原理

FE 通过 JobScheduler 将一个导入任务拆分成若干个 Task,每个 Task 负责导入指定的一部分数据,Task 被 TaskScheduler 分配到指定的 BE 上执行

在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入,导入完成后向 FE 汇报

FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试

整个例行导入作业通过不断的产生新的 Task 来完成数据不间断的导入

具体语法案例见代码

通过 S3 协议直接导入

用法和 Broker Load 类似

三、代码案例

-- 建表语句案例

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name

(column_definition1[, column_definition2, ...]

[, index_definition1[, index_definition2, ...]])

[ENGINE = [olap|mysql|broker|hive|iceberg]]

[key_desc]

[COMMENT "table comment"];

[partition_desc]

[distribution_desc]

[rollup_index]

[PROPERTIES ("key"="value", ...)]

[BROKER PROPERTIES ("key"="value", ...)]

-- 创建表-单分区表

CREATE TABLE test_db.table1

(

siteid INT DEFAULT '10',

citycode SMALLINT,

username VARCHAR(32) DEFAULT '',

pv BIGINT SUM DEFAULT '0'

)

AGGREGATE KEY(siteid, citycode, username)

DISTRIBUTED BY HASH(siteid) BUCKETS 10

PROPERTIES("replication_num" = "1");

-- insert 导入

insert into table1 values(1,1,'user1',10);

insert into table1 values(1,1,'user1',10);

insert into table1 values(1,2,'user1',10);

-- 创建表-复合分区表

-- event_day 作为分区列,建立三个分区,每个分区使用 siteid 进行哈希分桶,桶数为10

-- p202209: 范围[最小值,2022-10-01]

-- p202210: 范围[2022-10-01, 2022-11-01]

-- p202211: 范围[2022-11-01, 2022-12-01]

-- 左闭右开

CREATE TABLE test_db.table2

(

event_day DATE,

siteid INT DEFAULT '10',

citycode SMALLINT,

username VARCHAR(32) DEFAULT '',

pv BIGINT SUM DEFAULT '0'

)

AGGREGATE KEY(event_day,siteid,citycode,username)

PARTITION BY RANGE(event_day)

(

PARTITION p202209 VALUES LESS THAN ('2022-10-01'),

PARTITION p202210 VALUES LESS THAN ('2022-11-01'),

PARTITION p202211 VALUES LESS THAN ('2022-12-01')

)

DISTRIBUTED BY HASH(siteid) BUCKETS 10

PROPERTIES("replication_num" = "3");

-- Broker 导入,broker_id 就是 broker 的名字,最后一个是能容忍的错误数据量

LOAD LABEL test_db.table2

(

DATA INFILE("hdfs://hybrid01:8020/data/table2_data.csv")

INTO TABLE table2

COLUMNS TERMINATED BY ","

FORMAT AS "csv"

(

event_day,siteid,citycode,username,pv

)

)

WITH BROKER broker_id

(

"dfs.nameservices" = "my_cluster",

"dfs.ha.namenodes.my_cluster" = "nn1,nn2,nn3",

"dfs.namenode.rpc-address.my_cluster.nn1" = "hybrid01:8020",

"dfs.namenode.rpc-address.my_cluster.nn2" = "hybrid02:8020",

"dfs.namenode.rpc-address.my_cluster.nn3" = "hybrid03:8020",

"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

)

PROPERTIES

(

"max_filter_ratio" = "0.00002"

);

-- Routine load 案例

CREATE TABLE student_kafka

(

id int,

name varchar(50),

age int

)

DUPLICATE KEY(id)

DISTRIBUTED BY HASH(id) BUCKETS 10;

-- 创建导入任务,任务名字 kafka_job1

CREATE ROUTINE LOAD test_db.kafka_job1 on student_kafka

PROPERTIES

(

"desired_concurrent_number"="1",

"strict_mode"="false",

"format"="json"

)

FROM KAFKA

(

"kafka_broker_list"="hybrid01:9092,hybrid02:9092,hybrid03:9092",

"kafka_topic"="test",

"property.group.id"="test_group_1",

"property.kafka_default_offsets"="OFFSET_BEGINNING",

"property.enable.auto.commit"="false"

);

Doris 基本操作