上下游开发指南

 

分布式文件系统 HDFS

更新时间 2023-09-06

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

支持将数据流式写入到 HDFS 中。

DDL 定义

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',          
  'path' = 'hdfs://$namenode:9000/path/to/whatever', 
  'format' = 'csv',                    
  'sink.partition-commit.delay' = '1h',
  'sink.partition-commit.policy.kind'='success-file'
)

HDFS 源表 WITH 参数

参数值 是否必填 默认值 数据类型 描述
connector String 固定值为 filesystem
path String 文件写入的路径,hdfs:// 开头。注意
写入 HDFS 的 user 是 flink,需要提前调整好 path 的权限,可以将 path 赋予 flink 用户权限:
hdfs dfs -chown flink:flink path。
format String 文件类型,取值如下:
  • csv
  • json
  • avro
  • parquet
  • orc
  • debezium-json
  • canal-json
  • raw
  • sink.rolling-policy.file-size 128MB MemorySize 文件最大大小,当达到阈值时,打开新文件写入。
    sink.rolling-policy.rollover-interval 30min Duration 文件最大持续写入时间,当写入时间超过阈值时,打开新文件写入。
    sink.rolling-policy.check-interval 1min Duration 文件检查频率,按照设置的时间间隔,周期性检查是否需要写入到新文件。
    auto-compaction false Boolean 是否启动自动 compaction,如果启动,数据会先被写到临时文件,等 checkpoint 结束,临时文件会被 compact。
    临时文件在 compaction 之前不可见。
    compaction.file-size MemorySize compaction 目标文件大小,默认值同 sink.rolling-policy.file-size。
    sink.partition-commit.trigger process-time String 分区提交策略,当分区创建超过 sink.partition-commit.delay 之后将这个分区提交。可选值包括:
  • process-time:当前时间 > 分区创建的物理时间 + delay 时,触发分区提交。
  • partition-time:watermark 时间 > 从 partition 值里提取的时间 + delay 时,触发分区提交。需要配合 wartermark 使用。
  • sink.partition-commit.delay 0s Duration 分区提交延迟时间。如果是天分区,设置为 1d,如果是小时分区,设置为 1h
    partition.time-extractor.kind default String 分区时间抽取方式。这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。
  • default:可以配置 partition.time-extractor.timestamp-pattern。
  • custom:自定义提取器 class。
  • partition.time-extractor.class String 分区时间抽取类,当 partition.time-extractor.kind 为 custom 时配置,该类需实现 PartitionTimeExtractor 接口。
    partition.time-extractor.timestamp-pattern String 分区时间戳的抽取格式。
  • 如果时间戳应该从单个分区字段 ‘dt’ 提取,可以配置 ‘$dt’。
  • 如果时间戳应该从多个分区字段中提取,如 ‘year’、‘month’、‘day’ 和 ‘hour’,可以配置 ‘$year-$month-$day $hour:00:00’。
  • 如果时间戳应该从两个分区字段 ‘dt’ 和 ‘hour’ 提取,可以配置 ‘$dt $hour:00:00’。
  • sink.partition-commit.policy.kind String 分区提交策略,用于通知下游应用该分区已完成写入,可以读取。可选值包括:
  • success-file:在分区对应的目录下生成一个 ‘_SUCCESS’ 的文件。
  • custom:用户实现的自定义分区提交策略。
  • sink.partition-commit.policy.class String 分区提交策略类,当 sink.partition-commit.policy.kind 为 custom 时配置,该类需实现 PartitionCommitPolicy 接口。
    sink.partition-commit.success-file.name _SUCCESS String 分区提交成功文件名,当 sink.partition-commit.policy.kind 为 success-file 时配置,默认是_SUCCESS

    代码示例

    -- 向 Flink SQL 中注册 Kafka topic 表
    CREATE TABLE kafka_table (
      user_id STRING,
      order_amount DOUBLE,
      log_ts TIMESTAMP(3),
      WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'value.format' = 'json'
    );
     
    -- 向 Flink SQL 中注册 HDFS path 表
    CREATE TABLE hdfs_table (
      user_id STRING,
      order_amount DOUBLE,
      dt STRING,
      `hour` STRING
    ) PARTITIONED BY (dt, `hour`) WITH (
      'connector'='filesystem',
      'path'='hdfs://$namenode:9000/path/to/whatever',
      'format'='parquet',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='success-file'
    );
     
    -- 流式写入数据到 HDFS
    INSERT INTO hdfs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
    
    这篇文档解决了您的问题吗?
    0
    0