实时同步 MySQL 数据到 Elasticsearch

 

数据开发

更新时间 2023-09-06

创建 SQL 作业

  1. 在目标工作空间选择数据开发 > 作业开发,进入作业开发页面。

  2. 点击创建作业,进入创建作业页面。

    选择模式
  3. 选择 SQL 模式

  4. 点击下一步,填写作业名称,并选择作业依赖的计算集群。

    填写信息
  5. 配置完成后,点击确定,开始创建作业。

    作业创建完成后,默认进入该作业的开发面板。

    填写信息

开发 SQL 作业

  1. 在开发面板中输入以下 SQL 代码,数据源相关参数请根据代码中的注释进行修改。

    说明

    • 以下 SQL 代码用于建立 flink table 与数据源之间的映射关系;本实践需要提前在 Mysql 中创建好 students 表,并且 students 表包含 id、score、name 列。
    • 更多相关参数请参见 MySQL CDCElasticsearch
    DROP TABLE IF EXISTS students;       --删除 flink table 映射
    CREATE TABLE students(               --建立 flink table 到 mysql table 的映射关系
        id INT,
        name STRING,
        score    INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',       --写入数据到 mysql-cdc
        'hostname' = '192.168.100.2',    --mysql 连接 IP 地址
        'port' = '3306',                 --mysql 连接端口号
        'username' = 'test01',           --mysql 连接用户名
        'password' = 'Gx12345678@',      --mysql 连接密码
        'database-name' = 'demo',        --mysql 数据库名称
        'table-name' = 'students'        --mysql 数据库表名称
    );
    
    DROP TABLE IF EXISTS es_stu;
    CREATE TABLE es_stu(
        id INT,
        name STRING,
        score INT,
        PRIMARY KEY (id) NOT ENFORCED --定义主键规则,开启主键是 upsert 模式;否则是 append 模式。本示例使用 append 模式。
    )WITH(
        'connector' = 'elasticsearch-7', --输出到es7
        'hosts' = 'http://192.168.100.19:9200',  --es连接地址
        'index' = 'stu',                         --es的index名;无需提前创建,您可以在此处自定义
        'sink.flush-on-checkpoint' = 'true',     --checkpoint时不允许批量写入
        'sink.bulk-flush.max-actions' = '50',    --每批次最多的条数
        'sink.bulk-flush.max-size' = '10mb',     --每批次累计最大大小
        'sink.bulk-flush.interval' = '1000',     --批量写入的间隔(ms)
        'connection.max-retry-timeout' = '1000', --每次请求的最大超时时间(ms)
        'format' = 'json'                        --输出数据格式,目前只支持 'json'
    );
    
    INSERT INTO es_stu SELECT id,name,score FROM students;
    
  2. 点击语法检查,对代码进行语法检查。

  3. 点击保存,保存修改,防止代码丢失。

配置作业调度

  1. 选择已创建好的作业,点击右侧的调度设置,进入调度配置页面。

  2. 设置调度策略。

    本实践选择执行一次发布后立即执行。若您需要配置为其他调度策略,请参见配置作业调度

    配置作业调度
  3. 设置完成后,点击确定

配置运行参数

  1. 选择已创建好的作业,点击右侧的运行参数,进入运行参数配置页面。

    运行参数
  2. 配置运行参数。

    • 计算集群:在该页面可以查看和修改运行作业的计算集群。
    • 并行度:配置作业的并发数,不能为 0,默认为 1
    • 依赖资源:选择作业运行所需的函数包以及自定义 Connector 包。本实践无需选择依赖资源。
  3. 配置完成后,点击确定

发布作业

完成作业调度和运行参数配置后,您才可以发布作业。

  1. 点击发布,弹出发布调度任务对话框。

    发布作业
  2. 填写作业描述信息。

  3. 根据实际情况选择是否终止当前作业正在运行中的实例

    如果终止当前作业正在运行中的实例,运行中的作业实例会立即被强制终止。

  4. 点击发布,发布作业。发布作业时也会对代码进行语法检查,需要一定的时间,请耐心等待。

    作业发布成功后,您可以前往运维中心查看已发布作业和作业实例。

    说明

    本实践的示例是一个实时持续的过程,所以作业实例的状态会一直显示运行中,除非您手动终止该作业实例。

这篇文档解决了您的问题吗?
0
0