上下游开发指南

 

数据库 HBase

更新时间 2023-09-06

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

支持读取和写入 HBase 集群。

DDL 定义

注意

  • HBase 表所有列簇需要声明为 ROW 类型,内部定义列名和类型,可定义多组。
  • 非 ROW 类型的字段会被识别为 rowkey,rowkey 可以定义为任意名字。
-- 在 Flink SQL 里注册 HBase 表 'mytable'
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
 'zookeeper.znode.parent' = '/hbase/cl-8wry3tmz'
);

HBase 源表 WITH 参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,目前支持 hbase-2.2
table-name String HBase 表名。
zookeeper.quorum String HBase 的 zookeeper 地址。
zookeeper.znode.parent /hbase String HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如:/hbase/{集群id}
null-string-literal null String HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。

HBase 维表 WITH 参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,目前支持 hbase-2.2
table-name String HBase 表名。
zookeeper.quorum String HBase 的 zookeeper 地址。
zookeeper.znode.parent /hbase String HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如:/hbase/{集群id}
null-string-literal null String HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。

HBase 结果表 WITH 参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,目前支持 hbase-2.2
table-name String HBase 表名。
zookeeper.quorum String HBase 的 zookeeper 地址。
zookeeper.znode.parent /hbase String HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如:/hbase/{集群id}
null-string-literal null String HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。
sink.buffer-flush.max-size 2mb MemorySize 每次写入请求在内存中缓存的数据量。调大该值有利于提高 HBase 写入性能,但会增加写入延迟。设置为 0 禁用。
sink.buffer-flush.max-rows 1000 Integer 每次写入请求在内存中缓存的数据行数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟。设置为 0 禁用。
sink.buffer-flush.interval 1s Duration 周期性 flush 缓存数据到 HBase 的时间间隔。可以提高 HBase 写入性能,但会增加写入延迟。设置为 0’ 禁用。
sink.parallelism Integer HBase sink 算子的并行度,默认情况下,框架使用上游算 子相同的平行度。

类型映射

HBase 将所有数据存储为字节数组。

Flink 字段类型 HBase 字段类型
CHAR / VARCHAR / STRING byte[] toBytes(String s)
String toString(byte[] b)
BOOLEAN byte[] toBytes(boolean b)
boolean toBoolean(byte[] b)
BINARY / VARBINARY byte[] 按原样返回。
DECIMAL byte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)
TINYINT new byte[] { val }
bytes[0] // returns first and only byte from bytes
SMALLINT byte[] toBytes(short val)
short toShort(byte[] bytes)
INT byte[] toBytes(int val)
int toInt(byte[] bytes)
BIGINT byte[] toBytes(long val)
long toLong(byte[] bytes)
FLOAT byte[] toBytes(float val)
float toFloat(byte[] bytes)
DOUBLE byte[] toBytes(double val)
double toDouble(byte[] bytes)
DATE 将日期转换成自 1970.01.01 以来的天数,用 int 表示。
TIME 将时间转换成自 00:00:00 以来的毫秒数,用 int 表示。
TIMESTAMP 将时间戳转换成自 1970-01-01 00:00:00 以来的毫秒数,用 long 表示。
ARRAY 不支持
MAP / ULTISET 不支持
ROW 不支持

代码示例

-- 使用 datagen 连接器生成随机数据
CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) with (
  'connector'='datagen'
);

-- 在 Flink SQL 里注册 HBase 表 'demo'
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCE
) with (
  'connector'='hbase-2.2',
  'table-name'='demo',
  'zookeeper.quorum'='localhost:2181'
  'zookeeper.znode.parent' = '/hbase/cl-8wry3tmz'
);

-- 从 datagen_source 中读取数据并写入 hbase_sink 中
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;
这篇文档解决了您的问题吗?
0
0