上下游开发指南

 

消息队列 Upsert Kafka

更新时间 2023-09-06

作为 Source,Upsert Kafka 连接器产生 changelog 流,其中每条数据记录代表一个 UPDATE 或 DELETE 事件。

Kafka Topic 中存在相应的 key,则以 UPDATE 操作将 key 的值更新为数据记录中的 value。
Kafka Topic 中不存在相应的 key,则以 INSERT 操作将 key 的值写入 Kafka Topic。
Key 对应的 value 为 null,会被视作 DELETE 操作。
作为 Sink,Upsert Kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

Upsert Kafka 连接器支持以 UPSERT 方式对 Kafka Topic 进行读写操作。

DDL 定义

说明

PRIMARY KEY 必须定义。

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  ...
 
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
 
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'EXCEPT_KEY'
)

可用的元数据字段

请参见 Kafka 元数据字段

Upsert Kafka 源表参数

参数值 是否必填 默认值 数据类型 描述
connector String 固定值为 upsert-kafka
topic String 读取的 Topic 名称。
properties.bootstrap.servers String 以逗号分隔的 Kafka brokers 地址列表,格式为 host:port,host:port,host:port
properties.* String 后缀名称必须是 Kafka 配置文档 中定义的配置项。Flink 会将 properties. 前缀移除,并将剩余的键和值传递给 Kafka 客户端。
例如可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用自动创建 Topic。
但是有一些配置不支持,例如 key.deserializervalue.deserializer,因为 Flink 会覆盖它们。
key.format String 在反序列化来自 Kafka 的消息 value 部分时使用的格式。
取值如下:
  • csv
  • json
  • avro
  • avro-confluent
  • raw
    有关更多详细信息,请参考官方文档格式注意
    与常规 Kafka connector 相比,无需指定 key.fields 参数,通过 PRIMARY KEY 来定义。
  • key.fields-prefix String 为所有消息键指定自定义前缀,以避免与消息体格式字段重名,默认前缀为空。
    如果定义了自定义前缀,表 schema 和配置项 key.fields 请使用带前缀的名称。
    当构建消息 key 数据类型时,前缀会被移除,将使用无前缀的名称。注意
    该配置项要求必须将 value.fields-include 配置为 EXCEPT_KEY。
    value.format String 在反序列化来自 Kafka 的消息 value 部分时使用的格式。
    取值如下:
  • csv
  • json
  • avro
  • avro-confluent
  • raw
    有关更多详细信息,请参考官方文档格式
  • value.fields-include ALL String 控制哪些字段应该出现在消息 value 解析出来的数据中。可取值:
  • ALL:消息 value 解析出来的数据将包含 schema 中所有的字段,包括定义为 key.fields 的字段。
  • EXCEPT_KEY:除去 key.fields 定义字段,剩余 schema 定义字段可以用来存放消息 value 解析出来的数据。
  • Upsert Kafka 结果表参数

    参数值 是否必填 默认值 数据类型 描述
    connector String 固定值为 upsert-kafka
    topic String 结果写入的 Topic 名称。
    properties.bootstrap.servers String 以逗号分隔的 Kafka brokers 地址列表,格式为 host:port,host:port,host:port
    key.format String 序列化 Kafka 消息 value 部分时使用的格式。
    取值如下:
  • csv
  • json
  • avro
  • raw
    有关更多详细信息,请参考官方文档格式注意
    与常规 Kafka connector 相比,无需指定 key.fields 参数,通过 PRIMARY KEY 来定义。
  • key.fields-prefix String 为所有消息键指定自定义前缀,以避免与消息体格式字段重名,默认前缀为空。
    如果定义了自定义前缀,表 schema 和配置项 key.fields 请使用带前缀的名称。
    当构建消息 key 数据类型时,前缀会被移除,将使用无前缀的名称。注意
    该配置项要求必须将 value.fields-include 配置为 EXCEPT_KEY。
    value.format String 序列化 Kafka 消息 value 部分时使用的格式。
    取值如下:
  • csv
  • json
  • avro
  • raw
    有关更多详细信息,请参考官方文档格式
  • value.fields-include ALL Enum
  • ALL
  • EXCEPT_KEY
  • 控制哪些字段应该出现在消息 value 解析出来的数据中。可取值:
  • ALL:消息 value 解析出来的数据将包含 schema 中所有的字段,包括定义为 key.fields 的字段。
  • EXCEPT_KEY:除去 key.fields 定义字段,剩余 schema 定义字段可以用来存放消息 value 解析出来的数据。
  • sink.parallelism Integer Kafka sink 算子的并行度,默认情况下,框架使用上游算子相同的平行度

    代码示例

    计算不同区域用户 pv 和 uv 情况。

    CREATE TABLE pageviews_per_region (
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY (user_region) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'pageviews_per_region',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'key.format' = 'avro',
      'value.format' = 'avro'
    );
     
    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP,
      user_region STRING,
      WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'pageviews',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'format' = 'json'
    );
     
    -- 计算 pv、uv 并插入到 upsert-kafka sink
    INSERT INTO pageviews_per_region
    SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCT user_id)
    FROM pageviews
    GROUP BY user_region;
    
    这篇文档解决了您的问题吗?
    0
    0