使用 Apache SeaTunnel 在 MySQL 和 HTTP 之间的数据同步示例

时间:2024-10-11 07:02:22

随着现代企业数据量的不断增长,跨系统、跨平台的数据同步需求变得愈发重要。

在实际的业务场景中,开发者常常需要将数据从 MySQL 同步到其他系统,或者从不同的数据源同步回 MySQL。Apache SeaTunnel 作为一款高效的分布式数据集成平台,支持批处理和流处理,能够灵活地完成这些任务。

本文将详细介绍如何使用 Apache SeaTunnel 实现以下几种常见的数据同步场景:

  • MySQL 同步到 HTTP 接口
  • MySQL 同步到 MySQL
  • HTTP 接口同步到 MySQL
  • MySQL-CDC 同步到 HTTP 接口

我们将逐一展示这些同步场景的配置方式,并提供清晰的代码示例,帮助开发者快速掌握 SeaTunnel 在不同场景下的应用。

官方文档参考

SeaTunnel JDBC Source Connector

前置准备

在开始之前,请确保已经下载了对应版本的 MySQL JDBC 驱动 mysql-connector-java-xxx.jar,并将其放置在 SeaTunnel 的安装目录下的 lib 文件夹中。

可以从以下链接获取:https://mvnrepository.com/artifact/mysql/mysql-connector-java

对于使用 Spark 或 Flink 的 SeaTunnel 任务,也需要将该 JAR 包复制到相应的目录下:

  • Spark: $SPARK_HOME/jars/
  • Flink: $FLINK_HOME/lib/

接下来,我们将逐一展示四种数据同步的配置和代码示例。

MySQL 同步到 HTTP 接口

在此场景中,我们将 MySQL 数据表中的信息同步到指定的 HTTP 接口。

这里假设我们从 user_info 表中查询数据并通过 HTTP POST 请求将其发送到目标 API。

env {
  execution.parallelism = 2
  job.mode = "BATCH"  # MySQL 作为数据源,只支持批量同步
}

source {
   jdbc {
     url =  "jdbc:mysql://172.27.10.22:6033/test"
     driver = "com.mysql.cj.jdbc.Driver"
     connection_check_timeout_sec = 100
     user = "root"
     password = "root"
     query = "SELECT * FROM user_info ORDER BY create_time LIMIT 1"
     result_table_name = "user_info_out"
  }
}

transform {
    Sql {
      source_table_name = "user_info_out"
      result_table_name = "user_info_sink"
      query = "select info, user_name, age from user_info_out"
    }
}

sink {
  Console {
    source_table_name = "user_info_sink"
  }

  http {
    source_table_name = "user_info_sink"
    url = "https://test.test.com:8080/api/user/test"
    method = "POST"
    headers = {Accept="application/json", Content-Type="application/json;charset=utf-8"}
  }
}

MySQL 同步到 MySQL

在此示例中,我们将从一个 MySQL 数据库中提取数据,并将其同步到另一个 MySQL 数据库。此场景适用于多个数据库实例之间的数据迁移或备份。

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
    Jdbc {
        url =  "jdbc:mysql://172.27.10.22:6033/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 10
        user = "root"
        password = "root"
        query = "SELECT `name`,`score` FROM `user`"
        result_table_name = "user_info"
    }
}

sink {
  Jdbc {
        source_table_name = "user_info"
        url =  "jdbc:mysql://192.27.10.22:16033/temp_user"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "root"
        password = "root"
        query = "INSERT INTO `student`(`name`, `score`) VALUES(?, ?)"
  }
}

HTTP 接口同步到 MySQL

本示例展示了如何将 HTTP 接口中的数据同步到 MySQL 数据库。

这在从第三方 API 获取数据并将其存储到本地数据库的场景中非常实用。

env {
  execution.parallelism = 2
  job.mode = "STREAMING"  # HTTP 作为数据源,支持批量和流式模式
  checkpoint.interval = 10000  # 执行间隔(毫秒)
}

source {
  Http {
    url = "https://test.test.com:8080/api/test"
    method = "GET"
    format = "json"
    headers = {Authorization="Bearer example-token", language="zh"}
    params = {userId="fa438165b2c84d8dbe9175d152718437"}
    content_field = "$.content.*"
    schema = {
      fields {
        userId = string
        age = int
        phone = string
        name = string
      }
    }
    result_table_name = "user_info"
  }
}

transform {
    Sql {
      source_table_name = "user_info"
      result_table_name = "user_info_out"
      query = "SELECT name as userName, userId, age, phone FROM user_info"
    }
}

sink {
  Jdbc {
     url = "jdbc:mysql://172.27.10.22:26033/test"
     driver = "com.mysql.cj.jdbc.Driver"
     connection_check_timeout_sec = 100
     user = "root"
     password = "root"
     source_table_name = "user_info_out"
     query = "INSERT INTO `user_bak`(`userName`, `userId`, `age`, `phone`) VALUES (?, ?, ?, ?)"
  }
}

MySQL-CDC 同步到 HTTP 接口

MySQL-CDC(Change Data Capture)允许实时捕获数据库中的数据变化。

在此示例中,我们将 MySQL 数据库中的变化通过 CDC 机制捕获,并将其同步到 HTTP 接口。

env {
  execution.parallelism = 2
  job.mode = "STREAMING"  # MySQL-CDC 支持批量和流式模式
  checkpoint.interval = 10000  # 执行间隔(毫秒)
}

source {
    MySQL-CDC {
      catalog = {
        factory = MySQL
      }
      base-url = "jdbc:mysql://${mysql_ip_port}/test?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
      username = ${mysql_username}  # 使用变量替换
      password = ${mysql_pass}  # 使用变量替换
      table-names = ["test.user"]
      startup.mode = "initial"
      result_table_name = "user_info_out"
      table-names-config = [
        {
          table = "test.user"
          primaryKeys = ["user_id"]
        }
      ]
    }
}

transform {
    FilterRowKind {
      source_table_name = "user_info_out"
      result_table_name = "user_info_sink"
      include_kinds = ["UPDATE_AFTER", "INSERT"]
    }
}

sink {
  http {
    source_table_name = "user_info_sink"
    url = "https://test.test.com:28080/api/user/test"
    method = "POST"
    headers = {Accept="application/json", Content-Type="application/json;charset=utf-8"}
  }
}

总结

通过 Apache SeaTunnel 的强大数据集成能力,开发者可以轻松实现多种数据源之间的同步操作。无论是数据库与 API 之间的数据传输,还是跨数据库的数据迁移,SeaTunnel 都为开发者提供了灵活、高效的解决方案。

希望通过本文的示例,您能够快速上手并在实际项目中 应用 SeaTunnel 进行复杂的数据同步任务。

SeaTunnel 提供的流处理和批处理模式极大地满足了多种场景下的数据处理需求,使得跨平台、跨数据源的数据集成变得更加简单、高效。

本文由 白鲸开源科技 提供发布支持!