时序数据库的流计算支持

时间:2023-01-06 01:01:29

一、时序数据及其特点 

时序数据(Time Series Data)是基于相对稳定频率持续产生的一系列指标监测数据,比如一年内的道琼斯指数、一天内不同时间点的测量气温等。时序数据有以下几个特点:

  • 历史数据的不变性
  • 数据的有效性
  • 数据的时效性
  • 结构化的数据
  • 数据的大量性

 

二、时序数据库基本架构

 

时序数据库的流计算支持

 

针对时序数据的特点,时序数据库一般具有以下特性:

  • 高速的数据入库
  • 数据的生命周期管理
  • 数据的流处理
  • 高效的数据查询
  • 定制的数据压缩

 

三、流计算介绍 

流计算主要是指针对实时获取来自不同数据源的海量数据,经过实时分析处理,从而获得有价值的信息。常见的业务场景包括实时事件的快速反应,市场变化的实时告警,实时数据的交互分析等。流计算一般包括如下几方面的功能:

1)过滤和转换 (filter & map)

2)聚合以及窗口函数 (reduce,aggregation/window)

3)多数据流合并以及模式匹配 (joining & pattern detection)

4)从流到块处理

 

四、时序数据库对流计算的支持   

  • 案例一:使用定制化的流计算 API,如下面例子所示:

from(bucket: "mydb")  
|> range(start: -1h)  
|> filter(fn: (r) => r["_measurement"] == "mymeasurement")  
|> map(fn: (r) => ({ r with value: r.value * 2 }))  
|> filter(fn: (r) => r.value > 100)  
|> aggregateWindow(every: 1m, fn: sum, createEmpty: false)  
|> group(columns: ["location"])  
|> join(tables: {stream1: {bucket: "mydb", measurement: "stream1", start: -1h}, stream2: {bucket: "mydb", measurement: "stream2", start: -1h}}, on: ["location"])  
|> alert(name: "value_above_threshold", message: "Value is above threshold", crit: (r) => r.value > 100)  
|> to(bucket: "mydb", measurement: "output", tagColumns: ["location"])

 

  • 案例二:使用类 SQL 指令,创建流计算以及定义流计算规则,如下:

CREATE STREAM current_stream        
TRIGGER AT_ONCE        
INTO current_stream_output_stb AS        
SELECT             
 _wstart as start,              
 _wend as end,              
 max(current) as max_current        
 FROM meters        
 WHERE voltage <= 220        
 INTEVAL (5S) SLIDING (1s);