pipelinedb Continuous transforms 操作

时间:2021-02-18 20:17:31

Continuous transforms 可以进行数据的转换,数据是不进行存储,主要是可以加入到其他的stream pipeline 中,或者写到其他外部
存储中,和存储过程结合使用,当前默认内置一个pipeline_stream_insert方便数据写入其他strem
注意不支持聚合操作

docker-compose

version: '3.6'
services:
postgres:
image: pipelinedb/pipelinedb
ports:
- "5432:5432"

参考语法

CREATE CONTINUOUS TRANSFORM name AS query [ THEN EXECUTE PROCEDURE function_name ( arguments ) ]

query 查询说明
SELECT expression [ [ AS ] output_name ] [, ...]
[ FROM from_item [, ...] ]
[ WHERE condition ]
[ GROUP BY expression [, ...] ] where any expression in the SELECT statement can't contain an aggregate and
from_item can be one of: stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
from_item [ NATURAL ] join_type from_item [ ON join_condition ]

Continuous transforms 输出流

Continuous transforms 输出流,可以方便其他transforms或者Continuous view 读取

  • 参考
创建 CONTINUOUS TRANSFORM
CREATE CONTINUOUS TRANSFORM t AS
SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
使用
CREATE CONTINUOUS VIEW v AS
SELECT sum(y) FROM output_of('t');

参考例子

  • 创建两个stream
CREATE STREAM mystream3 (x integer, y integer);

CREATE STREAM mystream4 (x integer, y integer);
  • 创建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW v4 AS
SELECT x,y FROM mystream3 ; CREATE CONTINUOUS VIEW v5 AS
SELECT x,y FROM mystream4 ;
  • 创建CONTINUOUS TRANSFORM

    当insert 到mystream3 中的x为偶数的时候执行插入mystream4

CREATE CONTINUOUS TRANSFORM t3 AS
SELECT x::int, y::int FROM mystream3 WHERE mod(x, 2) = 0
THEN EXECUTE PROCEDURE pipeline_stream_insert('mystream4');
  • 数据插入&& 查询结果
    x 插入数据为1奇数
insert into mystream3(x,y) values(1,2);
select * from v4;
select * from v5;

pipelinedb Continuous transforms 操作
pipelinedb Continuous transforms 操作
x 插入数据为2奇数

insert into mystream3(x,y) values(2,5);
select * from v4;
select * from v5;

pipelinedb Continuous transforms 操作
pipelinedb Continuous transforms 操作

  • 使用CONTINUOUS TRANSFORM 的output steam
CREATE CONTINUOUS VIEW v6 AS

  SELECT x,y  FROM output_of('t3') ;

插入数据&&查询

insert into mystream3(x,y) values(4,7);
select * from v6;

pipelinedb Continuous transforms 操作

参考资料

http://docs.pipelinedb.com/continuous-transforms.html