1 Configuration
The SQL Client can be started with the following CLI options.
```text
./bin/sql-client.sh embedded --help

Mode "embedded" submits Flink jobs from the local machine.

  Syntax: embedded [OPTIONS]
  "embedded" mode options:
     -d,--defaults <environment file>      The environment properties with which
                                           every new session is initialized.
                                           Properties might be overwritten by
                                           session properties.
     -e,--environment <environment file>   The environment properties to be
                                           imported into the session. It might
                                           overwrite default environment
                                           properties.
     -h,--help                             Show the help message with
                                           descriptions of all options.
     -hist,--history <History file path>   The file which you want to save the
                                           command history into. If not
                                           specified, we will auto-generate one
                                           under your user's home directory.
     -j,--jar <JAR file>                   A JAR file to be imported into the
                                           session. The file might contain
                                           user-defined classes needed for the
                                           execution of statements such as
                                           functions, table sources, or sinks.
                                           Can be used multiple times.
     -l,--library <JAR directory>          A JAR file directory with which every
                                           new session is initialized. The files
                                           might contain user-defined classes
                                           needed for the execution of
                                           statements such as functions, table
                                           sources, or sinks. Can be used
                                           multiple times.
     -pyarch,--pyArchives <arg>            Add python archive files for job. The
                                           archive files will be extracted to
                                           the working directory of python UDF
                                           worker. Currently only zip-format is
                                           supported. For each archive file, a
                                           target directory can be specified. If
                                           the target directory name is
                                           specified, the archive file will be
                                           extracted to a directory with the
                                           specified name. Otherwise, the
                                           archive file will be extracted to a
                                           directory with the same name of the
                                           archive file. The files uploaded via
                                           this option are accessible via
                                           relative path. '#' could be used as
                                           the separator of the archive file
                                           path and the target directory name.
                                           Comma (',') could be used as the
                                           separator to specify multiple archive
                                           files. This option can be used to
                                           upload the virtual environment, the
                                           data files used in Python UDF (e.g.:
                                           --pyArchives
                                           file:///tmp/py37.zip,file:///tmp/data
                                           .zip#data --pyExecutable
                                           py37.zip/py37/bin/python). The data
                                           files could be accessed in Python
                                           UDF, e.g.: f = open('data/data.txt',
                                           'r').
     -pyexec,--pyExecutable <arg>          Specify the path of the python
                                           interpreter used to execute the
                                           python UDF worker (e.g.:
                                           --pyExecutable
                                           /usr/local/bin/python3). The python
                                           UDF worker depends on Python 3.5+,
                                           Apache Beam (version == 2.19.0), Pip
                                           (version >= 7.1.0) and SetupTools
                                           (version >= 37.0.0). Please ensure
                                           that the specified environment meets
                                           the above requirements.
     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
                                           These files will be added to the
                                           PYTHONPATH of both the local client
                                           and the remote python UDF worker. The
                                           standard python resource file
                                           suffixes such as .py/.egg/.zip or
                                           directory are all supported. Comma
                                           (',') could be used as the separator
                                           to specify multiple files (e.g.:
                                           --pyFiles
                                           file:///tmp/myresource.zip,hdfs:///$n
                                           amenode_address/myresource2.zip).
     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
                                           defines the third-party dependencies.
                                           These dependencies will be installed
                                           and added to the PYTHONPATH of the
                                           python UDF worker. A directory which
                                           contains the installation packages of
                                           these dependencies could be specified
                                           optionally. Use '#' as the separator
                                           if the optional parameter exists
                                           (e.g.: --pyRequirements
                                           file:///tmp/requirements.txt#file:///
                                           tmp/cached_dir).
     -s,--session <session identifier>     The identifier for a session.
                                           'default' is the default identifier.
     -u,--update <SQL update statement>    Experimental (for testing only!):
                                           Instructs the SQL Client to
                                           immediately execute the given update
                                           statement after starting up. The
                                           process is shut down after the
                                           statement has been submitted to the
                                           cluster and returns an appropriate
                                           return code. Currently, this feature
                                           is only supported for INSERT INTO
                                           statements that declare the target
                                           sink table.
```
1.1 Environment Files
A SQL query needs a configuration environment in which it is executed. Environment files define the catalogs, table sources, table sinks, user-defined functions, and other properties required for execution or deployment.
Every environment file is a regular YAML file. An example is shown below.
```yaml
# Define tables here such as sources, sinks, views, or temporal tables.

tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/path/to/something.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          data-type: INT
        - name: MyField2
          data-type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        data-type: INT
      - name: MyField2
        data-type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"

# Define user-defined functions here.

functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false

# Define catalogs here.

catalogs:
   - name: catalog_1
     type: hive
     property-version: 1
     hive-conf-dir: ...
   - name: catalog_2
     type: hive
     property-version: 1
     default-database: mydb2
     hive-conf-dir: ...

# Properties that change the fundamental execution behavior of a table program.

execution:
  planner: blink                    # optional: either 'blink' (default) or 'old'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller than 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time
  current-catalog: catalog_1        # optional: name of the current catalog of the session
                                    #   ('default_catalog' by default)
  current-database: mydb1           # optional: name of the current database of the current catalog
                                    #   (default database of the current catalog by default)
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000
```
This configuration:

- defines an environment with a table source `MyTableSource` that reads from a CSV file,
- defines a view `MyCustomView` that declares a virtual table using a SQL query,
- defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters,
- connects to two Hive catalogs and uses `catalog_1` as the current catalog with `mydb1` as the current database of that catalog,
- uses the blink planner in streaming mode for running statements with an event-time characteristic and a parallelism of 1,
- runs exploratory queries in the `table` result mode,
- and makes some planner adjustments around join reordering and spilling via configuration options.
Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (a defaults environment file passed with --defaults) as well as on a per-session basis (a session environment file passed with --environment). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in every session, whereas the session environment file only declares a specific state retention time and parallelism. Both the defaults and the session environment file can be passed when starting the CLI application. If no default environment file has been specified, the SQL Client searches for ./conf/sql-client-defaults.yaml in Flink's configuration directory.

Attention: properties set within a CLI session (e.g., using the SET command) have the highest precedence:

CLI commands > session environment file > defaults environment file
Restart Strategies

Restart strategies control how Flink jobs are restarted in case of a failure. Similar to the global restart strategies of a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.

Flink supports the following strategies:
```yaml
execution:
  # falls back to the global strategy defined in flink-conf.yaml
  restart-strategy:
    type: fallback

  # the job fails directly and no restart is attempted
  restart-strategy:
    type: none

  # attempts to restart the job a given number of times
  restart-strategy:
    type: fixed-delay
    attempts: 3      # retries before the job is declared as failed (default: Integer.MAX_VALUE)
    delay: 10000     # delay between retries in ms (default: 10 s)

  # attempts as long as the maximum number of failures per time interval is not exceeded
  restart-strategy:
    type: failure-rate
    max-failures-per-interval: 1   # maximum retries per interval (default: 1)
    failure-rate-interval: 60000   # interval in ms for measuring the failure rate
    delay: 10000                   # delay between retries in ms (default: 10 s)
```
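For reference, the fixed-delay entry above expresses the same policy that a regular Flink job would otherwise configure programmatically. The sketch below is illustrative only (it assumes a plain DataStream job rather than the SQL Client, and the class name is made up):

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedDelayRestartExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same policy as `type: fixed-delay, attempts: 3, delay: 10000` above:
        // up to 3 restart attempts with a 10 second pause between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                                // number of restart attempts
                Time.of(10, TimeUnit.SECONDS)));  // delay between attempts
    }
}
```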
1.2 Dependencies
The SQL Client does not require setting up a Java project with Maven or SBT. Instead, you can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR file separately (using --jar) or define entire library directories (using --library). For connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files can be downloaded for each release from the Maven central repository.
The full list of provided SQL JARs and documentation about how to use them can be found on the connection to external systems page.
The following example shows an environment file that defines a table source reading JSON data from Apache Kafka.
```yaml
tables:
  - name: TaxiRides
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "0.11"
      topic: TaxiRides
      startup-mode: earliest-offset
      properties:
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>"
    schema:
      - name: rideId
        data-type: BIGINT
      - name: lon
        data-type: FLOAT
      - name: lat
        data-type: FLOAT
      - name: rowTime
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: "from-field"
            from: "rideTime"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
      - name: procTime
        data-type: TIMESTAMP(3)
        proctime: true
```
The resulting schema of the `TaxiRides` table contains most of the fields of the JSON schema. Furthermore, it adds a rowtime attribute `rowTime` and a processing-time attribute `procTime`.

Both `connector` and `format` allow to define a property version (which is currently version `1`) for future backwards compatibility.
1.3 User-defined Functions
The SQL Client allows users to create custom, user-defined functions to be used in SQL queries. Currently, these functions are restricted to being defined programmatically in Java/Scala classes or Python files.
In order to provide a Java/Scala user-defined function, you need to first implement and compile a function class that extends `ScalarFunction`, `AggregateFunction`, or `TableFunction` (see user-defined functions). One or more functions can then be packaged into a dependency JAR for the SQL Client.
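As a rough sketch of what such a class can look like (the package `foo.bar` and the class name `MultiplyUDF` below are purely illustrative and not taken from the official examples), a scalar function with constructor parameters might be written as:

```java
package foo.bar;

import org.apache.flink.table.functions.ScalarFunction;

// A simple scalar function that can be packaged into a JAR, passed to the
// SQL Client via --jar, and registered under `functions` in the environment file.
public class MultiplyUDF extends ScalarFunction {

    private final double factor;
    private final boolean roundResult;

    // The constructor parameters correspond to the `constructor` list declared
    // in the environment file (e.g. a DOUBLE and a BOOLEAN literal).
    public MultiplyUDF(double factor, boolean roundResult) {
        this.factor = factor;
        this.roundResult = roundResult;
    }

    // eval() is invoked for every input value.
    public double eval(double value) {
        double result = value * factor;
        return roundResult ? Math.round(result) : result;
    }
}
```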
In order to provide a Python user-defined function, you need to write a Python function and decorate it with the `pyflink.table.udf.udf` or `pyflink.table.udf.udtf` decorator (see Python UDFs). One or more functions can then be placed into a Python file. The Python file and related dependencies need to be specified via the configuration in the environment file (see Python configuration) or via command line options (see command line usage).
All functions must be declared in an environment file before being called. For each item in the `functions` list, one must specify

- a `name` under which the function is registered,
- the source of the function using `from` (restricted to `class` (Java/Scala UDF) or `python` (Python UDF) for now).

A Java/Scala UDF must additionally specify:

- the `class` indicating the fully qualified class name of the function and an optional list of `constructor` parameters for instantiation.

A Python UDF must additionally specify:

- the `fully-qualified-name` indicating the fully qualified name of the function, i.e. "[module name].[object name]".
```yaml
functions:
  - name: java_udf               # required: name of the function
    from: class                  # required: source of the function
    class: ...                   # required: fully qualified class name of the function
    constructor:                 # optional: constructor parameters of the function class
      - ...                      # optional: a literal parameter with implicit type
      - class: ...               # optional: full class name of the parameter
        constructor:             # optional: constructor parameters of the parameter's class
          - type: ...            # optional: type of the literal parameter
            value: ...           # optional: value of the literal parameter
  - name: python_udf             # required: name of the function
    from: python                 # required: source of the function
    fully-qualified-name: ...    # required: fully qualified name of the function
```
For Java/Scala UDFs, make sure that the order and types of the specified constructor parameters strictly match those of your function class.

Constructor Parameters

Depending on the user-defined function, it might be necessary to parameterize the implementation before it can be used in a SQL statement.

As shown in the example before, when declaring a user-defined function, a class can be configured using constructor parameters in one of the following three ways:
A literal value with implicit type: the SQL Client will automatically derive the type according to the literal value itself. Currently, only values of `BOOLEAN`, `INT`, `DOUBLE` and `VARCHAR` are supported here.
If the automatic derivation does not work as expected (for example, you need a `VARCHAR` `false`), use explicit types instead.
```yaml
- true         # -> BOOLEAN (case sensitive)
- 42           # -> INT
- 1234.222     # -> DOUBLE
- foo          # -> VARCHAR
```
A literal value with explicit type: explicitly declare the parameter with `type` and `value` properties for type safety.
```yaml
- type: DECIMAL
  value: 11111111111111111
```
The table below illustrates the supported Java parameter types and the corresponding SQL types.
| Java type | SQL type |
|---|---|
| `java.math.BigDecimal` | `DECIMAL` |
| `java.lang.Boolean` | `BOOLEAN` |
| `java.lang.Byte` | `TINYINT` |
| `java.lang.Double` | `DOUBLE` |
| `java.lang.Float` | `REAL`, `FLOAT` |
| `java.lang.Integer` | `INTEGER`, `INT` |
| `java.lang.Long` | `BIGINT` |
| `java.lang.Short` | `SMALLINT` |
| `java.lang.String` | `VARCHAR` |
More types (e.g., `TIMESTAMP` or `ARRAY`), primitive types, and `null` are not supported yet.
A (nested) class instance: besides literal values, you can also create (nested) class instances for constructor parameters by specifying the `class` and `constructor` properties. This process can be performed recursively until all constructor parameters are represented with literal values.
```yaml
- class: foo.bar.paramClass
  constructor:
    - StarryName
    - class: java.lang.Integer
      constructor:
        - class: java.lang.String
          constructor:
            - type: VARCHAR
              value: 3
```
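To make the nested example concrete, a parameter class along the lines of the one referenced above could look roughly like the following. The class is hypothetical and only illustrates how the declared literals map to constructor arguments:

```java
package foo.bar;

// Hypothetical parameter class matching the nested constructor example above:
// the first argument is the VARCHAR literal "StarryName", the second is a
// java.lang.Integer that is itself constructed from a java.lang.String ("3").
public class ParamClass {

    private final String name;
    private final Integer size;

    public ParamClass(String name, Integer size) {
        this.name = name;
        this.size = size;
    }

    public String getName() {
        return name;
    }

    public Integer getSize() {
        return size;
    }
}
```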
2 Extended Example
```yaml
#==============================================================================
# Table Sources
#==============================================================================
# Define table sources here. See the Table API & SQL documentation for details.

tables:
  - name: Rides                       # table name
    type: source                      # table type: 'source' is read from, 'sink' is written to;
                                      #   a source table does not store data itself, a sink table
                                      #   writes data to an external system such as MySQL or Kafka
    update-mode: append               # update mode: 'append' or 'update'; an update stream can only
                                      #   be written to external storage that supports updates, such
                                      #   as MySQL or HBase; an append stream can be written to any
                                      #   storage, typically log-style systems such as Kafka
    schema:                           # field names and types of the table; they correspond to the
                                      #   fields declared under 'format'
      - name: rideId
        type: LONG
      - name: taxiId
        type: LONG
      - name: isStart
        type: BOOLEAN
      - name: lon
        type: FLOAT
      - name: lat
        type: FLOAT
      - name: rideTime                # output field renamed from eventTime to rideTime; the TIMESTAMP
                                      #   field is declared as the rowtime attribute
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"        # where the timestamp comes from: a field of the source table
            from: "eventTime"         # the timestamp field of the source table
          watermarks:                 # watermarks
            type: "periodic-bounded"  # periodic watermarks
            delay: "60000"            # maximum delay
      - name: psgCnt
        type: INT
    connector:                        # connector
      property-version: 1
      type: kafka                     # connect to Kafka
      version: universal              # use 'universal' for Kafka 0.11 and above
      topic: Rides                    # topic to consume
      startup-mode: earliest-offset   # consumption mode: 'earliest-offset' consumes from the
                                      #   beginning, 'latest-offset' consumes only new data
      properties:                     # ZooKeeper / Kafka hosts and ports
        - key: zookeeper.connect
          value: zookeeper:2181
        - key: bootstrap.servers
          value: kafka:9092
        - key: group.id               # consumer group
          value: testGroup
    format:                           # how the data is parsed
      property-version: 1
      type: json                      # the data is JSON; the fields match the schema mapping above
      schema: "ROW(rideId LONG, isStart BOOLEAN, eventTime TIMESTAMP, lon FLOAT, lat FLOAT, psgCnt INT, taxiId LONG)"

  - name: Fares
    type: source
    update-mode: append
    schema:
      - name: rideId
        type: LONG
      - name: payTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "eventTime"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
      - name: payMethod
        type: STRING
      - name: tip
        type: FLOAT
      - name: toll
        type: FLOAT
      - name: fare
        type: FLOAT
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: Fares
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: zookeeper:2181
        - key: bootstrap.servers
          value: kafka:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(rideId LONG, eventTime TIMESTAMP, payMethod STRING, tip FLOAT, toll FLOAT, fare FLOAT)"

  - name: DriverChanges
    type: source
    update-mode: append
    schema:
      - name: taxiId
        type: LONG
      - name: driverId
        type: LONG
      - name: usageStartTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "eventTime"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: DriverChanges
      startup-mode: earliest-offset
      properties:
        - key: zookeeper.connect
          value: zookeeper:2181
        - key: bootstrap.servers
          value: kafka:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(eventTime TIMESTAMP, taxiId LONG, driverId LONG)"

  - name: Drivers
    type: temporal-table
    history-table: DriverChanges
    primary-key: taxiId
    time-attribute: usageStartTime

  - name: Sink_TenMinPsgCnt           # table name in the external system, e.g. a Kafka topic
                                      #   or a MySQL table name
    type: sink-table                  # table type: 'source' reads data, 'sink' writes data
    schema:                           # target field names and types to be written
      - name: cntStart
        type: STRING
      - name: cntEnd
        type: STRING
      - name: cnt
        type: INT
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: Sink_TenMinPsgCnt        # output topic
      properties:
        - key: zookeeper.connect
          value: zookeeper:2181
        - key: bootstrap.servers
          value: kafka:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(cntStart STRING, cntEnd STRING, cnt INT)"   # fields written to Kafka; the query maps its
                                                               #   result columns to these fields via aliases;
                                                               #   note: field count, types, and order must
                                                               #   exactly match the schema above

functions:                            # function definitions
  - name: isInNYC
    from: class
    class: com.ververica.sql_training.udfs.IsInNYC
  - name: toAreaId
    from: class
    class: com.ververica.sql_training.udfs.ToAreaId
  - name: toCoords
    from: class
    class: com.ververica.sql_training.udfs.ToCoords

#==============================================================================
# Execution properties
#==============================================================================
# Execution properties allow for changing the behavior of a table program.

execution:
  planner: blink                      # using the Blink planner
  type: streaming                     # 'batch' or 'streaming' execution
  result-mode: table                  # 'changelog' or 'table' presentation of results
  parallelism: 1                      # parallelism of the program
  max-parallelism: 128                # maximum parallelism
  min-idle-state-retention: 0         # minimum idle state retention in ms
  max-idle-state-retention: 0         # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================
# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  type: standalone                    # only the 'standalone' deployment is supported
  response-timeout: 5000              # general cluster communication timeout in ms
  gateway-address: ""                 # (optional) address from cluster to gateway
  gateway-port: 0                     # (optional) port from cluster to gateway
```