文件在E:(我的网盘\我的笔记)\ 学习文档子目录压缩\框架\微服务相关\分布式\日志收集\ELK
es与mysql如何通过logstash实现数据同步的原理如图所示:logstash向mysql发sql输入收集es需要的数据库信息时,会记录下此sql记录的updat_time的最大值保存下来,下一次再发sql的条件就是有没有记录 >updat_timed的数据,有就将其更到logstash,logstash发json到es
logstash mysql 同步到 elasticsearch
Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地
1.上传logstash-6.4.3.tar.gz到服务中 2.tar –zxvf logstash-6.4.3.tar.gz 3.cd logstash-6.4.3 4. bin/logstash-plugin install logstash-input-jdbc 5. bin/logstash-plugin install logstash-output-elasticsearch |
上传mysql.conf(包含logstash所需的intpu,output配置信息)到我docker安装的刚才安装的logstash-6.4.3目录下
mysql.conf相关配置文件说明
jdbc_driver_library: jdbc mysql 驱动的路径,在上一步中已经下载 jdbc_driver_class: 驱动类的名字,mysql 填 com.mysql.jdbc.Driver 就好了 jdbc_connection_string: mysql 地址 jdbc_user: mysql 用户 jdbc_password: mysql 密码 schedule: 执行 sql 时机,类似 crontab 的调度 statement: 要执行的 sql,以 “:” 开头是定义的变量,可以通过 parameters 来设置变量,这里的 sql_last_value 是内置的变量,表示上一次 sql 执行中 update_time 的值,这里 update_time 条件是 >= 因为时间有可能相等,没有等号可能会漏掉一些增量 use_column_value: 使用递增列的值 tracking_column_type: 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型 tracking_column: 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp last_run_metadata_path: 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改 |
相关配置参考附件
Crontab https://tool.lu/crontab/ 注意:Crontab表达式以分为单位
上传mysql 驱动jar到如下地址 /usr/local/sql/ mysql-connector-java-5.1.46.jar
这个地址对应dbc_driver_library: jdbc mysql 驱动的路径
然后docker启动 es,和Kibana--需等es启动完毕即http://宿主机IP:9200/可以登陆后再启动Kibana
再cd 到mysql.conf 的父目录即logstash-6.4.3目录下执行如下命令,启动
./bin/logstash -f mysql.conf 启动,完成后
然后登陆Kibana--http://宿主机IP:5601/app/kibana 就可以看到单文件效果实现es和mysql同步效果
如下图,多文件方式同步ES数据
一个 logstash 实例可以借助 pipelines 机制同步多个表,只需要写多个配置文件就可以了,假设我们有三个表 table1 和 table2和table3,对应两个配置文件mysql1.conf和 mysql2.conf,mysql3.conf,那么上传mysql1.conf, mysql2.conf, mysql3.conf到/usr/local/logstash-6.4.3/config/目录下
在 config/pipelines.yml --/usr/local/logstash-6.4.3/config/pipelines.yml 最下面中配置
--path.confg:写绝对路径,pipeline.id: table1 这个table1这个名字可以随便写
- pipeline.id: table1 path.config: " /usr/local/logstash-6.4.3/config/mysql1.conf" - pipeline.id: table2 path.config: "/usr/local/logstash-6.4.3/config/mysql2.conf " - pipeline.id: table3 path.config: "/usr/local/logstash-6.4.3/config/mysql3.conf " |
然后docker启动 es,和Kibana--需等es启动完毕即http://宿主机IP:9200/可以登陆后再启动Kibana
再进入/usr/local/logstash-6.4.3 目录下执行如下命令进行启动
./bin/logstash启动,完成后
然后登陆Kibana--http://宿主机IP:5601/app/kibana 就可以看到单文件效果实现es和mysql同步效果\logstash-input-jdbc原理
使用 logstash-input-jdbc 插件读取 mysql 的数据,这个插件的工作原理比较简单,就是定时执行一个 sql,然后将 sql 执行的结果写入到流中,增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段
基于Docker方式实现Elasticsearch集群
注意:spring-boot-starter-data-elasticsearch必须为集群方式连接,否则情况下会报一下错误
None of the configured nodes are available
因此要先删掉之前的ES容器,做成集群方式,如果没有重要的容器可以使用
sudo docker rm $(sudo docker ps -a -q) 删除所有未运行的容器
在/usr/local 下进行也可在其他位置执行如下
1.mkdir -p es/config 2.cd es 3.mkdir data1 4.mkdir data2 5.mkdir data3 ##my需要执行 firewall-cmd --add-port=9300/tcp ## my需要执行 firewall-cmd --add-port=9301/tcp
|
## 执行firewall-cmd --add-port=9300/tcp 报 FirewallD is not running错误,
## 那么执行如下命令开启下防火墙即可---systemctl start firewalld
在es/config分别放入es1.yml、es2.yml
Es1: cluster.name: elasticsearch-cluster node.name: es-node1 network.bind_host: 0.0.0.0 network.publish_host: 宿主机IP http.port: 9200 transport.tcp.port: 9300 http.cors.enabled: true http.cors.allow-origin: "*" node.master: true node.data: true discovery.zen.ping.unicast.hosts: ["宿主机IP:9300","宿主机IP:9301"] discovery.zen.minimum_master_nodes: 1
Es2: cluster.name: elasticsearch-cluster node.name: es-node2 network.bind_host: 0.0.0.0 network.publish_host: 宿主机IP http.port: 9201 transport.tcp.port: 9301 http.cors.enabled: true http.cors.allow-origin: "*" node.master: true node.data: true discovery.zen.ping.unicast.hosts: ["宿主机IP:9300","宿主机IP:9301"] discovery.zen.minimum_master_nodes: 1
|
启动容器1
docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d -p 9200:9200 -p 9300:9300 -p 5601:5601 -v /usr/local/es/config/es1.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /usr/local/es/plugins1:/usr/share/elasticsearch/plugins -v /usr/local/es/data1:/usr/share/elasticsearch/data --name ES01 elasticsearch:5.6.12 |
启动容器2
docker run -e ES_JAVA_OPTS="-Xms256m -Xmx256m" -d -p 9201:9201 -p 9301:9301 -v /usr/local/es/config/es2.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /usr/local/es/plugins2:/usr/share/elasticsearch/plugins -v /usr/local/es/data2:/usr/share/elasticsearch/data --name ES02 elasticsearch:5.6.12 |
docker start 启动两容器
如果docker start ES01容器ID报如下错误
Error response from daemon: Container acfe8ef928f4293c0786e6f8d6cccbd2b5b173f57edfd32c4cdccb81e30a7a99 is not running
重启docker -- systemctl restart dockert 在启动两容器即可
如果容器开启后闪退
输入命令查日志: docker logs -f 容器Id ,显示如下
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
问题翻译过来就是:elasticsearch用户拥有的内存权限太小,至少需要262144;
在 /etc/sysctl.conf文件最后添加一行
vm.max_map_count=262144
即可永久修改
修改后重启虚拟机,再执行启动容器命令即可即可,
测试集群效果
http://192.168.212.185:9200/_cat/nodes?pretty
SpringBoot整合Elasticsearch
- 创建服务项目meite-shop-service-goods服务接口
- 创建商品搜索服务接口
public interface ProductSearchService { @GetMapping("/search") public BaseResponse<List<ProductDto>> search(String name); } |
3.dto实体类
@Data public class ProductDto {
/** 主键ID */ private Integer id; /** 类型ID */ private Integer categoryId; /** 名称 */ private String name; /** 小标题 */ private String subtitle; /** 主图像 */ private String mainImage; /** 小标题图像 */ private String subImages; /** 描述 */ private String detail; /** 商品规格 */ private String attributeList; /** 价格 */ private Double price; /** 库存 */ private Integer stock; /** 状态 */ private Integer status; /** 乐观锁 */ private Integer revision; /** 创建人 */ private String createdBy; /** 创建时间 */ private Date createdTime; /** 更新人 */ private String updatedBy; /** 更新时间 */ private Timestamp updatedTime; } |
4.maven依赖
<!-- springboot 整合ES --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <dependency> <groupId>com.querydsl</groupId> <artifactId>querydsl-apt</artifactId> </dependency> <dependency> <groupId>com.querydsl</groupId> <artifactId>querydsl-jpa</artifactId> </dependency>
<dependency> <groupId>ma.glasnost.orika</groupId> <artifactId>orika-core</artifactId> <version>1.5.2</version> </dependency>
|
@RestController public class ProductSearchServiceImpl extends BaseApiService<List<ProductDto>> implements ProductSearchService { @Autowired private ProductReposiory productReposiory;
@Override public BaseResponse<List<ProductDto>> search(String name) { BoolQueryBuilder builder = QueryBuilders.boolQuery(); // 模拟查询 builder.must(QueryBuilders.fuzzyQuery("name", name)); Pageable pageable = new QPageRequest(0, 5); Page<ProductEntity> page = productReposiory.search(builder, pageable); List<ProductEntity> content = page.getContent(); MapperFactory mapperFactory = new DefaultMapperFactory.Builder().build(); List<ProductDto> mapAsList = mapperFactory.getMapperFacade().mapAsList(content, ProductDto.class); return setResultSuccess(mapAsList); }
}
|
public interface ProductReposiory extends ElasticsearchRepository<ProductEntity, Long> {
} @Document(indexName = "goods", type = "goods") @Data public class ProductEntity { /** 主键ID */ private Integer id; /** 类型ID */ private Integer categoryId; /** 名称 */ private String name; /** 小标题 */ private String subtitle; /** 主图像 */ private String mainImage; /** 小标题图像 */ private String subImages; /** 描述 */ private String detail; /** 商品规格 */ private String attributeList; /** 价格 */ private Double price; /** 库存 */ private Integer stock; /** 状态 */ private Integer status;
/** 创建人 */ private String createdBy; /** 创建时间 */ private Date createdTime;
/** 更新时间 */ private Timestamp updatedTime; }
|
###服务启动端口号 server: port: 8500 ###服务名称(服务注册到eureka名称) eureka: client: service-url: defaultZone: http://localhost:8100/eureka
spring: application: name: app-mayikt-goods redis: host: IP port: 6379 password: 123456 pool: max-idle: 100 min-idle: 1 max-active: 1000 max-wait: -1 ###数据库相关连接 datasource: username: root password: root driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/goods?useUnicode=true&characterEncoding=UTF- data: elasticsearch: ####集群名称 cluster-name: elasticsearch-cluster ####地址 cluster-nodes: 宿主机IP:9300
|
Elasticsearch集成IK分词器
注意:docker下载ES默认版本号码5.6.12,IK分词器对应的也应该是5.6.12,否则情况下会报错版本号码不一致。
ES文档类型映射
GET /goods/_mapping DELETE /goods PUT /goods POST /goods/_mapping/goods { |
} |
- MQ与logstash实现ES与数据库同步区别
Logstash实现ES与数据库同步:
使用定时器方式 、实现简单
MQ实现ES与数据库同步:
实时性、复杂性更高、一致性强
如果没有整合IK分词,es无法支持模糊查询