calcite 在flink中的二次开发
- 1 CodeGen
- 2 flink 语法扩展
- 2.1 在进行 Rule 规则匹配时,放开对 Distinct 的限制
- 2.2下面附上一个 利用codegen来生成所需类的例子:
- 3 flink使用calcite 生成解析器FlinkSqlParserImpl
- 3.1 FlinkSqlParserImpl 的生成
- 3.1.1 flink 引入 calcite
- 3.1.2 fmpp 生成 Parser.jj
- 3.1.3 javacc 生成 parser
- 3.1.4 看看 Parser
- 3.1.5 blink planner 引入 flink-sql-parser
- 4 calcite 规则优化器
- 4.1 什么是查询优化器
- 4.2 基于规则优化(RBO)
- 4.3 基于成本优化(CBO)
- 4.4 优化规则
- 4.4.1 谓词下推(Predicate Pushdown)
- 4.4.2 常量折叠(Constant Folding)
- 4.4.3 列裁剪(Column Pruning)
- 扩展资料
关于calcite的概念相关的内容,在我另一篇帖子
深入理解flinksql执行流程,扩展解析器实现语法的扩展
1 CodeGen
首先阐述一下 codegen:
Codegen是基于ObjectWeb ASM的低开销的java代码生成器,他可以根据预先填好的规则与条件,通过编译代码,自动生成java类
在递归调用各个节点 DataStreamRel 的 translateToPlan 方法时,会利用CodeGen元编程成Flink的各种算子,就相当于我们直接利用Flink的DataSet或DataStream API开发的程序。
还是以上面的Demo为例,跟踪进 DataStreamScan 的 translateToPlan 方法中,会发现相关逻辑:
- 首先生成 function 代码的字符串形式,并封装成 GeneratedFunction 对象;
- 然后使用 CodeGen 进行编译;
- 在需要使用 Function 的时候使用反射进行加载使用。
后续在 扩展 flink语法(如join维表)时,需要针对上述步骤,拼接生成 function 的字符串形式。
2 flink 语法扩展
了解完 Flink Sql 的执行流程之后,就可以针对 Flink Sql 做语法、功能上的扩展。
在Flink老版本上,Flink不支持 COUNT(DISTINCT aaa) 语法,但是如果需要对 Flink 做此功能拓展,需要结合 前面说到的 Flink Sql 执行流程,做相应修改。
修改点:
- 在进行 Rule 规则匹配时,放开对 Distinct 的限制
- DataStreamRelNode 转为 DataStream 过程中,拼接CodeGen所需的 Function String
2.1 在进行 Rule 规则匹配时,放开对 Distinct 的限制
在 DATASTREAM_OPT_RULES.DataStreamGroupWindowAggregateRule 中放开对 Distinct 的限制:
2.2下面附上一个 利用codegen来生成所需类的例子:
新建一个项目 ,从源码中拷贝出codegen的代码文件夹
在配置文件中,添加好,sql的保留字,关键字,类的名字等信息,这里就不多说了,有需要的同学可以百度具体的原理技术
新建一个SqlUseFunction.java 这个就是上文中说到的function 代码的字符串形式,
在flink中,就是通过 拼装来拼装出一个类,调用codegen来进行编译得到继承抽象类SqlNode 的方法,所以在开发完的源代码中是找不到codegen相关的东西的,但实际他是参与了工作的。
package com;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
public class SqlUseFunction extends SqlCall {
private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE FUNCTION",
SqlKind.OTHER_FUNCTION);
private final SqlIdentifier funcName;
private final SqlNodeList funcProps;
/**
* SqlUseFunction constructor.
*
* @param pos sql define location
* @param funcName function name
* @param funcProps function property
* */
public SqlUseFunction(SqlParserPos pos, SqlIdentifier funcName, SqlNodeList funcProps) {
super(pos);
this.funcName = funcName;
this.funcProps = funcProps;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("USE FUNCTION");
funcName.unparse(writer, leftPrec, rightPrec);
if (funcProps != null) {
writer.keyword("WITH");
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : funcProps) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(funcName, funcProps);
}
}
pom文件中指定好,使用fmpp技术,以及codegen的地址等
<build>
<plugins>
<!-- adding fmpp code gen -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>
<phase>initialize</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/codegen</outputDirectory>
<resources>
<resource>
<directory>src/main/codegen</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- 从calcite-core.jar提取解析器语法模板,并放入在${project.build}freemarker模板所在的目录 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.18.0</version>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<configuration>
<cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
<outputDirectory>target/generated-sources</outputDirectory>
<templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
</configuration>
<groupId>com.googlecode.fmpp-maven-plugin</groupId>
<artifactId>fmpp-maven-plugin</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.28</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>generate-fmpp-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>javacc-maven-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<id>javacc</id>
<goals>
<goal>javacc</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
<includes>
<include>**/Parser.jj</include>
</includes>
<lookAhead>2</lookAhead>
<isStatic>false</isStatic>
<outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
这里引入一张图 说明calcite中jacc和fmpp在其中起的作用
看到codegen中的fmpp了么 就是这个fmpp
3 flink使用calcite 生成解析器FlinkSqlParserImpl
以下面这个案例出发(代码基于 flink 1.13.1 版本):
public class ParserTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple3<String, Long, Long>> tuple3DataStream =
env.fromCollection(Arrays.asList(
Tuple3.of("2", 1L, 1627254000000L),
Tuple3.of("2", 1L, 1627218000000L + 5000L),
Tuple3.of("2", 101L, 1627218000000L + 6000L),
Tuple3.of("2", 201L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 7000L),
Tuple3.of("2", 301L, 1627218000000L + 86400000 + 7000L)))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
@Override
public long extractTimestamp(Tuple3<String, Long, Long> element) {
return element.f2;
}
});
tEnv.registerFunction("mod", new Mod_UDF());
tEnv.registerFunction("status_mapper", new StatusMapper_UDF());
tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
"status, id, timestamp, rowtime.rowtime");
String sql = "SELECT\n"
+ " count(1),\n"
+ " cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)\n"
+ "FROM\n"
+ " source_db.source_table\n"
+ "GROUP BY\n"
+ " tumble(rowtime, INTERVAL '1' DAY)";
Table result = tEnv.sqlQuery(sql);
tEnv.toAppendStream(result, Row.class).print();
env.execute();
}
}
debug 过程如之前分析 sql -> SqlNode 过程所示,如下图直接定位到 SqlParser:
如上图可以看到具体的 Parser 就是 FlinkSqlParserImpl。
定位到具体的代码如下图所示(flink-table-palnner-blink-2.11-1.13.1.jar)。
最终 parse 的结果 SqlNode 如下图。
再来看看 FlinkSqlParserImpl 是怎么使用 calcite 生成的。
具体到 flink 中的实现,位于源码中的 flink-table.flink-sql-parser 模块(源码基于 flink 1.13.1)。
flink 是依赖 maven 插件实现的上面的整体流程。
3.1 FlinkSqlParserImpl 的生成
接下来看看整个 Parser 生成流程。
3.1.1 flink 引入 calcite
使用 maven-dependency-plugin 将 calcite 解压到 flink 项目 build 目录下。
<plugin>
<!-- Extract parser grammar template from calcite-core.jar and put
it under ${project.build.directory} where all freemarker templates are. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-parser-template</id>
<phase>initialize</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/</outputDirectory>
<includes>**/Parser.jj</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
3.1.2 fmpp 生成 Parser.jj
使用 maven-resources-plugin 将 Parser.jj 代码生成。
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-fmpp-resources</id>