DolphinDB C++ API 数据写入使用指南

时间:2023-03-27 10:59:21

本文为 DolphinDB C++ API (连接器)写入接口的使用指南,用户在有数据写入需求时,可以根据本篇教程快速明确地选择写入方式。本文将从使用场景介绍、原理简述、函数使用、场景实践四部分进行具体阐述。


一、场景介绍

目前大数据技术已广泛应用到金融、物联网等行业,而海量数据的写入是大数据处理和分析的基础。在实际应用中,数据的产生方式和采集途径多种多样,DolphinDB 作为轻量级的大数据平台,提供多种数据写入方式,用户可以在不同应用场景下选择最合适的方式进行数据写入。

以工业物联网场景为例,设备数据的写入场景通常可分为两类。

(1)多设备数据分散写入。

如某厂区有100台设备,每台设备通过独立的传输链路将数据一条条发送到 API 端,再统一通过 API 端写入到 DolphinDB。

DolphinDB C++ API 数据写入使用指南

多设备数据分散写入

(2)设备数据汇总后再写入 API。 如某厂区有100台设备,用采集服务(如 Kafka)将设备数据进行汇总,再统一通过 API 写入 DolphinDB。

DolphinDB C++ API 数据写入使用指南

设备数据汇总写入

针对以上场景,DolphinDB C++ API 提供了多种写入方法,以实现不同来源数据的高效写入:

设备场景

写入DolphinDB表的类型

调用DolphinDB 函数

实现方式

多设备数据一条条分散写入

ALL

MTW(MultithreadedTableWriter)

缓冲行数据后并行写入

设备数据汇总写入

ALL

MTW(MultithreadedTableWriter)

将合并的行数据并行写入

设备数据汇总写入

内存表

tableInsert

单表写入

设备数据汇总写入

分布式表

PTA(PartitionedTableAppender)

多线程按列批量写入

设备数据汇总写入

分布式表

AFTA(AutoFitTableAppender)

单线程按列批量写入

MTW 的写入方式可以适配多种写入场景,推荐首次接触 DolphinDB 的用户使用 MTW 方法;

tableInsert 方法可以将汇总数据简单快速地写入内存表;

而对于分布式表,C++ API 提供了保障并行写入的 PTA 方法,以及更简单易用,能自动转换写入数据字段类型的 AFTA/AFTU 方法。


二、原理简述

传统的开发人员通常对关系型数据库的行式存储(Row-Based)比较熟悉,数据按单行或多行的方式提交并写入,这种写入方式很容易理解,但是基于行式存储的数据库实际上并不是为大数据处理而设计的,海量数据的写入很容易遇到性能瓶颈。

DolphinDB 采用列式存储(Column-Based),在内存中维护一个 Cache Engine,当数据写入文件时,并不是直接写入到磁盘,而是先写入操作系统的缓冲页面中,再批量写入磁盘。为了确保写入数据不会在内存中丢失, DolphinDB 使用 WAL(Write Ahead Logging)的机制。详情可参考 DolphinDB 用户手册的数据模型

以一个5列(字段)的数据表为例,写入100万行的数据时,行式存储按行方式提交并写入,需要执行100万次的文件写入操作;而列式存储对单列进行写入,可以按列一次性提交100万个值,最少仅需5次文件操作就能完成数据写入。两种写入方式在海量数据的处理方面性能差异巨大。

DolphinDB C++ API 数据写入使用指南

行式存储和列式存储对比(图片来源于网络)

通常我们在为大数据应用场景规划写入方式前,需要理解以上列式存储的写入方式。按列批量写入能最大化发挥列式存储的优势,而当实际场景下多个设备写入数据较为分散时,可以选择有数据缓冲的 API 方法如 MTW,以获得最佳写入性能。

DolphinDB C++ API 支持多种数据写入方法,涵盖多样化的写入场景需求,主要特点如下:

写入方式

特点

MTW

-官方推荐用法

-按行接收数据

-内置数据缓冲队列

-多线程异步并发写入

tableInsert

- 方便简单,速度快

- 事务机制下,同一分区不能同时写入两条数据,因此不建议写入分布式表

PTA

- 按表写入

- 内置连接池

- 自动按分区同步并行写入

AFTA

- 自动转换字段类型写入

- 适用于历史数据整表落盘,追加写入

- 单线程同步写入

AFTU

- AFTA 更新写的版本

BatchTableWriter(旧版本)

- 因兼容性而保留的旧版函数

- 实时数据落盘,数据按行写入

- 单线程同步写入

  • MTW 支持高效按行写入,通过内置数据缓冲队列,MTW 将数据统一发送到 DolphinDB ,可以保证单条数据的写入效率,适用于多设备一条条分散写入场景。当性能要求不高时,也可用于第三方平台汇总数据后批量写入 API 的场景。 MTW 是对 BatchTableWriter(旧版本)的升级,二者均支持数据分散地从第三方平台传输到客户端的场景。MTW 的默认功能和 BatchTableWriter 一致,但支持多线程的并发写入。目前 BatchTableWriter 方式已经完全被 MTW 替代,仅因为兼容性而保留。
  • tableInsert 使用简单高效,可以支持数据汇总写入场景,若写入的 DolphinDB 表为内存表,可以选择 tableInsert 或者 PTA;但 tableInsert 没有分区写入保障机制,在开启事务机制的情况下,不建议写入分布式表。
  • PTA 能够自动按分区实现同步并行写入,适用于数据汇总写入场景。按列并行写入的机制确保了 PTA 方式在批量写入场景下拥有性能优势。
  • AFTA 能够自动将 C++ 字段类型转换为 DolphinDB 字段类型完成写入,使用上较 PTA 更为简单,同样适合数据汇总写入场景。PTA 的写入速度要好于 AFTA,在对写入效率有要求且仅进行追加写的情况下,建议优先考虑 PTA。
  • AFTU 是 AFTA 的更新写版本,更适合于重复数据存在的场景,读取新数据不存在重复时直接插入,存在重复时更新。针对数据写入是否需要更新,即当写入的数据在数据库中已有相同的主键或者相同的指定字段时,选择更新该条旧数据或者直接插入新数据,C++ API 给出了不同的写入方式。其中,MTW 内部分别实现了更新写和追加写,以 mode 参数的形式提供选择;而 PTA 仅提供了追加写的方式。

MTW,PTA,AFTA,AFTU 四种方法涵盖了绝大多数写入场景,其底层实现均调用了 tableInsert 或 upsert! (DolphinDB 脚本函数,关于 tableInsert 的更多介绍请参考 tableInsert — DolphinDB 2.0 documentation)。下节将重点介绍 MTW,PTA,AFTA,AFTU 四种函数的使用。

DolphinDB C++ API 的具体安装教程可参考 README_CN.md · dolphindb/api-cplusplus - Gitee


三、函数使用

1. MultithreadedTableWriter(MTW)

MTW 可以向内存表、流表、分区表、维度表中写入数据。不仅在内部实现并发写入,还可在 API 端创建多个 MTW 对象并发执行写入任务,MTW 也支持整型、时间等类型的内部自动转换。

MTW 在 API 端维护一个数据缓冲队列,API 端可调用写线程将数据按条持续写入缓冲队列,数据在缓冲队列堆积到一定数量后将一并被传送到服务器端。客户端创建出用户指定数目的 DolphinDB 连接,然后按照分区分配写入数据。在事务机制下,DolphinDB 不允许多个线程同时向同一个分区写入数据。

首先创建一个 MTW 对象。创建 MTW 时可指定每列的压缩方式。代码如下:

vector<COMPRESS_METHOD> compress;
for(int i=0;i<102;i++)compress.push_back(COMPRESS_LZ4);   // 每列的压缩方式
MultithreadedTableWriter writer(
      "127.0.0.1", 9900, "admin","123456","dfs://test_MultithreadedTableWriter","collect",NULL,false,NULL,1000,1,10,"deviceid", &compress);

MTW 的构造函数参数详见 README_CN.md · dolphindb/api-cplusplus - Gitee 。另外,dbNamepartitionColumnNamethreadCount 三个参数在写入不同类型的表时有很大区别,具体见下表。

表类型

参数1:dbName

参数2:partitionColumnName

参数3:threadCount

内存表

“”

任意字段

>=1

流表

“”

任意字段

>=1

分区表

实际数据库名

某个分区字段

>=1

维度表

实际数据库名

“”

1

接着在 API 端创建子线程插入数据到缓冲队列。当需要写入的数据量较大时,可根据实际情况在 API 端使用更多的线程。insert 方法需传入一个 ErrorCodeInfo 对象和一串变长参数,每个变长参数都代表一个字段值。

具体代码如下:

int rows = 1000; //行数
int cols = 5;   //列数
vector<ConstantSP> datas;
TableSP bt = conn.run("t0 = loadText('"+DATA_FIRE+"');t0");// 模拟数据源从 csv 文件导入
for(int i = 0; i< rows; ++i){
    for(int j = 0; j < cols; ++j)
        datas.emplace_back(bt->getColumn(j)->get(i));
}
// 创建线程
thread t([&]() {
    try {
        for(int i=0;i < bt->rows();i++){
           ErrorCodeInfo pErrorInfo;
           writer.insert(pErrorInfo,
                      datas[0], datas[1], datas[2], datas[3], datas[4] // 含5个字段的数据
           );
        }
    }catch (exception &e) {
         cerr << "MTW exit with exception: " << e.what() << endl;
    }
});
// 等待插入线程结束
t.join();

在 MTW 运行时,可能会发生写入错误,使用下述代码,获取对象当前的运行状态。

MultithreadedTableWriter::Status status;
writer.getStatus(status);
if (status.hasError()) {
	cout << "error in writing: " << status.errorInfo << endl;
}

status 对象的属性和方法详见 README_CN.md · dolphindb/api-cplusplus - Gitee 。

注意,API 端的写入线程结束不代表 MTW 完全退出,需使用 waitForThreadCompletion 方法等待 MTW 完全退出。同时,MTW 需要在内存中缓存写入数据,当 API 异常退出时可能会造成已缓存数据的丢失,因此需要合理配置缓存写入数量,以适配性能和高可用场景需求。

当 MTW 写入出现错误,需调用 getUnwrittenData 方法获取未写入数据。若有未写入数据,则需再次创建 MTW 对象进行写入。代码如下:

writer.getStatus(status);
    if (status.hasError()) {
        cout << "error after write complete: " << status.errorInfo << endl;
        // 获取未写入的数据
        std::vector<std::vector<ConstantSP>*> unwrittenData;
        writer.getUnwrittenData(unwrittenData);
        cout << "unwriterdata length " << unwrittenData.size() << endl;
        if (!unwrittenData.empty()) {
            try {
                // 重新写入这些数据,原有的 MTW 因为异常退出已经不能用了,需要创建新的 MTW
                cout << "create new MTW and write again." << endl;
                MultithreadedTableWriter newWriter("183.136.170.167", 9900, "admin", "123456", "dfs://test_MultithreadedTableWriter", "collect", NULL,false,NULL,1000,1,5,"deviceid", &compress);
                ErrorCodeInfo errorInfo;
                // 插入未写入的数据
                if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) {
                    // 等待写入完成后检查状态
                    newWriter.waitForThreadCompletion();
                    newWriter.getStatus(status);
                    if (status.hasError()) {
                        cout << "error in write again: " << status.errorInfo << endl;
                    }
                }
                else {
                    cout << "error in write again: " << errorInfo.errorInfo << endl;
                }
            }
            catch (exception &e) {
                cerr << "new MTW exit with exception: " << e.what() << endl;
            }
        }
    }

至此,MTW 的使用流程介绍完毕。

2. PartitionedTableAppender(PTA)

PTA 设计一个连接池,获取分布式表的分区信息后,将分区分配给连接池来并行写入。

PartitionedTableAppender 可向分布式表中写入数据

PTA 的使用简洁方便,创建一个连接池对象和 PTA 对象。注意,对象创建时需要指定写入表的分区字段,尽量使分区个数与连接池中连接个数相同。因为线程多反而会增加线程创建和销毁开销,而线程少无法最大利用服务器资源。当每个分区都能同时进行写入并且没有多余的线程创建,PTA 写入效率最高,资源分配也最合理。代码如下:

DBConnectionPool pool("127.0.0.1", 9900, 5, "admin", "123456");
// 分区列传入 deviceid 或 ts 均可,保证可以使用多线程写入数据集多个分区,因为 DolphinDB 开启事务时不允许多个 writer 同时写入到一个分区内
PartitionedTableAppender appender("dfs://test_PartitionedTableAppender", "collect","deviceid", pool);  
appender.append(bt);
pool.shutDown();

若当前连接池不再使用,会自动被释放,但存在释放延时,可以通过调用 shutDown() 等待线程任务执行结束后立即释放连接。

3. AutoFitTableAppender(AFTA)

AFTA 建立与 server 的连接后,对列数、列字段名、列字段类型等基础信息进行判断,完成整型、时间类型等字段的自动转换,随即使用 tableInsert 进行写入,目前 AFTA 与 AFTU 尚不支持整型与浮点型数据间的转换。

AutoFitTableAppender 内部实现简单,实用性高。可向流表、内存表、分区表、磁盘表写入数据

AFTA 的使用较 PTA 更为简单,创建完 AFTA 对象后即可调用 append() 写入。具体代码如下:

AutoFitTableAppender appender("dfs://test_AutoFitTableAppender", "collect", conn);
appender.append(bt);

4. AutoFitTableUpsert(AFTU)

创建 AFTU 时可指定字段 keycolName,当新插入数据的指定字段不与数据库中已有数据重复时,AFTU 直接将数据插入,而当该字段出现重复时,AFTU 可以对该条数据进行更新。

AutoFitTableUpsert 更适合于有重复数据写入的场景

类似于 AFTA,通过创建的 AFTU,调用 upsert() 即可完成数据的写入和更新:

vector<string> keycolName = {"id"};
AutoFitTableUpsert aftu("dfs://test_AutoFitTableUpsert", "collect", conn, false, &keycolName);
aftu.upsert(bt);

以下就追加写入和更新写入场景提供了更全面的选择参考:

数据写入场景

字段类型自动匹配

追加写入

更新写入

多设备分散写入


MTW

MTW

数据汇总写入


PTA、MTW

MTW

数据汇总写入


AFTA、MTW

AFTU、MTW

在底层实现上,AFTA 和 AFTU 通过整表插入的方式实现批量数据写入;而 MTW 通过维护数据缓冲队列实现批量数据异步写入。若需要使用 C++ API 实现整表数据的写入,推荐使用 AFTA 或 AFTU。


四、场景实践

以下场景案例展示了使用 DolphinDB C++ API 实现数据写入的流程:

某设备实验平台有100台设备,单台设备有1000个测点,实验平台需要采集设备的测点信息从而评估设备的使用情况。

实验平台要求测点信息按单值模型存储,每台设备每隔5分钟对所有1000个测点进行数据采集,汇总所有设备的数据后通过消息中间件统一传输到 API 端。实验平台要求支持对数据的批量写入,同时保证数据类型的一致性,不需要数据类型自动转换;若客户端意外崩溃,重启后 API 可重新接受数据。这种场景下采用 MTW 方法将实时数据写入数据库,其流程图如下:

DolphinDB C++ API 数据写入使用指南

实时数据落盘流程图

数据集:

  • 记录描述:100台设备,每台1000个测点,采集频率5分钟1次,采集持续10天
  • 记录行数:2.6亿行
  • 磁盘占用:1116 MB
  • 字段数量:6
  • 字段样式:
  • ts:数采时间
  • deviceCode:设备编号
  • logicalPostionId:逻辑位置ID
  • physicalPostionId:物理位置ID
  • propertyCode:属性测点编码
  • propertyValue:测点值(累计产量)

准备工作:

首先要在 server 端创建分布式数据库 db_demo、分区表 collect

// 建立分布式数据库及分区表
dbname="dfs://db_demo"
tablename="collect"
cols_info=`ts`deviceCdoe`logicalPostionId`physicalPostionId`propertyCode`propertyValue
cols_type=[DATETIME,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT]
t=table(1:0,cols_info,cols_type)
db=database(dbname,VALUE,[2022.11.01],engine=`TSDB)
pt=createPartitionedTable(db,t,tablename,`ts,,`deviceCdoe`ts)

然后创建一张流数据表 streamtable,使用 MTW 方式将数据写入这张流表,然后订阅流表,数据将从流表流向分区表 collect

// 建立流表
def saveToDFS(mutable dfstable, msg): dfstable.append!(msg)
share streamTable(1:0, cols_info, cols_type) as streamtable;
subscribeTable(tableName="streamtable", actionName="savetodfs", offset=0, handler=saveToDFS{pt}, msgAsTable=true, batchSize=1000, throttle=1)

也可直接在 C++ 代码中使用 conn.run(script) 的方式运行此段代码。

接口调用:

创建一个 MTW 对象,订阅流表

// 建立writer对象
MultithreadedTableWriter writer(
            "183.136.170.167", 9900, "admin","123456","","streamtable",NULL,false,NULL,1000,1,5,"deviceid", &compress);  
MultithreadedTableWriter::Status status;  // 保存 writer 状态

这里需要说明的是,本文着重介绍 API 的写入,通过模拟来展示从第三方平台采集数据到 API 端写入这一过程。此外,本场景在 API 端使用单线程写入数据,用户可根据实际场景使用多线程提高 API 端写入效率,完整代码见附件 API_mtw.cpp。

// 模拟接受批量数据,创建单线程写入数据
// bt 模拟接收消息中间件发送的数据,按设备(每台设备1000条数据)遍历采集数据
for(int i=0;i < (bt->rows())/1000;i++){
	system_clock::duration begin = system_clock::now().time_since_epoch();
	milliseconds milbegin = duration_cast<milliseconds>(begin);
	// 每台数据共1000个测点,写入1000行
	for(int j=i*1000;j<(i+1)*1000;j++){
		ErrorCodeInfo pErrorInfo;
		// 模拟对单条数据6个字段的写入
		writer.insert(pErrorInfo,
			datas[i*6+0], datas[i*6+1], datas[i*6+2], datas[i*6+3], datas[i*6+4], datas[i*6+5]
		)
	}
	system_clock::duration end = system_clock::now().time_since_epoch();
	milliseconds milend = duration_cast<milliseconds>(end);
	if((milend.count()-milbegin.count())<5000){
		// 控制模拟写入的频率
		sleep_for(std::chrono::milliseconds(5000-(milend.count()-milbegin.count())));
	}
}

若后台线程发生错误,MTW 可能退出后未将数据全部写入服务器(包括导致后台线程错误的那一批数据,这批数据可能已经写入服务器也可能未写入服务器)

// 检查写入完成后 MTW 状态
writer.getStatus(status);

该情况下首先获取未完成写入的数据

// 获取未写入的数据
std::vector<std::vector<ConstantSP>*> unwrittenData;
writer.getUnwrittenData(unwrittenData);
cout << "Unwritten data length " << unwrittenData.size() << endl;

重新写入上述数据

// 重新写入这些数据,原有的 MTW 因为异常退出已经不能用了,需要创建新的 MTW
MultithreadedTableWriter newWriter("192.168.0.61", 8848, "admin", "123456", "dfs://test_MultithreadedTableWriter", "collect", NULL,false,NULL,10000,1,10,"deviceid", &compress);
ErrorCodeInfo errorInfo;
// 插入获取到的未写入数据    
if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) {
	// 等待写入完成后检查状态
	newWriter.waitForThreadCompletion();
	newWriter.getStatus(status);
	if (status.hasError()) {
		cout << "error in write again: " << status.errorInfo << endl;
	}
}
else {
	cout << "error in write again: " << errorInfo.errorInfo << endl;
}

附件:

ddb_cpp_api_connector