Mysql数据如何实时同步到Es?这一篇就够啦~

时间:2024-10-28 07:47:29

前言

我们一般会使用Mysql用来存储数据,用Es来做全文检索和特殊查询,那么如何将数据优雅的从Mysql同步到Es呢?我们一般有以下几种方式:

1.双写。在代码中先向Mysql中写入数据,然后紧接着向Es中写入数据。这个方法的缺点是代码严重耦合,需要手动维护Mysql和Es数据关系,非常不便于维护。

2.发MQ,异步执行。在执行完向Mysql中写入数据的逻辑后,发送MQ,告诉消费端这个数据需要写入Es,消费端收到消息后执行向Es写入数据的逻辑。这个方式的优点是Mysql和Es数据维护分离,开发Mysql和Es的人员只需要关心各自的业务。缺点是依然需要维护发送、接收MQ的逻辑,并且引入了MQ组件,增加了系统的复杂度。

3.使用Datax进行全量数据同步。这个方式优点是可以完全不用写维护数据关系的代码,各自只需要关心自己的业务,对代码侵入性几乎为零。缺点是Datax是一种全量同步数据的方式,不使用实时同步。如果系统对数据时效性不强,可以考虑此方式。

4.使用Canal进行实时数据同步。这个方式具有跟Datax一样的优点,可以完全不用写维护数据关系的代码,各自只需要关心自己的业务,对代码侵入性几乎为零。与Datax不同的是Canal是一种实时同步数据的方式,对数据时效性较强的系统,我们会采用Canal来进行实时数据同步。

那么就让我们来看看Canal是如何使用的。

官网

/alibaba/can…

简介

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5. , 5. , 5. , 5. , 8.

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

2.开启MySQL Binlog

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式, 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
复制代码
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
复制代码
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
复制代码

注意:Mysql版本为时启动canal可能会出现“caching_sha2_password Auth failed”错误,这是因为创建用户时默认的密码加密方式为caching_sha2_password,与canal的方式不一致,所以需要将canal用户的密码加密方式修改为mysql_native_password

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; #更新一下用户密码
FLUSH PRIVILEGES; #刷新权限
复制代码

3.安装Canal

3.1 下载Canal

点击下载地址,选择版本后点击文件下载

3.2 修改配置文件

打开目录下conf/example/文件,主要修改以下内容

## mysql serverId,不要和 mysql 的 server_id 重复
 = 10
#position info,需要改成自己的数据库信息
 = 127.0.0.1:3306 
#username/password,需要改成自己的数据库信息,与刚才添加的用户保持一致
 = canal  
 = canal
复制代码

3.3 启动和关闭

#进入文件目录下的bin文件夹
#启动
sh 
#关闭
sh 
复制代码

集成Canal

4.1 Canal数据结构

4.2 引入依赖

<!-- canal-client -->
<dependency>
    <groupId></groupId>
    <artifactId></artifactId>
    <version>1.1.6</version>
</dependency>

<!-- 高版本canal需要引入这个依赖 -->
<!-- canal-protocol -->
<dependency>
    <groupId></groupId>
    <artifactId></artifactId>
    <version>1.1.6</version>
</dependency>

<!-- Elasticsearch -->
<dependency>
    <groupId></groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.4.3</version>
</dependency>

<!-- -api -->
<dependency>
    <groupId></groupId>
    <artifactId>-api</artifactId>
    <version>2.0.1</version>
</dependency>
复制代码

4.3

custom:
  elasticsearch:
    host: localhost    #主机
    port: 9200         #端口
    username: elastic  #用户名
    password: 3bf24a76 #密码
复制代码

4.4 EsClient

@Setter
@ConfigurationProperties(prefix = "")
@Configuration
public class EsClient {

    /**
     * 主机
     */
    private String host;

    /**
     * 端口
     */
    private Integer port;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;


    @Bean
    public ElasticsearchClient elasticsearchClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        (
                , new UsernamePasswordCredentials(username, password));

        // Create the low-level client
        RestClient restClient = (new HttpHost(host, port))
                .setHttpClientConfigCallback(httpAsyncClientBuilder ->
                        (credentialsProvider))
                .build();
        // Create the transport with a Jackson mapper
        RestClientTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        // Create the transport with a Jackson mapper
        return new ElasticsearchClient(transport);
    }
}
复制代码

4.5 Music实体类

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Music {

    /**
     * id
     */
    private String id;

    /**
     * 歌名
     */
    private String name;

    /**
     * 歌手名
     */
    private String singer;

    /**
     * 封面图地址
     */
    private String imageUrl;

    /**
     * 歌曲地址
     */
    private String musicUrl;

    /**
     * 歌词地址
     */
    private String lrcUrl;

    /**
     * 歌曲类型id
     */
    private String typeId;

    /**
     * 是否被逻辑删除,1 是,0 否
     */
    private Integer isDeleted;

    /**
     * 创建时间
     */
    @JsonFormat(shape = , pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date createTime;

    /**
     * 更新时间
     */
    @JsonFormat(shape = , pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date updateTime;

}
复制代码

4.6 CanalClient

@Slf4j
@Component
public class CanalClient {

    @Resource
    private ElasticsearchClient client;


    /**
     * 实时数据同步程序
     *
     * @throws InterruptedException
     * @throws InvalidProtocolBufferException
     */
    public void run() throws InterruptedException, IOException {
        CanalConnector connector = (new InetSocketAddress(
                "localhost", 11111), "example", "", "");

        while (true) {
            //连接
            ();
            //订阅数据库
            ("cloudmusic_music.music");
            //获取数据
            Message message = (100);

            List<> entryList = ();
            if ((entryList)) {
                //没有数据,休息一会
                (2);
            } else {
                for ( entry : entryList) {
                    //获取类型
                     entryType = ();

                    //判断类型是否为ROWDATA
                    if ((entryType)) {
                        //获取序列化后的数据
                        ByteString storeValue = ();
                        //反序列化数据
                         rowChange = (storeValue);
                        //获取当前事件操作类型
                         eventType = ();
                        //获取数据集
                        List<> rowDataList = ();

                        if (eventType == ) {
                            ("------新增操作------");

                            List<Music> musicList = new ArrayList<>();
                            for ( rowData : rowDataList) {
                                (createMusic(()));
                            }
                            //es批量新增文档
                            index(musicList);
                            //打印新增集合
                            ((()));
                        } else if (eventType == ) {
                            ("------更新操作------");

                            List<Music> beforeMusicList = new ArrayList<>();
                            List<Music> afterMusicList = new ArrayList<>();
                            for ( rowData : rowDataList) {
                                //更新前
                                (createMusic(()));
                                //更新后
                                (createMusic(()));
                            }
                            //es批量更新文档
                            index(afterMusicList);
                            //打印更新前集合
                            ("更新前:{}", (()));
                            //打印更新后集合
                            ("更新后:{}", (()));
                        } else if (eventType == ) {
                            //删除操作
                            ("------删除操作------");

                            List<String> idList = new ArrayList<>();
                            for ( rowData : rowDataList) {
                                for ( column : ()) {
                                    if("id".equals(())) {
                                        (());
                                        break;
                                    }
                                }
                            }
                            //es批量删除文档
                            delete(idList);
                            //打印删除id集合
                            ((()));
                        }
                    }
                }
            }
        }
    }

    /**
     * 根据canal获取的数据创建Music对象
     *
     * @param columnList
     * @return
     */
    private Music createMusic(List<> columnList) {
        Music music = new Music();
        DateTimeFormatter formatter = ("yyyy-MM-dd HH:mm:ss");

        for ( column : columnList) {
            switch (()) {
                case "id" -> (());
                case "name" -> (());
                case "singer" -> (());
                case "image_url" -> (());
                case "music_url" -> (());
                case "lrc_url" -> (());
                case "type_id" -> (());
                case "is_deleted" -> ((()));
                case "create_time" ->
                        ((((), formatter).atZone(()).toInstant()));
                case "update_time" ->
                        ((((), formatter).atZone(()).toInstant()));
                default -> {
                }
            }
        }

        return music;
    }

    /**
     * es批量新增、更新文档(不存在:新增, 存在:更新)
     * 
     * @param musicList 音乐集合
     * @throws IOException
     */
    private void index(List<Music> musicList) throws IOException {
         br = new ();

        (music -> br
                .operations(op -> op
                        .index(idx -> idx
                                .index("music")
                                .id(())
                                .document(music))));

        (());
    }

    /**
     * es批量删除文档
     * 
     * @param idList 音乐id集合
     * @throws IOException
     */
    private void delete(List<String> idList) throws IOException {
         br = new ();

        (id -> br
                .operations(op -> op
                        .delete(idx -> idx
                                .index("music")
                                .id(id))));

        (());
    }

}
复制代码

4.7 ApplicationContextAware

@Component
public class ApplicationContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         = applicationContext;
    }

    public static <T> T getBean (Class<T> classType) {
        return (classType);
    }

}
复制代码

4.8 main

@Slf4j
@SpringBootApplication
public class CanalApplication {
    public static void main(String[] args) throws InterruptedException, IOException {
        (, args);
        ("数据同步程序启动");

        CanalClient client = ();
        ();
    }
}
复制代码

5.总结

那么以上就是Canal组件的介绍啦,希望大家都能有所收获~