doris 各种数据导入及数据备份恢复使用方式

时间:2025-04-08 19:13:47

  Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星
 


doris mini load

1.创建表(这里采用DUPLICATE模型,也可以用其他模型)

 CREATE TABLE `t_pro_dish_list_detail_test` (
   `order_time` date,
   `shop_id` varchar(32) ,
   `id` varchar(32) ,
   `table_bill_id` varchar(36) ,
   `shop_name` varchar(100) ,
   `dish_type` int,
   `dish_id` varchar(50) ,
   `dish_name` varchar(100) ,
   `standard_code` varchar(32) ,
   `standard_id` varchar(100) ,
   `dish_price` decimal(16,2),
   `served_quantity` int(11),
   `dish_abnormal_status` varchar(20),
   `ts` varchar(20),
   `taste_type_id` varchar(255),
   `taste_name` varchar(255)
 ) 
 DUPLICATE KEY(order_time)
 PARTITION BY RANGE(order_time) ( 
     PARTITION P_202010 VALUES [("2020-10-01"),("2020-11-01")), 
     PARTITION P_202011 VALUES [("2020-11-01"),("2020-12-01"))
 )
 DISTRIBUTED BY HASH(shop_id) BUCKETS 8 
 PROPERTIES( 
 "replication_num" = "2", 
 "dynamic_partition.enable" = "true", 
 "dynamic_partition.time_unit" = "MONTH", 
 "dynamic_partition.start" = "-2147483648", 
 "dynamic_partition.end" = "2", 
 "dynamic_partition.prefix" = "P_", 
 "dynamic_partition.buckets" = "8",
 "dynamic_partition.time_zone" = "Asia/Shanghai"
 );
 说明:
 replication_num : 副本数
 dynamic_partition.enable:  是否开启动态分区
 dynamic_partition.time_unit: 动态分区调度的单位(HOUR、DAY、WEEK、MONTH)
         当指定为 HOUR 时,动态创建的分区名后缀格式为 yyyyMMddHH,例如2020032501
         当指定为 DAY 时,动态创建的分区名后缀格式为 yyyyMMdd,例如20200325
         当指定为 WEEK 时,动态创建的分区名后缀格式为yyyy_ww。即当前日期属于这一年的第几周,例如 2020-            03-25 创建的分区名后缀为 2020_13, 表明目前为2020年第13周
         当指定为 MONTH 时,动态创建的分区名后缀格式为 yyyyMM,例如 202003
 dynamic_partition.start: 动态分区的起始偏移,为负数根据 time_unit 属性的不同,以当天(星期/月)             为基准,分区范围在此偏移之前的分区将会被删除。如果不填写,则默认为 -2147483648,即不删除历史             分区
 dynamic_partition.end:动态分区的结束偏移,为正数。根据 time_unit 属性的不同,以当天(星期/月)为基准,         提前创建对应范围的分区
 dynamic_partition.prefix:动态创建的分区名前缀。
 dynamic_partition.buckets:动态创建的分区所对应的分桶数量
 dynamic_partition.time_zone:动态分区的时区
                                
 CREATE TABLE `t_pro_dish_list_detail_test_demo` (
   `create_time` date,
   `pro_id` varchar(255),  
   `id` int(11) 
 ) 
 DUPLICATE KEY(create_time)
 PARTITION BY RANGE(create_time) (  
     PARTITION P_202011 VALUES [("2020-11-01"),("2020-12-01"))
 )
 DISTRIBUTED BY HASH(pro_id) BUCKETS 2
 PROPERTIES( 
 "replication_num" = "2", 
 "dynamic_partition.enable" = "true", 
 "dynamic_partition.time_unit" = "MONTH", 
 "dynamic_partition.start" = "-2147483648", 
 "dynamic_partition.end" = "2", 
 "dynamic_partition.prefix" = "P_", 
 "dynamic_partition.buckets" = "2" 
 );

2.执行导入(HTTP)

 curl -i -v --location-trusted -u root: -T /root/test/  http://10.220.147.155:8030/api/demo/t_pro_dish_list_detail_test_demo/_load?label=2020-092232-01&column_separator=%2c
 说明:
  label 唯一标识
  column_separator:   用于指定列与列之间的分隔符,默认的为'\t'
                         NOTE: 需要进行url编码,譬如
                         需要指定'\t'为分隔符,那么应该传入'column_separator=%09'
                         需要指定'\x01'为分隔符,那么应该传入'column_separator=%01'
                         需要指定','为分隔符,那么应该传入'column_separator=%2c'

3.查看结果

  load可以查看导入信息,及错误信息
 2.页面system->jobs->操作的数据库名称(DbNAME)->load查看导入信息,及错误信息

4.常见错误

 Reason: actual column number is less than schema column number. actual number: 1 sep:   , schema number: 3; 
 原因:分割符没有生效,临时跳转的时候column_separator=%2c这个参数被丢弃了
 解决方法:
 curl -i -v --location-trusted -u root: -T /root/test/  http://10.220.147.155:8030/api/demo/t_pro_dish_list_detail_test_demo/_load?label=2020-092232-01\&column_separator=%2c
 加转译符"\"

doris Routine(kafka) load

1.提交流程

  向 FE 提交一个例行导入作业。
  通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被  分配到指定的 BE 上执行。
 4.在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
  中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
 整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入

(介绍)

 1.功能:
   支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
   目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入文本格式(CSV)的数据。
 2.语法:
    CREATE ROUTINE LOAD [db.]job_name ON tbl_name
     [merge_type]
     [load_properties]
     [job_properties]
     FROM data_source
     [data_source_properties]
   说明:
    1. [db.]job_name
         导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。
    2. tbl_name
         指定需要导入的表的名称。
    3. merge_type
         数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete on条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 语法为[WITH MERGE|APPEND|DELETE]
    4. load_properties
          用于描述导入数据。语法:
         [column_separator],
         [columns_mapping],
         [where_predicates],
         [delete_on_predicates],
         [source_sequence],
         [partitions]
         1. column_separator:
            指定列分隔符,如:COLUMNS TERMINATED BY ",",默认为:\t
         2. columns_mapping: 指定源数据中列的映射关系,以及定义衍生列的生成方式。
            1. 映射列:按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列 k1, k2, v1。源数据有4列,其中第1、2、4列分别对应 k2, k1, v1。则书写如下:
             COLUMNS (k2, k1, xxx, v1)
             其中 xxx 为不存在的一列,用于跳过源数据中的第三列。
            2. 衍生列:以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。
 接上一个示例,假设目的表还有第4列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:
             COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
 ​
         3. where_predicates
             用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。
             例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:
             WHERE k1 > 100 and k2 = 1000
         4. partitions
             指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
             示例:
             PARTITION(p1, p2, p3)
         5. delete_on_predicates
             表示删除条件,仅在 merge type 为MERGE 时有意义,语法与where 相同
         6. source_sequence:
             只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE,                   source_sequence可以是数据源中的列,也可以是表结构中的一列。
     5. job_properties
             用于指定例行导入作业的通用参数。
         语法:
         PROPERTIES (
             "key1" = "val1",
             "key2" = "val2"
         )

3.参数

1.desired_concurrent_number
期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3。这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。 例:"desired_concurrent_number" = "3"
2.max_batch_interval/max_batch_rows/max_batch_size
这三个参数分别表示:
            1)每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为10。
            2)每个子任务最多读取的行数。必须大于等于200000。默认是200000。
            3)每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB
这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。
例:  "max_batch_interval" = "20",
     "max_batch_rows" = "300000",
     "max_batch_size" = "209715200"
3.max_error_number
采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行。
4.strict_mode
是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为 "strict_mode" = "true"

指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。

指定导入数据格式,默认是csv,支持json格式。"format" = "json"

 导入json方式分为:简单模式和匹配模式。如果设置了jsonpath则为匹配模式导入,否则为简单模式导入
 "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]"
8.json_root
json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为"",
{
	"data": [{
			"category": "11",
			"title": "SayingsoftheCentury",
			"price": 895,
			"timestamp": 1589191587
		},
		{
			"category": "22",
			"author": "2avc",
			"price": 895,
			"timestamp": 1589191487
		},
		{
			"category": "33",
			"author": "3avc",
			"title": "SayingsoftheCentury",
			"timestamp": 1589191387
		}
	]
}
"json_root" = "$.data"
9.strip_outer_array
布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。
[{
		"category": "11",
		"title": "SayingsoftheCentury",
		"price": 895,
		"timestamp": 1589191587
	},
	{
		"category": "22",
		"author": "2avc",
		"price": 895,
		"timestamp": 1589191487
	},
	{
		"category": "33",
		"author": "3avc",
		"title": "SayingsoftheCentury",
		"timestamp": 1589191387
	}
] 
"strip_outer_array" = "true"
10.data_source
数据源的类型 	 FROM KAFKA
11.data_source_properties
指定数据源相关的信息。
语法:(
            "key1" = "val1",
            "key2" = "val2"
     )
例:
     (
            "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
            "kafka_topic" = "test_doris_kafka_load",	
            "kafka_partitions" = "0",
            "kafka_offsets" = "0,OFFSET_BEGINNING,OFFSET_END"
      );
OFFSET_BEGINNING: 从有数据的位置开始订阅
OFFSET_END: 从末尾开始订阅
12.导入数据格式样例
        整型类(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
        浮点类(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
        日期类(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03。
        字符串类(CHAR/VARCHAR)(无引号):I am a student, a
        NULL值:\N

4.示例

####示例1
1.创建表
CREATE TABLE `example_table` (
  `id` int,
  `name` varchar(11),  
  `age` int,
  `address` varchar(50)
) 
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES( 
"replication_num" = "2"
);
2.创建ROUTINE
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_table
        COLUMNS(id,age,name,address)
        PROPERTIES
        (
        "desired_concurrent_number"="2",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false",
        "format" = "json"
        )
        FROM KAFKA
        (
        "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
        "kafka_topic" = "test_doris_kafka_load",
        "kafka_partitions" = "0",
        "kafka_offsets" = "0"
        );
说明:
example_db 数据库
example_table 表名称
test_json_label_1 唯一任务标识
格式:
{
  "id": 1,
  "age": 18,
  "name": "曹丽娜",
  "address": "china"
}
####示例2
1.创建表
CREATE TABLE `example_tbl` (
        `category` varchar(24) NULL COMMENT "",
        `author` varchar(24) NULL COMMENT "",
        `timestamp` bigint(20) NULL COMMENT "",
        `dt` int(11) NULL COMMENT "",
        `price` double REPLACE
        ) ENGINE=OLAP
        AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
        COMMENT "OLAP"
        PARTITION BY RANGE(`dt`)
        (PARTITION p0 VALUES [("-2147483648"), ("20200509")),
        PARTITION p20200509 VALUES [("20200509"), ("20200510")),
        PARTITION p20200510 VALUES [("20200510"), ("20200511")),
        PARTITION p20200511 VALUES [("20200511"), ("20200512")))
        DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
        PROPERTIES (
            "storage_type" = "COLUMN",
            "replication_num" = "1"
);
2.创建ROUTINE
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
        COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
        PROPERTIES
        (
            "desired_concurrent_number"="2",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false",
            "format" = "json",
            "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
            "strip_outer_array" = "true"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
            "kafka_topic" = "test_doris_kafka_load",
            "" = "test1", 
            "" = "test1",
            "kafka_partitions" = "0",
            "kafka_offsets" = "0"
        );
3.格式
[{
		"category": "11",
		"title": "SayingsoftheCentury",
		"price": 895,
		"timestamp": 1589191587
	},
	{
		"category": "22",
		"author": "2avc",
		"price": 895,
		"timestamp": 1589191487
	},
	{
		"category": "33",
		"author": "3avc",
		"title": "SayingsoftheCentury",
		"timestamp": 1589191387
	}
] 
####示例3
CREATE ROUTINE LOAD example_db.test3 ON example_tbl
        COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
        PROPERTIES
        (
            "desired_concurrent_number"="2",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false",
            "format" = "json",
            "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
            "strip_outer_array" = "true"
            "json_root" = "$.data"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
            "kafka_topic" = "test_doris_kafka_load",
            "kafka_partitions" = "0",
            "kafka_offsets" = "8"
        );
格式:
{
	"data": [{
			"category": "11",
			"title": "SayingsoftheCentury",
			"price": 895,
			"timestamp": 1589191587
		},
		{
			"category": "22",
			"author": "2avc",
			"price": 895,
			"timestamp": 1589191487
		},
		{
			"category": "33",
			"author": "3avc",
			"title": "SayingsoftheCentury",
			"timestamp": 1589191387
		}
	]
}
####示例4
 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。并且删除与v3 >100 行相匹配的key列的行                                   
 CREATE ROUTINE LOAD example_db.test1 ON example_tbl
        WITH MERGE
        COLUMNS(k1, k2, k3, v1, v2, v3),
        WHERE k1 > 100 and k2 like "%doris%",
        DELETE ON v3 >100
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "20",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false"
        )
        FROM KAFKA  
        (
            "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
            "kafka_topic" = "test_doris_kafka_load",
            "kafka_partitions" = "0",
            "kafka_offsets" = "8"
        );
####示例5
导入数据到含有sequence列的UNIQUE_KEYS表中
        CREATE ROUTINE LOAD example_db.test_job ON example_tbl
        COLUMNS TERMINATED BY ",",
        COLUMNS(k1,k2,source_sequence,v1,v2),
        ORDER BY source_sequence
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "30",
            "max_batch_rows" = "300000",
            "max_batch_size" = "209715200"
        ) FROM KAFKA
        (
            "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
            "kafka_topic" = "my_topic",
            "kafka_partitions" = "0,1,2,3",
            "kafka_offsets" = "101,0,0,200"
        );                                

5.查看任务状态

查看test1的运行状态
SHOW ROUTINE LOAD TASK WHERE JobName = "test1";
停止test1的运行状态
STOP ROUTINE LOAD FOR test1;
暂停test1的运行状态
PAUSE ROUTINE LOAD FOR test1;
恢复名称为 test1 的例行导入作业。
RESUME ROUTINE LOAD FOR test1;

6.查看导入信息及日志

system->routine_loads

7.遇到错误

1.一直提交,但是没有数据进来,不会报错
原因:连接kafka集群的时候采用ip连接就会出现上面那种情况,建议换成主机名
2.出现消费kafka任务终止的情况,是因为下面那三个参数没有设置合理
max_batch_interval/max_batch_rows/max_batch_size
这三个参数用于控制单个任务的执行时间。其中任意一个阈值达到,则任务结束。其中 max_batch_rows 用于记录从 Kafka 中读取到的数据行数。max_batch_size 用于记录从 Kafka 中读取到的数据量,单位是字节。目前一个任务的消费速率大约为 5-10MB/s。
那么假设一行数据 500B,用户希望每 100MB 或 10 秒为一个 task。100MB 的预期处理时间是 10-20 秒,对应的行数约为 200000 行。则一个合理的配置为:
"max_batch_interval" = "10",
"max_batch_rows" = "200000",
"max_batch_size" = "104857600"
这三个参数需要合理设置

doris broker loda

1.提交流程

用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。
BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。

2.创建broker_name

添加
ALTER SYSTEM ADD BROKER broker_name_1 "test-dev-bigdata5:8000";
ALTER SYSTEM ADD BROKER broker_name_2 "test-dev-bigdata6:8000";
ALTER SYSTEM ADD BROKER broker_name_3 "test-dev-bigdata7:8000";

删除
ALTER SYSTEM DROP BROKER broker_name01 "test-pro-doris-01:8000";

3.示例

####多个表导入
1.创建表
CREATE TABLE `test1` (
  `id` int,
  `name` varchar(11)
) 
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES( 
"replication_num" = "2"
);

CREATE TABLE `test2` (
  `col1` int,
  `col2` varchar(11)
) 
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES( 
"replication_num" = "2"
);
2.创建LABEL
LOAD LABEL example_db.label1
(
    DATA INFILE("hdfs://10.220.147.151:8020/tmp/palo/file")
    INTO TABLE test1
    COLUMNS TERMINATED BY ","
    (id,name)
    ,
    DATA INFILE("hdfs://10.220.147.151:8020/tmp/palo/file1")
    INTO TABLE test2
    COLUMNS TERMINATED BY ","
    (col1, col2)
)
WITH BROKER 'broker_name_2'
PROPERTIES
(
    "timeout" = "3600"
);

doris spark load

1.提交流程

 调度提交 ETL 任务到 Spark 集群执行。
 集群执行 ETL 完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。
 任务完成后,FE 获取预处理过的每个分片的数据路径,并调度相关的 BE 执行 Push 任务。
 通过 Broker 读取数据,转化为 Doris 底层存储格式。
 调度生效版本,完成导入任务。

2.配置FE节点

1下载spark依赖包
 -zxvf spark-1.5.1-bin-hadoop2. 解压即可
3.将spark客户端下的jars文件夹内所有jar包归档打包成一个zip文件
4.安装yarn客户端
 ../添加
  enable_spark_load = true
  spark_home_default_dir = /usr/local/spark2
  spark_resource_path = /usr/local/spark2/
  yarn_client_path = /usr/local/hadoop/bin/yarn

3.配置 ETL 集群

####语法:
-- create spark resource 创建
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
  type = spark,
  spark_conf_key = spark_conf_value,
  working_dir = path,
  broker = broker_name,
  broker.property_key = property_value
)

-- drop spark resource 删除
DROP RESOURCE resource_name

-- show resources 查看
SHOW RESOURCES
SHOW PROC "/resources"

-- privileges 赋权限
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name

-- 例子:授予spark0资源的使用权限给用户user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

####参数说明
resource_name 为 Doris 中配置的 Spark 资源的名字
Spark 相关参数如下:
: 必填,目前支持yarn,spark://host:port。
: Spark 程序的部署模式,必填,支持 cluster,client 两种。
: master为yarn时必填。
: master为yarn时必填。
其他参数为可选,参考/docs/latest/
working_dir: ETL 使用的目录。spark作为ETL资源使用时必填。例如:hdfs://host:port/tmp/doris。
broker: broker 名字。spark作为ETL资源使用时必填。需要使用ALTER SYSTEM ADD BROKER 命令提前完成配置。
broker.property_key: broker读取ETL生成的中间文件时需要指定的认证信息等。

####示例配置 ETL 集群cluster模式
CREATE EXTERNAL RESOURCE "spark6"
PROPERTIES
(
  "type" = "spark",
  "" = "yarn",
  "" = "cluster",
  "" = "1g",
  "" = "queue0",
  "" = "hdfs://test-dev-bigdata1:8032",
  "" = "hdfs://test-dev-bigdata1:8020",
  "working_dir" = "hdfs://test-dev-bigdata1:8020/tmp/doris",
  "broker" = "broker_name_1"
);

####client模式
CREATE EXTERNAL RESOURCE "spark2"
PROPERTIES
(
  "type" = "spark",
  "" = "spark://10.220.147.155:7077",
  "" = "client",
  "working_dir" = "hdfs://10.220.147.151:8020/tmp/doris",
  "broker" = "broker_name_2"
);

4.示例

####上游数据源是hive表的情况
####step 1:新建hive外部表
CREATE EXTERNAL TABLE hive_t2
(
    orderid INT,
    createtime varchar(25),
    modifiedtime varchar(50),
    status varchar(100),
    dt varchar(100)
)
ENGINE=hive
properties
(
"database" = "test",
"table" = "ods_orders",
"" = "thrift://10.220.147.151:9083"
);
####step 2:创建doris表
CREATE TABLE `test_hive` (
    orderid INT,
    createtime varchar(25),
    modifiedtime varchar(50),
    status varchar(100),
    dt varchar(100)
) 
DISTRIBUTED BY HASH(orderid) BUCKETS 2
PROPERTIES( 
"replication_num" = "2"
);
#####step 3:提交load命令,要求导入的 doris 表中的列必须在 hive 外部表中存在。
LOAD LABEL demo.label25
(
    DATA FROM TABLE hive_t2
    INTO TABLE test_hive
)
WITH RESOURCE 'spark0'
(
    "" = "2g",
    "" = "true"
)
PROPERTIES
(
    "timeout" = "3600" //注:数据量特别大的,时间可以设置长
);

####上游数据源为hdfs文件的情况
####step 1:创建表
CREATE TABLE `test3` (
  `id` int,
  `name` varchar(11)
) 
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES( 
"replication_num" = "2"
);
####step 2:提交load命令
LOAD LABEL demo.label23
(
    DATA INFILE("hdfs://10.220.147.151:8020/tmp/palo/file1")
    INTO TABLE test3
    COLUMNS TERMINATED BY ","
    (id,name)
    
)
WITH RESOURCE 'spark0'
(
    "" = "2g",
    "" = "true"
)
PROPERTIES
(
    "timeout" = "3600"
);

4.常见错误

:ETL_SUBMIT_FAIL; msg:errCode = 2, detailMessage = start spark app failed. error: Waiting too much time to get appId from handle. spark app state: UNKNOWN, loadJobId:16020
出现上面这个情况需要去看具体的错误,目录:/soft/doris-fe/log/spark_launcher_log
2.1064 - errCode = 2, detailMessage = Spark Load is coming soon
打压缩包的时候只需要把jar文件下的包打进去,不需要把jar目录打进去
:ETL_SUBMIT_FAIL; msg:errCode = 2, detailMessage = errCode = 2, detailMessage = failed to upload lib to repository, srcPath=/usr/local/spark2/ destPath=hdfs://10.220.147.151:8020/tmp/doris/2113522669/__spark_repository__spark0/__archive_1.0.0/__lib__spark message=errCode = 2, detailMessage = Read file exception. filePath=/usr/local/spark2/
原因:配置  spark_resource_path = /usr/local/spark2/ 这个地址写错了
4. 查看yarn上面的报错信息:Path does not exist: hdfs://10.220.147.151:8020/tmp/doris/jobs/11001/label17/18009/configs/;
的错误信息是:$JAVA_HOEM not set
原因java_home没有设置,查看JAVA_HOME是否配置,如果配置,则需要在hadoop/libexec/添加export JAVA_HOEM=/usr/local/java,因为在fe要用hadoop/libexec/

DDL

添加分区
ALTER TABLE ods.ods_pos_pro_dish_list_detail_delta  ADD PARTITION P_20150101 VALUES [("2015-01-01 00:00:00"),("2016-01-01 00:00:00"));
删除分区
ALTER TABLE ods_pos_pro_dish_list_detail_delta DROP PARTITION P_20200101;
设置动态分区
ALTER TABLE ods.ods_pos_pro_dish_list_detail_delta SET
(
    "dynamic_partition.enable" = "true"
);
修改副本数
ALTER TABLE ods.ods_pos_pro_dish_list_detail_delta SET
(
    "replication_num" = "6"
);
1) 设置数据库数据量配额,单位为B/K/KB/M/MB/G/GB/T/TB/P/PB
        ALTER DATABASE db_name SET DATA QUOTA quota;
2) 重命名数据库
        ALTER DATABASE db_name RENAME new_db_name;
3) 设置数据库的副本数量配额
        ALTER DATABASE db_name SET REPLICA QUOTA quota;                                                                                      
添加字段
    ALTER TABLE example_db.my_table ADD COLUMN dish_type_code varchar(20) DEFAULT NULL 
添加字段时出错:errCode = 2, detailMessage = Create replicas failed. Error: Error replicas:10003=5341268, 10003=5341296, 10003=5341288
解决: ADMIN SET FRONTEND CONFIG ("tablet_create_timeout_second"="10");
      ADMIN SET FRONTEND CONFIG ("max_create_table_timeout_second"="1000");                修改字段注释
    ALTER TABLE tb_user MODIFY COLUMN name VARCHAR(30) NOT NULL COMMENT '姓名2';

doris导出数据

SELECT * FROM ods_pos_pro_taste_name_delta
INTO OUTFILE "hdfs://test-pro-cdh-namenode1:8020/tmp/palo/result_"
FORMAT AS CSV
PROPERTIES
(
    "" = "broker_name01",
    "column_separator" = ",",
    "line_delimiter" = "\n",
    "max_file_size" = "10MB"
);

###参数介绍
column_separator:列分隔符,仅对 CSV 格式适用。默认为 \t。
line_delimiter:行分隔符,仅对 CSV 格式适用。默认为 \n。
max_file_size:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。

doris备份和迁移

####创建语法(help CREATE REPOSITORY;)
 该语句用于创建仓库。仓库用于属于备份或恢复。仅 root 或 superuser 用户可以创建仓库。
    语法:
        CREATE [READ ONLY] REPOSITORY `repo_name`
        WITH BROKER `broker_name`
        ON LOCATION `repo_location`
        PROPERTIES ("key"="value", ...);
            
    说明:
        1. 仓库的创建,依赖于已存在的 broker
        2. 如果是只读仓库,则只能在仓库上进行恢复。如果不是,则可以进行备份和恢复操作。
        3. 根据 broker 的不同类型,PROPERTIES 有所不同,具体见示例。
Examples:
    1. 创建名为 bos_repo 的仓库,依赖 BOS broker "bos_broker",数据根目录为:bos://palo_backup
        CREATE REPOSITORY `bos_repo`
        WITH BROKER `bos_broker`
        ON LOCATION "bos://palo_backup"
        PROPERTIES
        (
            "bos_endpoint" = "",
            "bos_accesskey" = "069fc2786e664e63a5f111111114ddbs22",
            "bos_secret_accesskey"="70999999999999de274d59eaa980a"
        );
     
    2. 创建和示例 1 相同的仓库,但属性为只读:
        CREATE READ ONLY REPOSITORY `bos_repo`
        WITH BROKER `bos_broker`
        ON LOCATION "bos://palo_backup"
        PROPERTIES
        (
            "bos_endpoint" = "",
            "bos_accesskey" = "069fc2786e664e63a5f111111114ddbs22",
            "bos_secret_accesskey"="70999999999999de274d59eaa980a"
        );

    3. 创建名为 hdfs_repo 的仓库,依赖 Baidu hdfs broker "hdfs_broker",数据根目录为:hdfs://hadoop-name-node:54310/path/to/repo/
        CREATE REPOSITORY `hdfs_repo`
        WITH BROKER `hdfs_broker`
        ON LOCATION "hdfs://hadoop-name-node:54310/path/to/repo/"
        PROPERTIES
        (
            "username" = "user",
            "password" = "password"
        );
##查看远程仓库命令
show REPOSITORY;
  
##备份(help BACKUP)
该语句用于备份指定数据库下的数据。该命令为异步操作。提交成功后,需通过 SHOW BACKUP 命令查看进度。仅支持备份 OLAP 类型的表。
    语法:
        BACKUP SNAPSHOT [db_name].{snapshot_name}
        TO `repository_name`
        ON (
            `table_name` [PARTITION (`p1`, ...)],
            ...
        )
        PROPERTIES ("key"="value", ...);
            
    说明:
        1. 同一数据库下只能有一个正在执行的 BACKUP 或 RESTORE 任务。
        2. ON 子句中标识需要备份的表和分区。如果不指定分区,则默认备份该表的所有分区。
        3. PROPERTIES 目前支持以下属性:
                "type" = "full":表示这是一次全量更新(默认)。
                "timeout" = "3600":任务超时时间,默认为一天。单位秒。
Examples:

    1. 全量备份 example_db 下的表 example_tbl 到仓库 example_repo 中:
        BACKUP SNAPSHOT example_db.snapshot_label1
        TO example_repo
        ON (example_tbl)
        PROPERTIES ("type" = "full");
        
    2. 全量备份 example_db 下,表 example_tbl 的 p1, p2 分区,以及表 example_tbl2 到仓库 example_repo 中:
        BACKUP SNAPSHOT example_db.snapshot_label2
        TO example_repo
        ON 
        (
            example_tbl PARTITION (p1,p2),
            example_tbl2
        );
###查看最近一次 backup 作业的执行情况,包括
JobId:本次备份作业的 id。
SnapshotName:用户指定的本次备份作业的名称(Label)。
DbName:备份作业对应的 Database。
State:备份作业当前所在阶段:
PENDING:作业初始状态。
SNAPSHOTING:正在进行快照操作。
UPLOAD_SNAPSHOT:快照结束,准备上传。
UPLOADING:正在上传快照。
SAVE_META:正在本地生成元数据文件。
UPLOAD_INFO:上传元数据文件和本次备份作业的信息。
FINISHED:备份完成。
CANCELLED:备份失败或被取消。
BackupObjs:本次备份涉及的表和分区的清单。
CreateTime:作业创建时间。
SnapshotFinishedTime:快照完成时间。
UploadFinishedTime:快照上传完成时间。
FinishedTime:本次作业完成时间。
UnfinishedTasks:在 SNAPSHOTTING,UPLOADING 等阶段,会有多个子任务在同时进行,这里展示的当前阶段,未完成的子任务的 task id。
TaskErrMsg:如果有子任务执行出错,这里会显示对应子任务的错误信息。
Status:用于记录在整个作业过程中,可能出现的一些状态信息。
Timeout:作业的超时时间,单位是秒。
###命令:SHOW BACKUP
###查看远端仓库中已存在的备份。
 1. 查看仓库 example_repo 中已有的备份:
        SHOW SNAPSHOT ON example_repo;
        
2. 仅查看仓库 example_repo 中名称为 backup1 的备份:
        SHOW SNAPSHOT ON example_repo WHERE SNAPSHOT = "backup1";
        
3. 查看仓库 example_repo 中名称为 backup1 的备份,时间版本为 "2018-05-05-15-34-26" 的详细信息:
        SHOW SNAPSHOT ON example_repo
        WHERE SNAPSHOT = "backup1" AND TIMESTAMP = "2018-05-05-15-34-26";

###参数说明:
Snapshot:备份时指定的该备份的名称(Label)。
Timestamp:备份的时间戳。
Status:该备份是否正常。
        
###恢复(help RESTORE)
 1. RESTORE
    该语句用于将之前通过 BACKUP 命令备份的数据,恢复到指定数据库下。该命令为异步操作。提交成功后,需通过 SHOW RESTORE 命令查看进度。仅支持恢复 OLAP 类型的表。
    语法:
        RESTORE SNAPSHOT [db_name].{snapshot_name}
        FROM `repository_name`
        ON (
            `table_name` [PARTITION (`p1`, ...)] [AS `tbl_alias`],
            ...
        )
        PROPERTIES ("key"="value", ...);
            
    说明:
        1. 同一数据库下只能有一个正在执行的 BACKUP 或 RESTORE 任务。
        2. ON 子句中标识需要恢复的表和分区。如果不指定分区,则默认恢复该表的所有分区。所指定的表和分区必须已存在于仓库备份中。
        3. 可以通过 AS 语句将仓库中备份的表名恢复为新的表。但新表名不能已存在于数据库中。分区名称不能修改。
        4. 可以将仓库中备份的表恢复替换数据库中已有的同名表,但须保证两张表的表结构完全一致。表结构包括:表名、列、分区、Rollup等等。
        5. 可以指定恢复表的部分分区,系统会检查分区 Range 是否能够匹配。
        6. PROPERTIES 目前支持以下属性:
                "backup_timestamp" = "2018-05-04-16-45-08":指定了恢复对应备份的哪个时间版本,必填。该信息可以通过 `SHOW SNAPSHOT ON repo;` 语句获得。
                "replication_num" = "3":指定恢复的表或分区的副本数。默认为3。若恢复已存在的表或分区,则副本数必须和已存在表或分区的副本数相同。同时,必须有足够的 host 容纳多个副本。
                "timeout" = "3600":任务超时时间,默认为一天。单位秒。
                "meta_version" = 40:使用指定的 meta_version 来读取之前备份的元数据。注意,该参数作为临时方案,仅用于恢复老版本 Doris 备份的数据。最新版本的备份数据中已经包含 meta version,无需再指定。
Examples:
    1. 从 example_repo 中恢复备份 snapshot_1 中的表 backup_tbl 到数据库 example_db1,时间版本为 "2018-05-04-16-45-08"。恢复为 1 个副本:
        RESTORE SNAPSHOT example_db1.`snapshot_1`
        FROM `example_repo`
        ON ( `backup_tbl` )
        PROPERTIES
        (
            "backup_timestamp"="2018-05-04-16-45-08",
            "replication_num" = "1"
        );
        
    2. 从 example_repo 中恢复备份 snapshot_2 中的表 backup_tbl 的分区 p1,p2,以及表 backup_tbl2 到数据库 example_db1,并重命名为 new_tbl,时间版本为 "2018-05-04-17-11-01"。默认恢复为 3 个副本:
        RESTORE SNAPSHOT example_db1.`snapshot_2`
        FROM `example_repo`
        ON
        (
            `backup_tbl` PARTITION (`p1`, `p2`),
            `backup_tbl2` AS `new_tbl`
        )
        PROPERTIES
        (
            "backup_timestamp"="2018-05-04-17-11-01"
        );
###查看最近一次 restore 作业的执行情况,包括:
JobId:本次恢复作业的 id。
Label:用户指定的仓库中备份的名称(Label)。
Timestamp:用户指定的仓库中备份的时间戳。
DbName:恢复作业对应的 Database。
State:恢复作业当前所在阶段:
PENDING:作业初始状态。
SNAPSHOTING:正在进行本地新建表的快照操作。
DOWNLOAD:正在发送下载快照任务。
DOWNLOADING:快照正在下载。
COMMIT:准备生效已下载的快照。
COMMITTING:正在生效已下载的快照。
FINISHED:恢复完成。
CANCELLED:恢复失败或被取消。
AllowLoad:恢复期间是否允许导入。
ReplicationNum:恢复指定的副本数。
RestoreObjs:本次恢复涉及的表和分区的清单。
CreateTime:作业创建时间。
MetaPreparedTime:本地元数据生成完成时间。
SnapshotFinishedTime:本地快照完成时间。
DownloadFinishedTime:远端快照下载完成时间。
FinishedTime:本次作业完成时间。
UnfinishedTasks:在 SNAPSHOTTING,DOWNLOADING, COMMITTING 等阶段,会有多个子任务在同时进行,这里展示的当前阶段,未完成的子任务的 task id。
TaskErrMsg:如果有子任务执行出错,这里会显示对应子任务的错误信息。
Status:用于记录在整个作业过程中,可能出现的一些状态信息。
Timeout:作业的超时时间,单位是秒。
###命令:SHOW RESTORE

##取消当前正在执行的备份作业。
CANCEL BACKUP

##取消当前正在执行的恢复作业。
CANCEL RESTORE

##删除已创建的远端仓库。删除仓库,仅仅是删除该仓库在 Doris 中的映射,不会删除实际的仓库数据。
DROP REPOSITORY

###示例
## 1.创建远程仓库
  CREATE REPOSITORY `hdfs_repo`
        WITH BROKER `broker_name_2`
        ON LOCATION "hdfs://test-dev-bigdata1:8020/tmp/doris_backup"
        PROPERTIES
        (
            "username" = "",
            "password" = ""
        );

##2.备份表
全量备份 example_db 下的表 example_tbl 到仓库 example_repo 中:
        BACKUP SNAPSHOT ods.snapshot_label1
        TO hdfs_repo
        ON (ods_pos_pro_sell_out_delta)
        PROPERTIES ("type" = "full");
        
##3.查看备份任务
show BACKUP
##4.查看SNAPSHOT状态
SHOW SNAPSHOT ON hdfs_repo WHERE SNAPSHOT = "snapshot_label1";
##5.其他集群数据下添加REPOSITORY
 CREATE REPOSITORY `hdfs_repo`
        WITH BROKER `broker_name01`
        ON LOCATION "hdfs://test-dev-bigdata1:8020/tmp/doris_backup"
        PROPERTIES
        (
            "username" = "",
            "password" = ""
        );
##6.恢复
  RESTORE SNAPSHOT demo.`snapshot_label1`
        FROM `hdfs_repo`
        ON ( `ods_pos_pro_sell_out_delta` )
        PROPERTIES
        (
            "backup_timestamp"="2020-12-21-13-47-49", //通过SHOW SNAPSHOT ON hdfs_repo WHERE SNAPSHOT = "snapshot_label1";获取
            "replication_num" = "3"
        );
## 7.查看恢复状态
SHOW RESTORE
        
##示例2
##备份分区表
 BACKUP SNAPSHOT demo.snapshot_label2
        TO hdfs_repo
        ON (
				  ods_pos_pro_dish_list_detail_delta_tmp PARTITION (`P_20201101`, `P_20201102`,`P_20201103`,`P_20201104`,`P_20201105`,`P_20201106`,`P_20201107`,`P_20201108`,`P_20201109`,`P_20201110`,`P_20201111`,`P_20201112`)
					)
        PROPERTIES (
				    "type" = "full"
				);
##迁移
 RESTORE SNAPSHOT ods.`snapshot_label2`
        FROM `hdfs_repo`
        ON (  ods_pos_pro_dish_list_detail_delta_tmp PARTITION (`P_20201101`, `P_20201102`,`P_20201103`,`P_20201104`,`P_20201105`,`P_20201106`,`P_20201107`,`P_20201108`,`P_20201109`,`P_20201110`,`P_20201111`,`P_20201112`)
				)
        PROPERTIES
        (
            "backup_timestamp"="2020-12-21-14-37-45",
            "replication_num" = "3"
        );