数据库 PostgreSQL
版本说明
当前仅支持 Flink 1.12 版本。
使用范围
使用 JDBC 驱动程序从 PostgreSQL 中读取数据或将数据写入其中。
支持作为数据源、维表、数据目的。
DDL 定义
CREATE TABLE pg_table (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5433/demo',
'table-name' = 'users',
'username' = 'root',
'password' = '123456'
);
PostgreSQL 源表 WITH 参数
参数值 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | 无 | String | 连接器,固定值为 jdbc 。 |
url | 是 | 无 | String | PostgreSQL 数据库 JDBC URL。 |
table-name | 是 | 无 | String | PostgreSQL 表名。 |
driver | 否 | 无 | String | JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 |
username | 是 | 无 | String | PostgreSQL 数据库用户名。 |
password | 是 | 无 | String | PostgreSQL 数据库密码。 |
scan.partition.column | 否 | 无 | Integer | 指定对输入数据进行分区扫描的列名。该列必须是数值类型、日期类型或时间戳类型。 |
scan.partition.num | 否 | 无 | Integer | 分区扫描启用后,指定分区数。 |
scan.partition.lower-bound | 否 | 无 | Integer | 分区扫描启用后,指定第一个分区的最小值。 |
scan.partition.upper-bound | 否 | 无 | Integer | 分区扫描启用后,指定最后一个分区的最大值。 |
scan.fetch-size | 否 | 0 | Integer | 每次从数据库读取时,批量获取的行数。 |
scan.auto-commit | 否 | true | Boolean | 自动提交标志,决定每个语句是否在事务中自动提交。 |
说明
scan.partition.lower-bound
和scan.partition.upper-bound
仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。- 分区扫描功能可以加速读取数据,每个子任务可以读取自己的分区。使用该功能时,四个
scan.partition
开头的参数都必须指定。
PostgreSQL 维表 WITH 参数
参数值 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | 无 | String | 连接器,固定值为 jdbc 。 |
url | 是 | 无 | String | PostgreSQL 数据库 JDBC URL。 |
table-name | 是 | 无 | String | PostgreSQL 表名。 |
driver | 否 | 无 | String | JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 |
username | 是 | 无 | String | PostgreSQL 数据库用户名。 |
password | 是 | 无 | String | PostgreSQL 数据库密码。 |
lookup.cache.max-rows | 否 | 无 | Integer | Lookup 缓存中最多缓存的数据行数。超过此值,最旧的行将过期。默认情况下禁用查找缓存。 |
lookup.cache.ttl | 否 | 无 | Duration | Lookup 缓存中每条记录最长的缓存时间。超过此时间,最旧的行将过期。默认情况下禁用 Lookup 缓存。 |
lookup.max-retries | 否 | 3 | Integer | 数据库查询失败时,最多重试的次数。 |
说明
- Lookup 缓存提升维表读取性能,目前仅支持同步读取模式。
- 默认情况下,Lookup 缓存未启用,所有请求需要请求数据库。
- 通过设置 lookup.cache.max-rows 和 lookup.cache.ttl 可以启用 Lookup 缓存,这时每个进程(即 TaskManager)都会持有一份缓存。Flink 会先查找缓存,缓存未命中时会向数据库发送请求,并根据返回的值更新缓存。
PostgreSQL 结果表 WITH 参数
参数值 | 必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | 无 | String | 连接器,固定值为 jdbc 。 |
url | 是 | 无 | String | PostgreSQL 数据库 JDBC URL。 |
table-name | 是 | 无 | String | PostgreSQL 表名。 |
driver | 否 | 无 | String | JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 |
username | 是 | 无 | String | PostgreSQL 数据库用户名。 |
password | 是 | 无 | String | PostgreSQL 数据库密码。 |
sink.buffer-flush.max-rows | 否 | 100 | Integer | 批量输出时,最多缓存的数据行数。设置为 0 表示禁用输出缓存。 |
sink.buffer-flush.interval | 否 | 1s | Duration | 每隔多久异步线程自动批量输出数据。设置为 0 表示禁用自动异步输出。 |
sink.max-retries | 否 | 3 | Integer | 数据库写入失败时,最多重试的次数。 |
说明
- 如果在 DDL 上定义了主键,则 sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
- 对于 PostgreSQL 表,Upsert 功能的实现采用 INSERT .. ON CONFLICT .. DO UPDATE SET .. 语法。
类型映射
PostgreSQL 字段类型 | Flink SQL 字段类型 |
---|---|
SMALLINT INT2 SMALLSERIAL SERIAL2 |
SMALLINT |
INTEGER SERIAL |
INT |
BIGINT BIGSERIAL |
BIGINT |
BIGINT | BIGINT |
REAL FLOAT4 |
FLOAT |
FLOAT8 DOUBLE PRECISION |
DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) |
DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT |
STRING |
BYTEA | BYTES |
ARRAY | ARRAY |
代码示例
-- 利用 datagen 随机生成数据源数据
CREATE TEMPORARY TABLE datagen_source(
name STRING,
age INT
) WITH (
'connector' = 'datagen'
)
-- 用 Flink SQL 注册 PostgreSQL 表 students
CREATE TEMPORARY TABLE pg_sink(
name VARCHAR,
age INT
) WITH (
'connector' = 'jdbc'
'url' = 'jdbc://postgresql://localhost:5433/mydb',
'table-name' = 'students',
'username' = 'root',
'password' = '123456'
)
INSERT INTO pg_sink
SELECT * FROM datagen_source;