本文翻译自官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html
Table API和SQL集成在共同API中。该API的中心概念是Table
,用作查询的输入和输出。本文档介绍了使用Table API和SQL查询的程序的通用结构,如何注册 Table
,如何查询Table
以及如何发出 Table(数据)
。
- 两个 planner 之间的主要区别
- 表API和SQL程序的结构
- 创建一个TableEnvironment
- 在 Catalog 中注册表
- 注册扩展 Catalog
- 查询表
- 发出表(数据)
- 翻译并执行查询
- 与DataStream和DataSet API集成
- 查询优化
两个 planner 之间的主要区别
-
Blink将批处理作业视为流的特殊情况。因此,还不支持Table和DataSet之间的转换,并且批处理作业不会转换成DateSet ,而是像流作业一样转换为
DataStream
程序。 -
Blink planner 不支持
BatchTableSource
,而是使用bounded StreamTableSource 代替。 -
Blink planner 仅支持新的
Catalog
,不支持ExternalCatalog 它是
不推荐使用的。 -
为 old planner 和 blink planner 实现的 FilterableTableSource 是不相容的。old planner 会将
PlannerExpression
s 下推到FilterableTableSource
,而 blink planner 将 下推Expression
s (不懂什么意思:The old planner will push downPlannerExpression
s intoFilterableTableSource
, while the Blink planner will push downExpression
s.) 。 - 基于字符串的键值配置选项(有关详细信息,请参阅有关配置的文档)仅用于Blink planner。
-
两个 planner 的实现(
CalciteConfig
)PlannerConfig
不同。 - Blink planner 会将多个接收器优化为一个DAG(仅在TableEnvironment上支持,而不在StreamTableEnvironment上支持)。old planner 将始终将每个接收器优化为一个新的DAG,其中所有DAG彼此独立。
- old planner 现在不支持catalog 统计信息,而Blink planner 则支持。
表API和SQL程序的结构
用于批处理和流式传输的所有Table API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的通用结构。
// create a TableEnvironment for specific planner batch or streaming val tableEnv = ... // see "Create a TableEnvironment" section // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...) // or tableEnv.registerExternalCatalog("extCat", ...) // register an output Table tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query val tapiResult = tableEnv.scan("table1").select(...) // create a Table from a SQL query val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") // emit a Table API result Table to a TableSink, same for SQL result tapiResult.insertInto("outputTable") // execute tableEnv.execute("scala_job")
注意:表API和SQL查询可以轻松地与DataStream或DataSet程序集成并嵌入其中。请参阅与DataStream和DataSet API集成,以了解如何将DataStream和DataSet转换为Tables,反之亦然。
创建一个TableEnvironment
TableEnvironment
是Table API和SQL集成的中心概念。它负责:
-
Table
在内部catalog中注册 - 注册外部catalog
- 执行SQL查询
- 注册用户定义的(标量,表或聚合)函数
-
将
DataStream
或DataSet
转换为Table
-
持有对
ExecutionEnvironment
或StreamExecutionEnvironment的引用
Table
始终绑定到特定的TableEnvironment
。不可能在同一查询中组合不同TableEnvironments的表,例如,join 或union 它们。
TableEnvironment
是通过调用StreamExecutionEnvironment
或ExecutionEnvironment的
静态方法 BatchTableEnvironment.create()
或StreamTableEnvironment.create()与可选的
。该TableConfig
创建的TableConfig
可用于配置TableEnvironment
或定制查询优化和翻译过程(参见查询优化)。
请务必选择特定的planner BatchTableEnvironment
/ StreamTableEnvironment 与
你的编程语言相匹配。
如果两个planner jar都在类路径上(默认行为),则应明确设置要在当前程序中使用的planner 。
// ********************** // FLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings) // or val fsTableEnv = TableEnvironment.create(fsSettings) // ****************** // FLINK BATCH QUERY // ****************** import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.scala.BatchTableEnvironment val fbEnv = ExecutionEnvironment.getExecutionEnvironment val fbTableEnv = BatchTableEnvironment.create(fbEnv) // ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala.StreamTableEnvironment val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) // or val bsTableEnv = TableEnvironment.create(bsSettings) // ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)
注意:如果/lib
目录中只有一个计划器jar ,则可以使用useAnyPlanner
(use_any_planner
对于python)创建specific EnvironmentSettings
。
在catalog中注册表
TableEnvironment
维护按名称注册的表的catalog。表有两种类型,输入表和输出表。可以在Table API和SQL查询中引用输入表并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统。
输入表可以从各种source 进行注册:
-
现有
Table
对象,通常是Table API或SQL查询的结果。 -
TableSource
,用于访问外部数据,例如文件,数据库或消息系统。 -
DataStream
或DataSet
从数据流(仅适用于流作业)或数据集(仅适用于old planner 转换批处理作业)程序。DataStream和DataSet API的集成部分中讨论了注册DataStream
或DataSet。
可以使用TableSink来注册输出表。
注册表格
在TableEnvironment中注册Table如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // table is the result of a simple projection query val projTable: Table = tableEnv.scan("X").select(...) // register the Table projTable as table "projectedTable" tableEnv.registerTable("projectedTable", projTable)
注意:注册Table
的处理方式与关系数据库系统中的VIEW相似,即,定义的查询Table
未经过优化,但是当另一个查询引用已注册的查询时将内联Table
。如果多个查询引用同一个已注册的查询Table
,则将为每个引用查询内联该查询并执行多次,即Table
将不会共享已注册的结果。
注册TableSource
TableSource
提供对存储 在存储系统(例如数据库(MySQL,HBase等)具有特定编码的文件(CSV,Apache [Parquet,Avro,ORC]等)或消息系统(Apache Kafka,RabbitMQ等)中的外部数据的访问。
Flink旨在为常见的数据格式和存储系统提供TableSources。请查看“ 表源和接收器”页面,以获取受支持的TableSource的列表以及如何构建自定义的TableSource
。
在TableEnvironment中注册TableSource 如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)
注:TableEnvironment
用于blink planner 只接受StreamTableSource
,LookupableTableSource和
InputFormatTableSource,
StreamTableSource
用于blink planner必须是有界的。
注册TableSink
TableSink
可以使用注册的表将Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储(系统),消息队列或文件系统(采用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],…)。
Flink旨在为常见的数据格式和存储系统提供TableSink。请参阅有关“ 表源和接收器”页面的文档,以获取有关可用接收器的详细信息以及如何实现自定义的TableSink
。
在TableEnvironment中注册TableSink 如下:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
注册外部catalog
外部catalog可以提供有关外部数据库和表的信息,例如它们的名称,结构,统计信息,以及有关如何访问存储在外部数据库、表或文件中的数据的信息。
可以通过实现ExternalCatalog接口来创建外部catalog,并在TableEnvironment中对其进行注册,如下所示:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create an external catalog val catalog: ExternalCatalog = new InMemoryExternalCatalog // register the ExternalCatalog catalog tableEnv.registerExternalCatalog("InMemCatalog", catalog)
在TableEnvironment中注册后,可以通过指定表的完整路径(例如catalog.database.table)从Table API或SQL查询中访问在ExternalCatalog中定义的所有表。
目前,Flink提供了一个InMemoryExternalCatalog
用于演示和测试目的的工具。但是,该ExternalCatalog
接口还可用于 将HCatalog或Metastore之类的catalog连接到Table API。
注意:blink planner不支持外部catalog。
查询表
表API
Table API是用于Scala和Java的语言集成查询API。与SQL相反,查询未指定为字符串,而是以宿主语言逐步构成。
该API基于Table类,Table类代表一个表(流或批的),并提供应用关系操作的方法。 这些方法返回一个新的Table对象,该对象表示对输入Table应用关系操作的结果。 某些关系操作由多个方法调用组成,例如table.groupBy(...)select(),其中groupBy(...)指定表的分组,并select(...)分组的投影表。
Table API文档描述了流和批处理表支持的所有Table API操作。
以下示例显示了一个简单的Table API聚合查询:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // register Orders table // scan registered Orders table val orders = tableEnv.scan("Orders") // compute revenue for all customers from France val revenue = orders .filter(‘cCountry === "FRANCE") .groupBy(‘cID, ‘cName) .select(‘cID, ‘cName, ‘revenue.sum AS ‘revSum) // emit or convert Table // execute query
注意: Scala Table API使用Scala符号,这些符号以单个记号(‘
)开头来引用的Table属性。Table API使用Scala隐式转换。为了使用Scala隐式转换确保导入了 org.apache.flink.api.scala._ 和
org.apache.flink.table.api.scala._ 。
SQL
Flink的SQL集成基于实现SQL标准的Apache Calcite。SQL查询被指定为常规字符串。
SQL文件描述flink SQL支持的流和批的表。
以下示例说明如何指定查询并以返回用表 表示的结果。
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // register Orders table // compute revenue for all customers from France val revenue = tableEnv.sqlQuery(""" |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = ‘FRANCE‘ |GROUP BY cID, cName """.stripMargin) // emit or convert Table // execute query
下面的示例演示如何指定将更新查询的结果插入到已注册表中。
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // register "Orders" table // register "RevenueFrance" output table // compute revenue for all customers from France and emit to "RevenueFrance" tableEnv.sqlUpdate(""" |INSERT INTO RevenueFrance |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = ‘FRANCE‘ |GROUP BY cID, cName """.stripMargin) // execute query
混合 Table API和SQL
Table API和SQL查询可以轻松混合,因为它们都返回Table
对象:
-
可以在
Table API 的查询可以定义在
SQL查询返回的Table对象上。 -
通过在TableEnvironment中注册结果表并在SQL查询的FROM子句中引用它,可以对Table API查询的结果定义SQL查询。(好绕:A SQL query can be defined on the result of a Table API query by registering the resulting Table in the
TableEnvironment
and referencing it in theFROM
clause of the SQL query )
发出Table
通过将表写入TableSink来发出表。 TableSink是通用接口,用于支持各种文件格式(例如CSV,Apache Parquet,Apache Avro),存储系统(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系统(例如Apache Kafka, RabbitMQ)。
批处理表只能写入BatchTableSink,而流表则需要AppendStreamTableSink、RetractStreamTableSink或UpsertStreamTableSink。
请参阅有关表源和接收器的文档,以获取有关可用接收器的详细信息以及有关如何实现自定义TableSink的说明。
Table.insertInto(String tableName)方法将表发射到已注册的TableSink。 该方法通过名称从catalog中查找TableSink,并验证Table的结构与TableSink的结构是否相同。
以下示例显示如何发出表:
// get a TableEnvironment val tableEnv = ... // see "Create a TableEnvironment" section // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") // register the TableSink with a specific schema val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG) tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink) // compute a result Table using Table API operators and/or SQL queries val result: Table = ... // emit the result Table to the registered TableSink result.insertInto("CsvSinkTable") // execute the program
翻译并执行查询
对于两个planner 来说,翻译和执行查询的行为是不同的。
Old planner
根据Table API和SQL查询的输入是流输入还是批处理输入,它们将转换为DataStream或DataSet程序。查询被内部表示为一个逻辑查询计划,并在两个阶段被转换:
- 优化逻辑计划
- 转换为 DataStream 或 DataSet 程序
在以下情况下,将转换 Table API或SQL查询:
-
将
Table
发射到TableSink
,即当Table.insertInto()
被调用。 -
指定SQL更新查询,即当
TableEnvironment.sqlUpdate()
调用。 -
将
Table
转换为DataStream 或
DataSet
(请参阅与DataStream和DataSet API集成)。
转换后,将像常规DataStream或DataSet程序一样处理Table API或SQL查询,并在调用StreamExecutionEnvironment.execute()
或ExecutionEnvironment.execute()时执行
。
Blink planner
无论Table API和SQL查询的输入是流传输还是批处理,都将转换为DataStream程序。查询被内部表示为一个逻辑查询计划,并在两个阶段被转换:
- 优化逻辑计划,
- 转换为DataStream程序。
翻译查询的行为TableEnvironment
和StreamTableEnvironment是不同的
。
对于TableEnvironment
,Table API或SQL查询在TableEnvironment.execute()
调用时被转换,因为TableEnvironment
将优化多个接收器为一个DAG。
而对于StreamTableEnvironment
,在以下情况下会转换Table API或SQL查询:
-
将
Table
发射到TableSink
,即当Table.insertInto()
被调用。 -
指定SQL更新查询,即当
TableEnvironment.sqlUpdate()被
调用。 -
将
Table
转换为DataStream
。
转换后,将像常规的DataStream程序一样处理Table API或SQL查询,并在调用TableEnvironment.execute()
或StreamExecutionEnvironment.execute()时执行
。
DataStream和DataSet API集成
以下示例显示如何发出表:流上的两个planner 都可以与DataStream API集成。 只有old planner 才能与DataSet API集成,批量blink planner不能同时与两者结合。 注意:下面讨论的DataSet API仅与批量使用的old planner 有关。
Table API和SQL查询可以轻松地与DataStream和DataSet程序集成并嵌入其中。 例如,可以查询外部表(例如从RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据联接,然后使用DataStream或 DataSet API(以及在这些API之上构建的任何库,例如CEP或Gelly)。 相反,也可以将Table API或SQL查询应用于DataStream或DataSet程序的结果。
可以通过将DataStream或DataSet转换为Table来实现这种交互,反之亦然。 在本节中,我们描述如何完成这些转换。
Scala的隐式转换
Scala Table API具有对DataSet,DataStream和Table类的隐式转换。 通过为Scala DataStream API导入org.apache.flink.table.api.scala._以及org.apache.flink.api.scala._,可以启用这些转换。
将DataStream或DataSet注册为表
可以在TableEnvironment中将DataStream或DataSet注册为表。 结果表的模式取决于已注册的DataStream或DataSet的数据类型。 请检查有关将数据类型映射到表模式的部分,以获取详细信息。
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, String)] = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) // register the DataStream as table "myTable2" with fields "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, ‘myLong, ‘myString)
注意:DataStream表的名称不能与^ _DataStreamTable_ [0-9] 模式匹配,并且DataSet表的名称不能与^ _DataSetTable_ [0-9] 模式匹配。 这些模式仅供内部使用。
将DataStream或DataSet转换为表
除了在TableEnvironment中注册DataStream或DataSet之外,还可以将其直接转换为Table。 如果要在Table API查询中使用Table,这将很方便。
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, String)] = ... // convert the DataStream into a Table with default fields ‘_1, ‘_2 val table1: Table = tableEnv.fromDataStream(stream) // convert the DataStream into a Table with fields ‘myLong, ‘myString val table2: Table = tableEnv.fromDataStream(stream, ‘myLong, ‘myString)
将表转换为DataStream或DataSet
可以将表转换为DataStream或DataSet。 这样,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。
将表转换为DataStream或DataSet时,需要指定生成的DataStream或DataSet的数据类型,即,将表的行转换为的数据类型。 最方便的转换类型通常是Row。 以下列表概述了不同选项的功能:
- Row:字段按位置,任意数量的字段,支持null值,无类型安全访问的方式映射。
- POJO:字段按名称映射(POJO字段必须命名为
Table
字段),任意数量的字段,支持null
值,类型安全访问。 - case class:字段按位置映射,不支持
null
值,类型安全访问。 - tuple:按位置映射字段,限制为22(Scala)或25(Java)字段,不支持
null
值,类型安全访问。 - 原子类型:
Table
必须具有单个字段,不支持null
值,类型安全访问。
将Table转换为DataStream
流式查询结果产生的表将动态更新,即随着新记录到达查询的输入流不断变化。 因此,将这种动态查询转换成的DataStream需要对表的更新进行编码。
有两种模式可以将Table转换为DataStream:
- 追加模式:仅当动态表仅通过INSERT更改进行修改时才可以使用此模式,即,它仅是追加操作,并且以前发出的结果不更新。
- 撤回模式:始终可以使用此模式。 它使用布尔标志对INSERT和DELETE更改进行编码。
// get TableEnvironment. // registration of a DataSet is equivalent val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有关动态表及其属性的详细讨论,请参见“ 动态表”文档。
将Table转换为DataSet
Table
转换为DataSet
如下:
val tableEnv = BatchTableEnvironment.create(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table) // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
数据类型到表结构的映射
Flink的DataStream和DataSet API支持非常多种类型。元组(内置Scala和Flink Java元组),POJO,Scala case class和 Flink 的 Row 类型等复合类型,允许嵌套的数据结构具有多个字段,这些字段可在表达式中访问。其他类型被视为原子类型。在下文中,我们描述Table API如何将这些类型转换为内部行表示形式,并显示将DataStream转换为的Table示例。
数据类型到表模式的映射可以使用两种方式:基于字段位置或基于字段名称。
基于位置的映射
基于位置的映射可用于在保持字段顺序的同时为字段提供更有意义的名称。此映射可用于具有定义的字段顺序的复合数据类型以及原子类型。元组,行和案例类等复合数据类型具有这样的字段顺序。但是,必须根据字段名称映射POJO的字段(请参阅下一节)。字段可以投影出来,但不能用别名 as 重命名。
定义基于位置的映射时,输入数据类型中一定不能存在指定的名称,否则API会假定应该基于字段名称进行映射。 如果未指定任何字段名称,则使用复合类型的默认字段名称和字段顺序,或者原子类型使用f0。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, Int)] = ... // convert DataStream into Table with default field names "_1" and "_2" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field "myLong" only val table: Table = tableEnv.fromDataStream(stream, ‘myLong) // convert DataStream into Table with field names "myLong" and "myInt" val table: Table = tableEnv.fromDataStream(stream, ‘myLong, ‘myInt)
基于名称的映射
基于名称的映射可用于任何数据类型,包括POJO。这是定义表模式映射的最灵活的方法。映射中的所有字段均按名称引用,并且可以使用别名 as 重命名。字段可以重新排序和投影。
如果未指定任何字段名称,则使用复合类型的默认字段名称和字段顺序,或者原子类型使用f0。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, Int)] = ... // convert DataStream into Table with default field names "_1" and "_2" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field "_2" only val table: Table = tableEnv.fromDataStream(stream, ‘_2) // convert DataStream into Table with swapped fields val table: Table = tableEnv.fromDataStream(stream, ‘_2, ‘_1) // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" val table: Table = tableEnv.fromDataStream(stream, ‘_2 as ‘myInt, ‘_1 as ‘myLong)
原子类型
Flink将基本类型(整数,双精度型,字符串)或泛型(无法分析和分解的类型)视为原子类型。 原子类型的DataStream或DataSet转换为具有单个属性的表。 从原子类型推断出属性的类型,并且可以指定属性的名称。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[Long] = ... // convert DataStream into Table with default field name "f0" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field name "myLong" val table: Table = tableEnv.fromDataStream(stream, ‘myLong)
元组(Scala和Java)和case类(仅Scala)
Flink支持Scala的内置元组,并为Java提供了自己的元组类。 两种元组的DataStreams和DataSet都可以转换为表。 可以通过提供所有字段的名称来重命名字段(根据位置进行映射)。 如果未指定任何字段名称,则使用默认字段名称。 如果引用了原始字段名称(Flink元组为f0,f1,...,Scala元组为_1,_2,...),则API会假定映射是基于名称的,而不是基于位置的。 基于名称的映射允许使用别名(as)对字段和投影进行重新排序。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section val stream: DataStream[(Long, String)] = ... // convert DataStream into Table with renamed default field names ‘_1, ‘_2 val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with field names "myLong", "myString" (position-based) val table: Table = tableEnv.fromDataStream(stream, ‘myLong, ‘myString) // convert DataStream into Table with reordered fields "_2", "_1" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘_2, ‘_1) // convert DataStream into Table with projected field "_2" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘_2) // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘_2 as ‘myString, ‘_1 as ‘myLong) // define case class case class Person(name: String, age: Int) val streamCC: DataStream[Person] = ... // convert DataStream into Table with default field names ‘name, ‘age val table = tableEnv.fromDataStream(streamCC) // convert DataStream into Table with field names ‘myName, ‘myAge (position-based) val table = tableEnv.fromDataStream(streamCC, ‘myName, ‘myAge) // convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘age as ‘myAge, ‘name as ‘myName)
POJO (Java and Scala)
Flink支持POJO作为复合类型。确定POJO的规则在此文档。
在不指定字段名称的情况下将POJO DataStream或DataSet转换为Table时,将使用原始POJO字段的名称。 名称映射需要原始名称,并且不能按位置进行。 可以使用别名(使用as关键字)对字段进行重命名,重新排序和投影。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section // Person is a POJO with field names "name" and "age" val stream: DataStream[Person] = ... // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘age as ‘myAge, ‘name as ‘myName) // convert DataStream into Table with projected field "name" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘name) // convert DataStream into Table with projected and renamed field "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘name as ‘myName)
Row
该Row
数据类型支持字段和字段与任意数量的null
值。字段名称可以通过指定RowTypeInfo
或转化时Row
DataStream
或DataSet
成Table
。行类型支持按位置和名称映射字段。可以通过提供所有字段的名称(基于位置的映射)来重命名字段,也可以为投影/排序/重命名(基于名称的映射)单独选择字段。
// get a TableEnvironment val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` val stream: DataStream[Row] = ... // convert DataStream into Table with default field names "name", "age" val table: Table = tableEnv.fromDataStream(stream) // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) val table: Table = tableEnv.fromDataStream(stream, ‘myName, ‘myAge) // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘name as ‘myName, ‘age as ‘myAge) // convert DataStream into Table with projected field "name" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘name) // convert DataStream into Table with projected and renamed field "myName" (name-based) val table: Table = tableEnv.fromDataStream(stream, ‘name as ‘myName)
查询优化
Old planner
Apache Flink利用Apache Calcite来优化和翻译查询。 当前执行的优化包括投影和过滤器下推,子查询去相关以及其他类型的查询重写。 Old Planner尚未优化联接的顺序,而是按照查询中定义的顺序执行它们(FROM子句中的表顺序和/或WHERE子句中的连接谓词顺序)。
Blink planner
Apache Flink利用并扩展了Apache Calcite来执行复杂的查询优化。这包括一系列基于规则和成本的优化,例如:
- 基于Apache Calcite的子查询解相关
- 计划修剪
- 分区修剪
- 过滤器下推
- 子计划重复数据删除避免重复计算
-
特殊的子查询重写,包括两个部分:
- 将IN和EXISTS转换为左半联接
- 将NOT IN和NOT EXISTS转换为左反联接
-
可选join 重新排序
-
通过启用
table.optimizer.join-reorder-enabled
-
通过启用
注意: IN / EXISTS / NOT IN / NOT EXISTS当前仅在子查询重写的结合条件下受支持。
优化器不仅基于计划,而且还基于可从数据源获得的丰富统计信息以及每个operator(例如io,cpu,网络和内存)的细粒度成本来做出明智的决策。
高级用户可以通过CalciteConfig
对象提供自定义优化,该对象可以通过调用提供给表环境TableEnvironment#getConfig#setPlannerConfig
。
解释表
Table API提供了一种机制来解释计算表的逻辑和优化查询计划。 这是通过TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。 explain(表)返回给定表的计划。 describe()返回多接收器计划的结果,主要用于Blink planner。 它返回一个描述三个计划的字符串:
- 关系查询的抽象语法树,即未优化的逻辑查询计划
- 优化的逻辑查询计划
- 实际执行计划
以下代码显示了一个示例以及使用explain(table)给定Table的相应输出:
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) val table1 = env.fromElements((1, "hello")).toTable(tEnv, ‘count, ‘word) val table2 = env.fromElements((1, "hello")).toTable(tEnv, ‘count, ‘word) val table = table1 .where(‘word.like("F%")) .unionAll(table2) val explanation: String = tEnv.explain(table) println(explanation)
== Abstract Syntax Tree == LogicalUnion(all=[true]) LogicalFilter(condition=[LIKE($1, _UTF-16LE‘F%‘)]) FlinkLogicalDataStreamScan(id=[1], fields=[count, word]) FlinkLogicalDataStreamScan(id=[2], fields=[count, word]) == Optimized Logical Plan == DataStreamUnion(all=[true], union all=[count, word]) DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE‘F%‘)]) DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2], fields=[count, word]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Data Source content : collect elements with CollectionInputFormat Stage 3 : Operator content : from: (count, word) ship_strategy : REBALANCE Stage 4 : Operator content : where: (LIKE(word, _UTF-16LE‘F%‘)), select: (count, word) ship_strategy : FORWARD Stage 5 : Operator content : from: (count, word) ship_strategy : REBALANCE
以下代码显示了一个示例以及使用explain()的多sink计划的相应输出:
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build val tEnv = TableEnvironment.create(settings) val fieldNames = Array("count", "word") val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING) tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes)) tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes)) tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes)) tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes)) val table1 = tEnv.scan("MySource1").where("LIKE(word, ‘F%‘)") table1.insertInto("MySink1") val table2 = table1.unionAll(tEnv.scan("MySource2")) table2.insertInto("MySink2") val explanation = tEnv.explain(false) println(explanation)
多sink计划的结果是
== Abstract Syntax Tree == LogicalSink(name=[MySink1], fields=[count, word]) - LogicalFilter(condition=[LIKE($1, _UTF-16LE‘F%‘)]) - LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) LogicalSink(name=[MySink2], fields=[count, word]) - LogicalUnion(all=[true]) :- LogicalFilter(condition=[LIKE($1, _UTF-16LE‘F%‘)]) : - LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]]) - LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]]) == Optimized Logical Plan == Calc(select=[count, word], where=[LIKE(word, _UTF-16LE‘F%‘)], reuse_id=[1]) - TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) Sink(name=[MySink1], fields=[count, word]) - Reused(reference_id=[1]) Sink(name=[MySink2], fields=[count, word]) - Union(all=[true], union=[count, word]) :- Reused(reference_id=[1]) - TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Operator content : CsvTableSource(read fields: count, word) ship_strategy : REBALANCE Stage 3 : Operator content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word)) ship_strategy : FORWARD Stage 4 : Operator content : Calc(where: (word LIKE _UTF-16LE‘F%‘), select: (count, word)) ship_strategy : FORWARD Stage 5 : Operator content : SinkConversionToRow ship_strategy : FORWARD Stage 6 : Operator content : Map ship_strategy : FORWARD Stage 8 : Data Source content : collect elements with CollectionInputFormat Stage 9 : Operator content : CsvTableSource(read fields: count, word) ship_strategy : REBALANCE Stage 10 : Operator content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word)) ship_strategy : FORWARD Stage 12 : Operator content : SinkConversionToRow ship_strategy : FORWARD Stage 13 : Operator content : Map ship_strategy : FORWARD Stage 7 : Data Sink content : Sink: CsvTableSink(count, word) ship_strategy : FORWARD Stage 14 : Data Sink content : Sink: CsvTableSink(count, word) ship_strategy : FORWARD
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文