es与mysql如何通过logstash实现数据同步

时间:2024-04-04 19:15:27

 

文件在E:(我的网盘\我的笔记)\ 学习文档子目录压缩\框架\微服务相关\分布式\日志收集\ELK

 

es与mysql如何通过logstash实现数据同步

 

es与mysql如何通过logstash实现数据同步

es与mysql如何通过logstash实现数据同步的原理如图所示:logstash向mysql发sql输入收集es需要的数据库信息时,会记录下此sql记录的updat_time的最大值保存下来,下一次再发sql的条件就是有没有记录 >updat_timed的数据,有就将其更到logstash,logstash发json到es

 

logstash mysql 同步到 elasticsearch

 

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地

 

1.上传logstash-6.4.3.tar.gz到服务中

2.tar –zxvf  logstash-6.4.3.tar.gz

3.cd logstash-6.4.3

4. bin/logstash-plugin install logstash-input-jdbc

5. bin/logstash-plugin install logstash-output-elasticsearch

 

 

上传mysql.conf(包含logstash所需的intpu,output配置信息)到我docker安装的刚才安装的logstash-6.4.3目录下

mysql.conf相关配置文件说明

jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载

jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了

jdbc_connection_string: mysql 地址

jdbc_user: mysql 用户

jdbc_password: mysql 密码

schedule: 执行 sql 时机,类似 crontab 的调度

statement: 要执行的 sql,以 “:” 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量

use_column_value: 使用递增列的值

tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型

tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp

last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改

 

相关配置参考附件

 

 

 

Crontab https://tool.lu/crontab/  注意:Crontab表达式以分为单位

上传mysql 驱动jar到如下地址  /usr/local/sql/ mysql-connector-java-5.1.46.jar

这个地址对应dbc_driver_library: jdbc mysql 驱动的路径

 

然后docker启动 es,和Kibana--需等es启动完毕即http://宿主机IP:9200/可以登陆后再启动Kibana

 

再cd 到mysql.conf 的父目录即logstash-6.4.3目录下执行如下命令,启动

./bin/logstash -f mysql.conf 启动,完成后

 

然后登陆Kibana--http://宿主机IP:5601/app/kibana 就可以看到单文件效果实现es和mysql同步效果

如下图,多文件方式同步ES数据

 

一个 logstash 实例可以借助 pipelines 机制同步多个表,只需要写多个配置文件就可以了,假设我们有三个表 table1 和 table2和table3,对应两个配置文件mysql1.conf和 mysql2.conf,mysql3.conf,那么上传mysql1.conf, mysql2.conf, mysql3.conf到/usr/local/logstash-6.4.3/config/目录下

在 config/pipelines.yml --/usr/local/logstash-6.4.3/config/pipelines.yml 最下面中配置

--path.confg:写绝对路径,pipeline.id: table1 这个table1这个名字可以随便写

- pipeline.id: table1

  path.config: " /usr/local/logstash-6.4.3/config/mysql1.conf"

- pipeline.id: table2

  path.config: "/usr/local/logstash-6.4.3/config/mysql2.conf "

- pipeline.id: table3

  path.config: "/usr/local/logstash-6.4.3/config/mysql3.conf "

然后docker启动 es,和Kibana--需等es启动完毕即http://宿主机IP:9200/可以登陆后再启动Kibana

再进入/usr/local/logstash-6.4.3 目录下执行如下命令进行启动

./bin/logstash启动,完成后

 

然后登陆Kibana--http://宿主机IP:5601/app/kibana 就可以看到单文件效果实现es和mysql同步效果\logstash-input-jdbc原理

使用 logstash-input-jdbc 插件读取 mysql 的数据,这个插件的工作原理比较简单,就是定时执行一个 sql,然后将 sql 执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段

 

基于Docker方式实现Elasticsearch集群

注意:spring-boot-starter-data-elasticsearch必须为集群方式连接,否则情况下会报一下错误

None of the configured nodes are available

因此要先删掉之前的ES容器,做成集群方式,如果没有重要的容器可以使用

sudo docker rm $(sudo docker ps -a -q) 删除所有未运行的容器

在/usr/local 下进行也可在其他位置执行如下

 

1.mkdir -p es/config

2.cd es

3.mkdir data1

4.mkdir data2

5.mkdir data3

##my需要执行 firewall-cmd --add-port=9300/tcp

## my需要执行 firewall-cmd --add-port=9301/tcp

  1. mkdir  plugins1
  2. mkdir  plugins2

 

     ##  执行firewall-cmd --add-port=9300/tcp 报 FirewallD is not running错误,

     ## 那么执行如下命令开启下防火墙即可---systemctl start firewalld

 

在es/config分别放入es1.yml、es2.yml

 

Es1:

cluster.name: elasticsearch-cluster

node.name: es-node1

network.bind_host: 0.0.0.0

network.publish_host: 宿主机IP

http.port: 9200

transport.tcp.port: 9300

http.cors.enabled: true

http.cors.allow-origin: "*"

node.master: true

node.data: true 

discovery.zen.ping.unicast.hosts: ["宿主机IP:9300","宿主机IP:9301"]

discovery.zen.minimum_master_nodes: 1

 

Es2:

cluster.name: elasticsearch-cluster

node.name: es-node2

network.bind_host: 0.0.0.0

network.publish_host: 宿主机IP

http.port: 9201

transport.tcp.port: 9301

http.cors.enabled: true

http.cors.allow-origin: "*"

node.master: true

node.data: true 

discovery.zen.ping.unicast.hosts: ["宿主机IP:9300","宿主机IP:9301"]

discovery.zen.minimum_master_nodes: 1

 

 

启动容器1

docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d  -p 9200:9200 -p 9300:9300 -p 5601:5601 -v /usr/local/es/config/es1.yml:/usr/share/elasticsearch/config/elasticsearch.yml  -v /usr/local/es/plugins1:/usr/share/elasticsearch/plugins    -v /usr/local/es/data1:/usr/share/elasticsearch/data --name ES01 elasticsearch:5.6.12

启动容器2

docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d -p 9201:9201 -p 9301:9301 -v /usr/local/es/config/es2.yml:/usr/share/elasticsearch/config/elasticsearch.yml  -v /usr/local/es/plugins2:/usr/share/elasticsearch/plugins    -v /usr/local/es/data2:/usr/share/elasticsearch/data --name ES02 elasticsearch:5.6.12

 

    docker start 启动两容器

如果docker start ES01容器ID报如下错误

Error response from daemon: Container acfe8ef928f4293c0786e6f8d6cccbd2b5b173f57edfd32c4cdccb81e30a7a99 is not running

     重启docker -- systemctl restart dockert 在启动两容器即可

   如果容器开启后闪退

输入命令查日志: docker logs -f 容器Id ,显示如下

max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

问题翻译过来就是:elasticsearch用户拥有的内存权限太小,至少需要262144

   /etc/sysctl.conf文件最后添加一行

vm.max_map_count=262144

即可永久修改

    修改后重启虚拟机,再执行启动容器命令即可即可,

测试集群效果

http://192.168.212.185:9200/_cat/nodes?pretty

 

 

SpringBoot整合Elasticsearch

  1. 创建服务项目meite-shop-service-goods服务接口
  2. 创建商品搜索服务接口

public interface ProductSearchService {

@GetMapping("/search")

public BaseResponse<List<ProductDto>> search(String name);

}

3.dto实体类

@Data

public class ProductDto {

 

      /** 主键ID */

      private Integer id;

      /** 类型ID */

      private Integer categoryId;

      /** 名称 */

      private String name;

      /** 小标题 */

      private String subtitle;

      /** 主图像 */

      private String mainImage;

      /** 小标题图像 */

      private String subImages;

      /** 描述 */

      private String detail;

      /** 商品规格 */

      private String attributeList;

      /** 价格 */

      private Double price;

      /** 库存 */

      private Integer stock;

      /** 状态 */

      private Integer status;

      /** 乐观锁 */

      private Integer revision;

      /** 创建人 */

      private String createdBy;

      /** 创建时间 */

      private Date createdTime;

      /** 更新人 */

      private String updatedBy;

      /** 更新时间 */

      private Timestamp updatedTime;

}

4.maven依赖

<!-- springboot 整合ES -->

            <dependency>

                  <groupId>org.springframework.boot</groupId>

                  <artifactId>spring-boot-starter-data-elasticsearch</artifactId>

            </dependency>

            <dependency>

                  <groupId>com.querydsl</groupId>

                  <artifactId>querydsl-apt</artifactId>

            </dependency>

            <dependency>

                  <groupId>com.querydsl</groupId>

                  <artifactId>querydsl-jpa</artifactId>

            </dependency>

 

            <dependency>

                  <groupId>ma.glasnost.orika</groupId>

                  <artifactId>orika-core</artifactId>

                  <version>1.5.2</version>

            </dependency>

 

 

 

@RestController

public class ProductSearchServiceImpl extends BaseApiService<List<ProductDto>> implements ProductSearchService {

      @Autowired

      private ProductReposiory productReposiory;

 

      @Override

      public BaseResponse<List<ProductDto>> search(String name) {

            BoolQueryBuilder builder = QueryBuilders.boolQuery();

            // 模拟查询

            builder.must(QueryBuilders.fuzzyQuery("name", name));

            Pageable pageable = new QPageRequest(0, 5);

            Page<ProductEntity> page = productReposiory.search(builder, pageable);

            List<ProductEntity> content = page.getContent();

            MapperFactory mapperFactory = new DefaultMapperFactory.Builder().build();

            List<ProductDto> mapAsList = mapperFactory.getMapperFacade().mapAsList(content, ProductDto.class);

            return setResultSuccess(mapAsList);

      }

 

}

 

 

public interface ProductReposiory extends ElasticsearchRepository<ProductEntity, Long> {

 

}

@Document(indexName = "goods", type = "goods")

@Data

public class ProductEntity {

      /** 主键ID */

      private Integer id;

      /** 类型ID */

      private Integer categoryId;

      /** 名称 */

      private String name;

      /** 小标题 */

      private String subtitle;

      /** 主图像 */

      private String mainImage;

      /** 小标题图像 */

      private String subImages;

      /** 描述 */

      private String detail;

      /** 商品规格 */

      private String attributeList;

      /** 价格 */

      private Double price;

      /** 库存 */

      private Integer stock;

      /** 状态 */

      private Integer status;

 

      /** 创建人 */

      private String createdBy;

      /** 创建时间 */

      private Date createdTime;

 

      /** 更新时间 */

      private Timestamp updatedTime;

}

 

 

 

###服务启动端口号

server:

  port: 8500

###服务名称(服务注册到eureka名称

eureka:

  client:

    service-url:

           defaultZone: http://localhost:8100/eureka

 

 

 

spring:

  application:

    name:  app-mayikt-goods

  redis:

    host: IP

    port: 6379

    password: 123456

    pool:

      max-idle: 100

      min-idle: 1

      max-active: 1000

      max-wait: -1

###数据库相关连接     

  datasource:

    username: root

    password: root

    driver-class-name: com.mysql.jdbc.Driver

    url: jdbc:mysql://127.0.0.1:3306/goods?useUnicode=true&characterEncoding=UTF-

  data:

    elasticsearch:

    ####集群名称

     cluster-name: elasticsearch-cluster

    ####地址

     cluster-nodes: 宿主机IP:9300

    

 

 

 

 

Elasticsearch集成IK分词器

 

注意:docker下载ES默认版本号码5.6.12,IK分词器对应的也应该是5.6.12,否则情况下会报错版本号码不一致。

 

ES文档类型映射

 

GET /goods/_mapping

DELETE  /goods

PUT /goods

POST /goods/_mapping/goods

{

}

 

  1. MQ与logstash实现ES与数据库同步区别

 

Logstash实现ES与数据库同步:

使用定时器方式 、实现简单

MQ实现ES与数据库同步:

实时性、复杂性更高、一致性强

 

 

如果没有整合IK分词,es无法支持模糊查询