Flink 支持连接多种数据库,如MySQL、PostgresSQL、Derby。Derby通常用于测试。从关系数据库数据类型到Flink SQL数据类型的字段数据类型映射如下表所示,映射表可以帮助在Flink中轻松定义JDBC表。
MySQL type | PostgreSQL type | Flink SQL type |
---|---|---|
TINYINT | TINYINT | |
SMALLINT, TINYINT UNSIGNED |
SMALLINT, INT2, SMALLSERIAL, SERIAL2 |
SMALLINT |
INT, MEDIUMINT, SMALLINT UNSIGNED |
INTEGER, SERIAL |
INT |
BIGINT, INT UNSIGNED |
BIGINT, BIGSERIAL |
BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) | |
BIGINT | BIGINT | BIGINT |
FLOAT | REAL, FLOAT4 |
FLOAT |
DOUBLE, DOUBLE PRECISION |
FLOAT8, DOUBLE PRECISION |
DOUBLE |
NUMERIC(p, s), DECIMAL(p, s) |
NUMERIC(p, s), DECIMAL(p, s) |
DECIMAL(p, s) |
BOOLEAN, TINYINT(1) |
BOOLEAN | BOOLEAN |
DATE | DATE | DATE |
TIME [§] | TIME [§] [WITHOUT TIMEZONE] | TIME [§] [WITHOUT TIMEZONE] |
DATETIME [§] | TIMESTAMP [§] [WITHOUT TIMEZONE] | TIMESTAMP [§] [WITHOUT TIMEZONE] |
CHAR(n), VARCHAR(n), TEXT |
CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT |
STRING |
BINARY, VARBINARY, BLOB |
BYTEA | BYTES |
ARRAY | ARRAY |
在API中,Flink尝试使用反射从类信息中自动提取数据类型,以避免重复的手动模式工作。但是,反射性地提取数据类型并不总是成功的,因为可能会丢失逻辑信息。因此,可能需要在类或字段声明附近添加其他信息以支持提取逻辑。下表列出了无需进一步信息即可隐式映射到数据类型的类。
注意:如果您打算在 Scala 中实现类,建议使用包装类型(例如)而不是 Scala 的原语。Scala的原语(例如Intor Double)被编译为JVM 原语(例如 int/double)并产生NOT NULL如下表所示的语义。此外,在泛型中使用的Scala原语(例如[Int, Double])在编译期间会被删除,并导致类信息类似于[, ]。
Java Class | Data Type |
---|---|
STRING | |
BOOLEAN | |
boolean | BOOLEAN NOT NULL |
TINYINT | |
byte | TINYINT NOT NULL |
SMALLINT | |
short | SMALLINT NOT NULL |
INT | |
int | INT NOT NULL |
BIGINT | |
long | BIGINT NOT NULL |
FLOAT | |
float | FLOAT NOT NULL |
DOUBLE | |
double | DOUBLE NOT NULL |
DATE | |
DATE | |
TIME(0) | |
TIME(9) | |
TIMESTAMP(9) | |
TIMESTAMP(9) | |
TIMESTAMP(9) WITH TIME ZONE | |
TIMESTAMP_LTZ(9) | |
INVERVAL SECOND(9) | |
INTERVAL YEAR(4) TO MONTH | |
byte[] | BYTES |
T[] | ARRAY |
<K, V> | MAP<K, V> |
structured type T | anonymous structured type T |
参考
/flink/flink-docs-release-1.14/zh/docs/dev/table/types/
/flink/flink-docs-release-1.14/zh/docs/connectors/table/jdbc/