使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

时间:2021-08-18 00:19:43

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

前面我们介绍了Spring Boot 整合 Elasticsearch 实现数据查询检索的功能,在实际项目中,我们的数据一般存储在数据库中,而且随着业务的发送,数据也会随时变化。

那么如何保证数据库中的数据与Elasticsearch存储的索引数据保持一致呢? 最原始的方案就是:当数据发生增删改操作时同步更新Elasticsearch。但是这样的设计耦合太高。接下来我们介绍一种非常简单的数据同步方式:Logstash 数据同步。

一、Logstash简介

1.什么是Logstash

logstash是一个开源的服务器端数据处理工具。简单来说,就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景。

Logstash常用于日志系统中做日志采集设备,最常用于ELK中作为日志收集器使用。

2.Logstash的架构原理

Logstash的基本流程架构:input=》 filter =》 output 。

input(输入):采集各种样式,大小和来源数据,从各个服务器中收集数据。常用的有:jdbc、file、syslog、redis等。

filter(过滤器)负责数据处理与转换。主要是将event通过output发出之前对其实现的某些处理功能。

output(输出):将我们过滤出的数据保存到那些数据库和相关存储中,。

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

3.Logstash如何与Elasticsearch数据同步

实际项目中,我们不可能通过手动添加的方式将数据插入索引库,所以需要借助第三方工具,将数据库的数据同步到索引库。此时,Logstash出现了,它可以将不同数据库的数据同步到Elasticsearch中。保证数据库与Elasticsearch的数据保持一致。

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

目前支持数据库与ES数据同步的插件有很多,个人认为Logstash是众多同步mysql数据到es的插件中,最稳定并且最容易配置的一个。

二、安装Logstash

Logstash的使用方法也很简单,下面讲解一下,Logstash是如何使用的。需要说明的是:这里以windows 环境为例,演示Logstash的安装和配置。

1.下载Logstash

首先,下载对应版本的Logstash包,可以通过上面提供下载elasticsearch的地址进行下载,完成后解压。

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

上面是Logstash解压后的目录,我们需要关注是bin目录中的执行文件和config中的配置文件。一般生产情况下,会使用Linux服务器,并且会将Logstash配置成自启动的服务。这里测试的话,直接启动。

2.配置Logstash

接下来,配置Logstash。需要我们编写配置文件,根据官网和网上提供的配置文件,将其进行修改。

第一步:在Logstash根目录下创建mysql文件夹,添加mysql.conf配置文件,配置Logstash需要的相应信息,具体配置如下:

  1. input {
  2. stdin {
  3. }
  4. jdbc {
  5. # mysql数据库连接
  6. jdbc_connection_string => "jdbc:mysql://localhost:3306/book_test?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"
  7. # mysqly用户名和密码
  8. jdbc_user => "root"
  9. jdbc_password => "root"
  10. # 驱动配置
  11. jdbc_driver_library => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar"
  12. # 驱动类名
  13. jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  14. #jdbc_paging_enabled => "true"
  15. #jdbc_page_size => "50000"
  16. jdbc_default_timezone => "Asia/Shanghai"
  17. # 执行指定的sql文件
  18. statement_filepath => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\bookquery.sql"
  19. use_column_value => true
  20. # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
  21. lowercase_column_names => false
  22. # 需要记录的字段,用于增量同步,需是数据库字段
  23. tracking_column => updatetime
  24. # Value can be any of: numeric,timestampDefault value is "numeric"
  25. tracking_column_type => timestamp
  26. # record_last_run上次数据存放位置;
  27. record_last_run => true
  28. #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
  29. last_run_metadata_path => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\logstash_default_last_time.log"
  30. # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
  31. clean_run => false
  32. # 设置监听 各字段含义 分 时 天 月 年 ,默认全部为*代表含义:每分钟都更新
  33. schedule => "* * * * *"
  34. # 索引类型
  35. type => "id"
  36. }
  37. }
  38. output {
  39. elasticsearch {
  40. #es服务器
  41. hosts => ["10.2.1.231:9200"]
  42. #ES索引名称
  43. index => "book"
  44. #自增ID
  45. document_id => "%{id}"
  46. }
  47. stdout {
  48. codec => json_lines
  49. }
  50. }

第二步:将mysql-connector-java.jar 拷贝到前面配置的目录下。上面的mysql.conf配置的是:C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\

mysql-connector-java-8.0.20.jar。那么jar包拷贝到此目录下即可:

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

上面是mysql的驱动,如果是sqlserver数据库,下载SqlServer对应的驱动即可。放置的位置要与mysql.conf 配置文件中的jdbc_driver_library 地址保持一致。

第三步:创建sql目录,创建bookquery.sql文件用于保存需要执行的sql 脚本。示例代码如下:

  1. select * from book where updatetime >= :sql_last_value
  2. order by updatetime desc

这里使用的增量更新,所以使用:sql_last_value 记录上一次记录的最后时间。

3.启动Logstash

进入logstash的bin目录,执行如下命令:

  1. logstash.bat -f C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql.conf

启动成功之后,Logstash就会自动定时将数据写入到Elasticsearch。如下图所示:

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

同步完成后,我们使用Postman查询Elasticsearch,验证索引是否都创建成功。在postman中,发送 Get 请求:

http://10.2.1.231:9200/book/_search 。返回结果如下图所示:

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

可以看到,数据库中的数据已经通过Logstash同步至Elasticsearch。说明Logstash配置成功。

三、创建查询服务

数据同步完成后,接下来我们使用Spring Boot 构建Elasticsearch查询服务。首先创建Spring Boot项目并整合Elasticsearch,这个之前都已经介绍过,不清楚的朋友可以看我之前的文章。

接下来演示如何封装完整的数据查询服务。

1.数据实体

  1. @Document( indexName = "book" , replicas = 0)
  2. public class Book {
  3. @Id
  4. private Long id;
  5. @Field(analyzer = "ik_max_word",type = FieldType.Text)
  6. private String bookName;
  7. @Field(analyzer = "ik_max_word",type = FieldType.Text)
  8. private String author;
  9. private float price;
  10. private int page;
  11. @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  12. private Date createTime;
  13. @Field(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  14. private Date updateTime;
  15. @Field(analyzer = "ik_max_word",type = FieldType.Text)
  16. private String category;
  17. public Long getId() {
  18. return id;
  19. }
  20. public void setId(Long id) {
  21. this.id = id;
  22. }
  23. public String getBookName() {
  24. return bookName;
  25. }
  26. public void setBookName(String bookName) {
  27. this.bookName = bookName;
  28. }
  29. public String getAuthor() {
  30. return author;
  31. }
  32. public void setAuthor(String author) {
  33. this.author = author;
  34. }
  35. public float getPrice() {
  36. return price;
  37. }
  38. public void setPrice(float price) {
  39. this.price = price;
  40. }
  41. public int getPage() {
  42. return page;
  43. }
  44. public void setPage(int page) {
  45. this.page = page;
  46. }
  47. public String getCategory() {
  48. return category;
  49. }
  50. public void setCategory(String category) {
  51. this.category = category;
  52. }
  53. public Book(){
  54. }
  55. public Date getCreateTime() {
  56. return createTime;
  57. }
  58. public void setCreateTime(Date createTime) {
  59. this.createTime = createTime;
  60. }
  61. public Date getUpdateTime() {
  62. return updateTime;
  63. }
  64. public void setUpdateTime(Date updateTime) {
  65. this.updateTime = updateTime;
  66. }
  67. }

2.请求封装类

  1. public class BookQuery {
  2. public String category;
  3. public String bookName;
  4. public String author;
  5. public int priceMin;
  6. public int priceMax;
  7. public int pageMin;
  8. public int pageMax;
  9. public String sort;
  10. public String sortType;
  11. public int page;
  12. public int limit;
  13. }

3.创建Controller控制器

  1. @RestController
  2. public class ElasticSearchController {
  3. @Autowired
  4. private ElasticsearchRestTemplate elasticsearchRestTemplate;
  5. /**
  6. * 查询信息
  7. * @param
  8. * @return
  9. */
  10. @PostMapping(value = "/book/query")
  11. public JSONResult query(@RequestBody BookQuery bookQuery){
  12. Query query= getQueryBuilder(bookQuery);
  13. SearchHits searchHits = elasticsearchRestTemplate.search(query, Book.class);
  14. List> result = searchHits.getSearchHits();
  15. return JSONResult.ok(result);
  16. }
  17. public Query getQueryBuilder(BookQuery query) {
  18. BoolQueryBuilder builder = boolQuery();
  19. // 匹配器 模糊查询部分,分析器使用ik (ik_max_word)
  20. List must = builder.must();
  21. if (query.getBookName()!=null && !query.getBookName().isEmpty())
  22. must.add(wildcardQuery("bookName", "*" +query.getBookName()+ "*"));
  23. if (query.getCategory()!=null && !query.getCategory().isEmpty())
  24. must.add(wildcardQuery("category", "*" +query.getCategory()+ "*"));
  25. if (query.getAuthor()!=null && !query.getAuthor().isEmpty())
  26. must.add(wildcardQuery("author", "*" +query.getAuthor()+ "*"));
  27. // 筛选器 精确查询部分
  28. List filter = builder.filter();
  29. // 范围查询
  30. if (query.getPriceMin()>0 && query.getPriceMax()>0) {
  31. RangeQueryBuilder price = rangeQuery("price").gte(query.getPriceMin()).lte(query.getPriceMax());
  32. filter.add(price);
  33. }
  34. // 范围查询
  35. if (query.getPageMin()>0 && query.getPageMax()>0) {
  36. RangeQueryBuilder page = rangeQuery("page").gte(query.getPageMin()).lte(query.getPageMax());
  37. filter.add(page);
  38. }
  39. // 分页
  40. PageRequest pageable = PageRequest.of(query.getPage() - 1, query.getLimit());
  41. // 排序
  42. SortBuilder sort = SortBuilders.fieldSort("price").order(SortOrder.DESC);
  43. //设置高亮效果
  44. String preTag = "";//google的色值
  45. String postTag = "";
  46. HighlightBuilder.Field highlightFields = new HighlightBuilder.Field("category").preTags(preTag).postTags(postTag);
  47. Query searchQuery = new NativeSearchQueryBuilder()
  48. .withQuery(builder)
  49. .withHighlightFields(highlightFields)
  50. .withPageable(pageable)
  51. .withSort(sort)
  52. .build();
  53. return searchQuery;
  54. }
  55. }

4.测试验证

启动项目,在Postman中,请求

http://localhost:8080/book/query 接口查询书籍信息数据。查看接口返回情况。

使用Spring Boot + Elasticsearch + Logstash 实现图书查询检索服务

我们看到接口成功返回数据。说明数据查询服务创建成功。

最后

以上,我们就把使用Spring Boot + Elasticsearch + Logstash 实现完整的数据查询检索服务介绍完了。

原文链接:https://mp.weixin.qq.com/s/Y5Wq0Q8CAHgc_6aYHr7CTA