1+1>2 ?多数据源关联分析系列…

时间:2023-02-23 11:17:14

数据表连接的 join 操作,相信大家都不陌生吧?

在数据分析时,经常需要对多个不同的数据源进行关联操作,因此各类数据库的 SQL 语言都包含了丰富的 join 语句,以支持批计算关联

而在金融的业务场景中,流数据实时关联更为常见。从简单的 OHLC 数据,到逐笔委托、逐笔成交、快照行情数据,从个股日频指标到中高频复杂因子,数据每分钟每几秒就会产生更新。对这些流数据实时融合,可以使因子指标计算更高效,也能满足更丰富的信息输出需求。


流数据实时关联难在哪?

- 对齐难

不同数据源的数据结构字段顺序时间频率等不完全一致,如何满足不同应用场景的关联需求,将多个数据表进行匹配和对齐,是算法层面需要解决的问题。

- 触发难

批计算的关联操作针对全量输入数据,计算结果也是一次性输出;而流数据实时关联中,数据是源源不断的,且无法预知下一条记录何时到来。增量计算应该什么时候触发呢?这是技术实现上要解决的问题。

- 缓存难

为了在每一条输入到来后,能尽快且尽可能正确地输出结果,历史数据流的缓存是基础,面对源源不断的数据,如何设置缓存和清理机制最高效?这就对内存管理机制提出了要求。

- 计算难

金融市场数据多以面板形式出现,支持面板数据分析向量化计算的函数可以极大方便用户的数据预处理,加速业务流程,如何在流数据实时关联场景下延续这些计算函数的优秀性能,也是一款流数据处理引擎需要考虑的问题。

股票、指数、逐笔、快照…. 围绕这些不同行情数据源而产生的上述问题,使用 DolphinDB,全部轻松解决。

系列之流数据实时关联

DolphinDB 提供了多种流数据连接引擎,支持对不断增长的数据流进行实时关联处理,在大数据流量下依然能够保持毫秒级的时延性能,用户直接调用引擎即可实现流数据实时关联。

这里我们罗列了一下用户经常会用到的一些场景:

1. 根据行情快照和逐笔成交数据实时计算复杂高频因子;

2. 在逐笔成交数据的基础上丰富委托信息并实时输出;

3. 根据逐笔成交数据实时匹配最近一次报价;

4. 对多个数据源降频采样,计算分钟指标并将结果关联到同一张表中;

5. 根据快照数据实时匹配股票历史日频指标;

6. 实时计算股票与某指数的分钟收益率相关性。

我们将以一个系列的形式,为大家详细展示,如何快速实现以上业务场景。


场景一 行情快照和逐笔成交数据

第一期,我们以依赖行情快照和逐笔成交数据实时计算复杂高频因子为例。

行情快照和逐笔成交数据包含着不同的信息,很多高频因子的计算同时依赖行情快照和成交数据。对于复杂因子计算来说,融合后的数据可以更方便地作为后续复杂因子的计算的输入。

通常的做法是在行情快照数据的基础上,融合前后两个快照之间的逐笔成交数据

如下图所示,我们需要将每条行情快照记录与一个时间窗口内的全部逐笔成交记录进行聚合匹配。这个时间窗口的上下界由两条行情快照数据的时刻决定,输出与原始的每一条行情快照记录一一对应。对于一个窗口中的逐笔成交记录,既需要计算交易量总和这样的聚合值,也希望以一个字段保留窗口内的全部逐笔成交明细。

1+1>2 ?多数据源关联分析系列…

DolphinDB 提供了 Window Join 引擎的特殊窗口来实现此场景。

// create table
share streamTable(1:0, `Sym`TradeTime`Side`TradeQty, [SYMBOL, TIME, INT, LONG]) as trades
share streamTable(1:0, `Sym`Time`Open`High`Low`Close, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as snapshot
share streamTable(1:0, `Time`Sym`Open`High`Low`Close`BuyQty`SellQty`TradeQtyList`TradeTimeList, [TIME, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, LONG, LONG[], TIME[]]) as output

// create engine
wjMetrics = <[Open, High, Low, Close, sum(iif(Side==1, TradeQty, 0)), sum(iif(Side==2, TradeQty, 0)), TradeQty, TradeTime]>
fillArray = [00:00:00.000, "", 0, 0, 0, 0, 0, 0, [[]], [[]]]
wjEngine = createWindowJoinEngine(name="windowJoin", leftTable=snapshot, rightTable=trades, outputTable=output, window=0:0, metrics=wjMetrics, matchingColumn=`Sym, timeColumn=`Time`TradeTime, useSystemTime=false, nullFill=fillArray)

// subscribe topic
subscribeTable(tableName="snapshot", actinotallow="appendLeftStream", handler=getLeftStream(wjEngine), msgAsTable=true, offset=-1, hash=0)
subscribeTable(tableName="trades", actinotallow="appendRightStream", handler=getRightStream(wjEngine), msgAsTable=true, offset=-1, hash=1)

输出结果如下,其中最后两列为 array vector 类型数据,记录了窗口中全部成交记录的 TradeQty 字段明细、TradeTime 字段明细。

1+1>2 ?多数据源关联分析系列…

关于 Window join 的具体用法、参数介绍,大家可以点击阅读原文查看我们的用户手册。

下期预告

委托信息怎么补充?

Left Semi Join 引擎级联

敬请期待……