---
toc: true
title: 一周一个中间件-ES搜索引擎
date: 2019-09-19 18:43:36
tags:
- 中间件
- 搜索引擎
---
## 前言
> 在众多搜索引擎中,solr,es是我所知道其他公司最为广泛使用的中间件。他可以解决你复杂的搜索需求。当你需要在大量数据的情况下搜索一下关键字,使用mysql的like查询是非常缓慢的,而es可以做到近实时的搜索。
## 背景
> 我们公司最近对我们的fungo的游戏,用户,文章提出了更加复杂的搜索要求,要求对指定的关键字进行相似度匹配。
> 例如 搜索 ‘fungo小助手’ 搜索的结果应该含有fungo小助手,还应有只含有fungo关键字的东西,还应有只含有小助手关键字的东西。并且根据所含元素的多少,进行优先排名。
> 面对这些需求,以我对mysql和现在java框架的了解,无法解决这个问题。使用mysql的like功能只能检索到‘fungo小助手’的关键字。所以百度一下主流的解决方案。发现大部分公司都会使用solr和es这些搜索框架,来作为中间件解决复杂搜索功能。
## 知识准备
- node 节点
> 就是一個es实例。
- cluster 集群
- 集群健康
green yellow red
- 主节点
主资格节点的主要职责是和集群操作相关的内容,如创建或删除索引,跟踪哪些节点是群集的一部分,并决定哪些分片分配给相关的节点。稳定的主节点对集群的健康是非常重要的,默认情况下任何一个集群中的节点都有可能被选为主节点,索引数据和搜索查询等操作会占用大量的cpu,内存,io资源,为了确保一个集群的稳定,分离主节点和数据节点是一个比较好的选择。
- 数据节点
数据节点主要是存储索引数据的节点,主要对文档进行增删改查操作,聚合操作等。数据节点对cpu,内存,io要求较高, 在优化的时候需要监控数据节点的状态,当资源不够的时候,需要在集群中添加新的节点。
> 具有相同的cluster.name的node节点集合。
- index 索引
> 一个用来指向一个或者多个分片的逻辑命名空间。
- shard 分片
> 最小级别的工作单元,他只是保存了索引中的所有数据的一部分,最重要的分片就是一个Lucene实例。他本身就是一个完整的搜索引擎。我们的文档存储在分片中,并且在分片中被索引,我们的应用程序不会直接与他通信,而是直接与索引通信。
> 分片是集群中分发数据的关键,文档数据存储在分片中,分片分配到集群中的节点上,当你的集群扩容和缩容时,es会主动在你的节点迁移分片,以使集群保持平衡。
> 分片分为主分片和复制分片。你的索引的文档属于一个单独的主分片,主分片的数量决定你的索引最多存储多少数据。复制分片只是主分片的一个副本。做一个高可用,防止主分片出现故障,造成数据丢失,且对外提供读请求。索引建成后主分片数据就固定了,但是复制分片可以随时调整。
- 倒排索引
> Lucene的倒排索引实现比关系型数据更快的过滤。特别他对多条件的过滤支持非常好。一个字段由一个自己的倒排索引。18,20.这些叫做term,而[1,3]就是posting list.
> Posting list就是一个int的数组。存储所有符合某个term的文档id.
> term dictionary 和 term index
> 假如我们由很多term,就是由很多18.20....。如果我们查询某个term一定很慢。因为term没有排序。需要全部过滤一遍才能查到。这样我们可以使用二分查找方式。这个就是term dictionary.可以用logN次磁盘查到目标。但是磁盘读取仍然是非常昂贵。所以引进term index.他就像一本字典的大的章节表。比如:A开头的term ……… Xxx页;C开头的term ……… Xxx页;E开头的term ………Xxx页。实际上term index是一颗trie树.这个树不会包含所有的term,它包含是term的一些前缀,通过term index可以快速定位到term dictionary的offset,然后从这个位置往后顺序查找。再加上一些压缩技术(Lucene Finite State Transducers).term index 尺寸是term的尺寸的几十分之一,使得内存缓存整个term index变得可能。term index在内存中以FST的形式保存的,特点非常节省内存。term dictionary因为在磁盘上是以分block的方式保存的,一个block内部利用公共前缀压缩,比如都是Ab开头的单词就可以把Ab省去。这样term dictionary可以比b-tree更节约磁盘空间。
> 例子: 查询过滤条件 age=18 的过程就是先从term index找到18在term dictionary的大概位置,然后再从term dictionary里精确地找到18这个term,然后得到一个posting list或者一个指向posting list位置的指针。
>联合索引查询
> skip list数据结构。同时遍历各个属性的posting list.互相skip.使用bitset数据结构。
如图所示
![Image text](http://output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/elas_0202.png)
- es
> es面向文档,意味着他可以存储整个对象和文档(document).并且索引每个文档内容使其可以被搜索到。你可以对文档进行索引,搜索,排序,过滤。这就是es可以执行复杂的全文搜索的原因。
## 参考文献
Elasticsearch权威指南(中文版).pdf [提取码:c4th](https://pan.baidu.com/s/1bq5fAqju8ZElULt4jBHSKw)
阮一峰的网络日志 [全文搜索引擎 Elasticsearch 入门教程](http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html)
Elasticsearch linux安装包 elasticsearch-7.2.0-linux-x86_64.tar.gz [提取码:ya9s](https://pan.baidu.com/s/1HPEMzaBcmdaiYxnYtY2FPw)
Elasticsearch 中文分词器 elasticsearch-analysis-ik-master [提取码:o2ih](https://pan.baidu.com/s/1kx79pepNSvT9EP3AQZvMtw)
Elasticsearch 拼音分词器 elasticsearch-analysis-pinyin-master [提取码:qsd1](https://pan.baidu.com/s/1_V7ZiaxQBwRlMUgtTotXUw)
Logstash中文文档[Logstash简介](https://www.kancloud.cn/aiyinsi-tan/logstash/849518)
Logstash linux安装包 logstash-7.2.0.tar.gz [提取码:se1s](https://pan.baidu.com/s/1eblvZ9n_t4RnwGQ2AedziQ)
阿里云Elasticsearch产品文档介绍 [阿里云Elasticsearch](https://help.aliyun.com/product/57736.html?spm=a2c4g.11186623.6.540.2ad8731ct6WbSY)
Es为什么比MYSQL快 [博客链接](https://blog.****.net/u011635492/article/details/81023158)
<!-- more -->
如果无法下载请联系我 zhangdelei000@gmail.com ,或者评论下留下邮箱百度云账号,我会私发给您。
## 安装es
通过参考文献下载对应的es插件.通过xshell软件和Xftp插件传递到指定得linux服务器。
tar解压文件.进入bin文件内,因为es无法使用root权限去启动,所以创建一个用户和用户组。去启动es。
如要修改配置需要进去config文件内,修改主配置elasticsearch.yml
```json
# ======================== Elasticsearch Configuration =========================
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
cluster.initial_master_nodes: ["node-1"]
http.cors.enabled: true
http.cors.allow-origin: "*"
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
```
上面是默认的配置,不修改配置的默认使用9200端口。
切换到非root用户,使用vim console.out 建立文件。
使用nohup ./elasticsearch > console.out & 启动es,并将输出指向console.out文件。
如图
![Image text](http://output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/15690363408763.png)
## es的可视化
我们一般使用elasticsearch-head这个插件来连接es服务器来可视化es的索引和数据。
git地址 https://github.com/mobz/elasticsearch-head
在linux中 git clone https://github.com/mobz/elasticsearch-head
```
cd elasticsearch-head
npm install
npm run start
```
open http://ip:9100/
如图
![Image text](http://output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/156904197022.png)
## es客户端操作
启动一个spring boot工程。
pom.xml添加一下es依赖
```xml
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.2.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.2.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.2.0</version>
</dependency>
```
在配置文件中添加es信息
```
##es节点信息
es.cluster-nodes.ip= xx.xx.xx.xx
##es节点端口
es.cluster-nodes.port= 9200
##es索引名称
es.cluster-nodes.index= uat-cloudcmmpost
##es索引类型
es.cluster-node.type= CmmPost
```
这里省略了配置文件映射到java文件的过程,大家自行处理。
具体es使用方法
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.mapper.Wrapper;
import com.baomidou.mybatisplus.plugins.Page;
import com.fungo.community.config.NacosFungoCircleConfig;
import com.fungo.community.controller.PostController;
import com.fungo.community.dao.service.CmmPostDaoService;
import com.fungo.community.entity.CmmPost;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* <p>ES搜索引擎</p>
* @Author: dl.zhang
* @Date: 2019/7/24
*/
@Repository
public class ESDAOServiceImpl {
private static final Logger LOGGER = LoggerFactory.getLogger(ESDAOServiceImpl.class);
private RestHighLevelClient client;
@Autowired
private NacosFungoCircleConfig nacosFungoCircleConfig;
@Autowired
private CmmPostDaoService postService;
@PostConstruct
public void init() {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost(nacosFungoCircleConfig.getEsHttpIp(), nacosFungoCircleConfig.getEsHttpPort(), "http")
// new HttpHost("localhost", 9201, "http")
));
}
@PreDestroy
public void destroy(){
try {
client.close();
} catch (IOException e) {
}
}
public List<CmmPost> addESPosts() {
// Wrapper<CmmPost> wrapperCmmPost = new EntityWrapper<>();
// List<CmmPost> posts = postService.selectList(wrapperCmmPost);
CmmPost param = new CmmPost();
param.setId("b1f1f35d4b4242a0b794e17ed0d1d64a");
CmmPost cmmPost = postService.selectById(param);
try {
// 创建索引
IndexRequest request = new IndexRequest(nacosFungoCircleConfig.getIndex());
// 准备文档数据
String jsonStr = JSON.toJSONString(cmmPost);
// 转成 MAP
Map<String, Object> jsonMap = JSON.parseObject(jsonStr, Map.class);
// jsonMap.put("createdAt", new Date());
//Document source provided as a Map which gets automatically converted to JSON format
request.source(jsonMap);
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
}catch (Exception e){
LOGGER.error("获取es数据异常,索引id="+nacosFungoCircleConfig.getIndex(),e);
}
return null;
}
public Page<CmmPost> getAllPosts(String keyword, int page, int limit ) {
Page<CmmPost> postPage = new Page<>();
try {
// 1、创建search请求
SearchRequest searchRequest = new SearchRequest(nacosFungoCircleConfig.getIndex());
searchRequest.types(nacosFungoCircleConfig.getSearchIndexType());
// 2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if(keyword != null && !"".equals(keyword)){
// BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//普通模糊匹配
// boolQueryBuilder.must(QueryBuilders.wildcardQuery("title",keyword));
// sourceBuilder.query(boolQueryBuilder);
MatchQueryBuilder matchQueryBuilder1 = QueryBuilders.matchQuery("state",1);
MatchQueryBuilder matchQueryBuilder2 = QueryBuilders.matchQuery("title",keyword);
MatchQueryBuilder matchQueryBuilder3 = QueryBuilders.matchQuery("content",keyword);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
BoolQueryBuilder childBoolQueryBuilder = new BoolQueryBuilder()
.should(matchQueryBuilder2)
.should(matchQueryBuilder3);
boolQueryBuilder.must(childBoolQueryBuilder);
boolQueryBuilder.must(matchQueryBuilder1);
sourceBuilder.query(boolQueryBuilder);
}
// sourceBuilder.query( QueryBuilders.termQuery("title", keyword));
// 结果开始处
sourceBuilder.from((page-1)*limit);// sourceBuilder.from(0);
// 查询结果终止处
sourceBuilder.size(page*limit);// sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
//指定排序
sourceBuilder.sort(new ScoreSortBuilder().order( SortOrder.DESC));
sourceBuilder.sort(new FieldSortBuilder("watch_num").order(SortOrder.DESC));
//将请求体加入到请求中
searchRequest.source(sourceBuilder);
//3、发送请求
SearchResponse searchResponse = client.search(searchRequest,RequestOptions.DEFAULT);
//4、处理响应
//搜索结果状态信息
RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();
//分片搜索情况
int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
// failures should be handled here
}
//处理搜索命中文档结果
SearchHits hits = searchResponse.getHits();
TotalHits totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();
SearchHit[] searchHits = hits.getHits();
List<CmmPost> list = new ArrayList<>();
for (SearchHit hit : searchHits) {
// do something with the SearchHit
String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();
//取_source字段值
String sourceAsString = hit.getSourceAsString(); //取成json串
JSONObject jsonObj = (JSONObject) JSON.parse(sourceAsString);
CmmPost cmmPost= JSONObject.toJavaObject(jsonObj,CmmPost.class);
// CmmPost cmmPost = (CmmPost) JSON.parse( sourceAsString );
list.add(cmmPost);
// Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map对象
//从map中取字段值
/*
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
*/
// LOGGER.info("index:" + index + " type:" + type + " id:" + id);
// LOGGER.info(sourceAsString);
//取高亮结果
/*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get("title");
Text[] fragments = highlight.fragments();
String fragmentString = fragments[0].string();*/
}
postPage.setRecords(list);
postPage.setTotal(Long.valueOf(totalHits.value).intValue());
// SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// SearchRequest rq = new SearchRequest();
// //索引
// rq.indices(index);
// //各种组合条件
// rq.source(sourceBuilder);
//
// //请求
// System.out.println(rq.source().toString());
// SearchResponse rp = client.search(rq);
}catch (Exception e){
LOGGER.error("获取es数据异常,索引id="+nacosFungoCircleConfig.getIndex(),e);
}
finally {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return postPage;
}
}
```
这就可以使用es搜索引擎来解决复杂的搜索需求。
因为阿里云上的es服务只有5.5,和6.3和6.7的版本,所有这里又给你整理出一个阿里云版本的RestHighLevelClient代码。
es框架使用
```java
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.*;
import org.springframework.beans.factory.annotation.Autowired;
/**
* <p></p>
*
* @Author: dl.zhang
* @Date: 2019/10/17
*/
public class AliESRestClient {
private static final RequestOptions COMMON_OPTIONS;
private static RestHighLevelClient highClient;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// 默认缓冲限制为100MB,此处修改为30MB。
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
public static void initClinet(){
NacosFungoCircleConfig nacosFungoCircleConfig = new NacosFungoCircleConfig();
// 阿里云ES集群需要basic auth验证。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(nacosFungoCircleConfig.getEsUser(), nacosFungoCircleConfig.getEsPassword()));
// 通过builder创建rest client,配置http client的HttpClientConfigCallback。
// 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为ES集群地址。
RestClientBuilder builder = RestClient.builder(new HttpHost(nacosFungoCircleConfig.getEsHttpIp(), 9200))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
// RestHighLevelClient实例通过REST low-level client builder进行构造。
highClient = new RestHighLevelClient(builder);
// return highClient;
}
public static RestHighLevelClient getAliEsHighClient(){
if(highClient != null){
return highClient;
}else {
initClinet();
return highClient;
}
}
public static RequestOptions getCommonOptions(){
return COMMON_OPTIONS;
}
}
```
使用范例
```java
public Page<Game> searchGame(){
try {
RestHighLevelClient highClient = AliESRestClient.getAliEsHighClient();
RequestOptions COMMON_OPTIONS = AliESRestClient.getCommonOptions();
// 创建request。
Map<String, Object> jsonMap = new HashMap<>();
// field_01、field_02为字段名,value_01、value_02为对应的值。
jsonMap.put("{field_01}", "{value_01}");
jsonMap.put("{field_02}", "{value_02}");
//index_name为索引名称;type_name为类型名称;doc_id为文档的id。
IndexRequest indexRequest = new IndexRequest("{index_name}", "{type_name}", "{doc_id}").source(jsonMap);
// 同步执行,并使用自定义RequestOptions(COMMON_OPTIONS)。
IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
long version = indexResponse.getVersion();
System.out.println("Index document successfully! " + version);
//index_name为索引名称;type_name为类型名称;doc_id为文档的id。与以上创建索引的名称和id相同。
DeleteRequest request = new DeleteRequest("{index_name}", "{type_name}", "{doc_id}");
DeleteResponse deleteResponse = highClient.delete(request, COMMON_OPTIONS);
System.out.println("Delete document successfully! \n" + deleteResponse.toString() + "\n" + deleteResponse.status());
highClient.close();
}catch (IOException e){
LOGGER.error( "haode",e );
}
return null;
}
```
## 分布式集群
> 当杀死主节点,必须再次选举出主节点,来让集群的功能可用。当我们删除主节点,主机点上的主分片也就丢失了,所在的索引也就不能的正常工作。这时候就需要将其他的节点的副本提升为主分片。这样就可以使用es集群了。
所有的请求都会到主节点。这个节点我们称为请求节点。然后分发到各个子节点,返回到请求节点,再返回给客户端。
es集群查询请求
1. 客户端给 Node 1 发送get请求。
2. 节点使用文档的 _id 确定文档属于分片 0 。分片 0 对应的复制分片在三个节点上都有。此时,它转发请求到 Node 2 。
3. Node 2 返回endangered给 Node 1 然后返回给客户端
对于读请求,为了平衡负载,请求节点会为每个请求选择不同的分片——它会循环所有分片副本。
### 搭建
> 因为只有二台机器,所以只有在这二台机器上演示es三节点集群。
> 214 9200 9300
> 168 9200 9300
> 168 9201 9301
第一修改主节点配置elasticsearch.yml
```
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
node.master: true
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
#transport.tcp.port: 9300
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.seed_hosts: ["127.0.0.1", "::1"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
cluster.initial_master_nodes: ["node-1"]
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
```
修改从节点 168机器9200 elasticsearch.yml
```
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-2
node.master: false
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
discovery.seed_hosts: ["39.105.18.214:9300"]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
```
修改从节点 168机器9201 elasticsearch.yml
```
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-3
node.master: false
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
#network.host: 192.168.0.1
network.host: 0.0.0.0
# Set a custom port for HTTP:
#
http.port: 9201
transport.tcp.port: 9301
http.cors.enabled: true
http.cors.allow-origin: "*"
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
discovery.seed_hosts: ["39.105.18.214:9300"]
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
```
配置内容解析
```
cluster.name: ES-Cluster
#ES集群名称,同一个集群内的所有节点集群名称必须保持一致
node.name: ES-master-10.150.55.94
#ES集群内的节点名称,同一个集群内的节点名称要具备唯一性
node.master: true
#允许节点是否可以成为一个master节点,ES是默认集群中的第一台机器成为master,如果这台机器停止就会重新选举
node.data: false
#允许该节点存储索引数据(默认开启)
#关于Elasticsearch节点的角色功能详解,请看:https://www.dockerc.com/elasticsearch-master-or-data/
path.data: /data/ES-Cluster/master/ES-master-10.150.55.94/data1,/data/ES-Cluster/master/ES-master-10.150.55.94/data2
#ES是搜索引擎,会创建文档,建立索引,此路径是索引的存放目录,如果我们的日志数据较为庞大,那么索引所占用的磁盘空间也是不可小觑的
#这个路径建议是专门的存储系统,如果不是存储系统,最好也要有冗余能力的磁盘,此目录还要对elasticsearch的运行用户有写入权限
#path可以指定多个存储位置,分散存储,有助于性能提升,以至于怎么分散存储请看详解https://www.dockerc.com/elk-theory-elasticsearch/
path.logs: /data/ES-Cluster/master/ES-master-10.150.55.94/logs
#elasticsearch专门的日志存储位置,生产环境中建议elasticsearch配置文件与elasticsearch日志分开存储
bootstrap.memory_lock: true
#在ES运行起来后锁定ES所能使用的堆内存大小,锁定内存大小一般为可用内存的一半左右;锁定内存后就不会使用交换分区
#如果不打开此项,当系统物理内存空间不足,ES将使用交换分区,ES如果使用交换分区,那么ES的性能将会变得很差
network.host: 10.150.55.94
#es绑定地址,支持IPv4及IPv6,默认绑定127.0.0.1;es的HTTP端口和集群通信端口就会监听在此地址上
network.tcp.no_delay: true
#是否启用tcp无延迟,true为启用tcp不延迟,默认为false启用tcp延迟
network.tcp.keep_alive: true
#是否启用TCP保持活动状态,默认为true
network.tcp.reuse_address: true
#是否应该重复使用地址。默认true,在Windows机器上默认为false
network.tcp.send_buffer_size: 128mb
#tcp发送缓冲区大小,默认不设置
network.tcp.receive_buffer_size: 128mb
#tcp接收缓冲区大小,默认不设置
transport.tcp.port: 9301
#设置集群节点通信的TCP端口,默认就是9300
transport.tcp.compress: true
#设置是否压缩TCP传输时的数据,默认为false
http.max_content_length: 200mb
#设置http请求内容的最大容量,默认是100mb
http.cors.enabled: true
#是否开启跨域访问
http.cors.allow-origin: “*”
#开启跨域访问后的地址限制,*表示无限制
http.port: 9201
#定义ES对外调用的http端口,默认是9200
discovery.zen.ping.unicast.hosts: [“10.150.55.94:9301”, “10.150.55.95:9301”,“10.150.30.246:9301”] #在Elasticsearch7.0版本已被移除,配置错误
#写入候选主节点的设备地址,来开启服务时就可以被选为主节点
#默认主机列表只有127.0.0.1和IPV6的本机回环地址
#上面是书写格式,discover意思为发现,zen是判定集群成员的协议,unicast是单播的意思,ES5.0版本之后只支持单播的方式来进行集群间的通信,hosts为主机
#总结下来就是:使用zen协议通过单播方式去发现集群成员主机,在此建议将所有成员的节点名称都写进来,这样就不用仅靠集群名称cluster.name来判别集群关系了
discovery.zen.minimum_master_nodes: 2 #在Elasticsearch7.0版本已被移除,配置无效
#为了避免脑裂,集群的最少节点数量为,集群的总节点数量除以2加一
discovery.zen.fd.ping_timeout: 120s #在Elasticsearch7.0版本已被移除,配置无效
#探测超时时间,默认是3秒,我们这里填120秒是为了防止网络不好的时候ES集群发生脑裂现象
discovery.zen.fd.ping_retries: 6 #在Elasticsearch7.0版本已被移除,配置无效
#探测次数,如果每次探测90秒,连续探测超过六次,则认为节点该节点已脱离集群,默认为3次
discovery.zen.fd.ping_interval: 15s #在Elasticsearch7.0版本已被移除,配置无效
#节点每隔15秒向master发送一次心跳,证明自己和master还存活,默认为1秒太频繁,
discovery.seed_hosts: [“10.150.55.94:9301”, “10.150.55.95:9301”,“10.150.30.246:9301”]
#Elasticsearch7新增参数,写入候选主节点的设备地址,来开启服务时就可以被选为主节点,由discovery.zen.ping.unicast.hosts:参数改变而来
cluster.initial_master_nodes: [“10.150.55.94:9301”, “10.150.55.95:9301”,“10.150.30.246:9301”]
#Elasticsearch7新增参数,写入候选主节点的设备地址,来开启服务时就可以被选为主节点
cluster.fault_detection.leader_check.interval: 15s
#Elasticsearch7新增参数,设置每个节点在选中的主节点的检查之间等待的时间。默认为1秒
discovery.cluster_formation_warning_timeout: 30s
#Elasticsearch7新增参数,启动后30秒内,如果集群未形成,那么将会记录一条警告信息,警告信息未master not fount开始,默认为10秒
cluster.join.timeout: 30s
#Elasticsearch7新增参数,节点发送请求加入集群后,在认为请求失败后,再次发送请求的等待时间,默认为60秒
cluster.publish.timeout: 90s
#Elasticsearch7新增参数,设置主节点等待每个集群状态完全更新后发布到所有节点的时间,默认为30秒
cluster.routing.allocation.cluster_concurrent_rebalance: 32
#集群内同时启动的数据任务个数,默认是2个
cluster.routing.allocation.node_concurrent_recoveries: 32
#添加或删除节点及负载均衡时并发恢复的线程个数,默认4个
cluster.routing.allocation.node_initial_primaries_recoveries: 32
#初始化数据恢复时,并发恢复线程的个数,默认4个
————————————————
版权声明:本文为****博主「运维工程师 Linke」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.****.net/qq_31547771/article/details/100665922
```
所有节点统一修改一下jvm.options
```
## JVM configuration
################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
# 最重要的修改
-Xms2g
-Xmx2g
################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################
## GC configuration
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
## G1GC Configuration
# NOTE: G1GC is only supported on JDK version 10 or later.
# To use G1GC uncomment the lines below.
# 10-:-XX:-UseConcMarkSweepGC
# 10-:-XX:-UseCMSInitiatingOccupancyOnly
# 10-:-XX:+UseG1GC
# 10-:-XX:InitiatingHeapOccupancyPercent=75
## DNS cache policy
# cache ttl in seconds for positive DNS lookups noting that this overrides the
# JDK security property networkaddress.cache.ttl; set to -1 to cache forever
-Des.networkaddress.cache.ttl=60
# cache ttl in seconds for negative DNS lookups noting that this overrides the
# JDK security property networkaddress.cache.negative ttl; set to -1 to cache
# forever
-Des.networkaddress.cache.negative.ttl=10
## optimizations
# pre-touch memory pages used by the JVM during initialization
-XX:+AlwaysPreTouch
## basic
# explicitly set the stack size
-Xss1m
# set to headless, just in case
-Djava.awt.headless=true
# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8
# use our provided JNA always versus the system one
-Djna.nosys=true
# turn off a JDK optimization that throws away stack traces for common
# exceptions because stack traces are important for debugging
-XX:-OmitStackTraceInFastThrow
# flags to configure Netty
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
# log4j 2
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true
-Djava.io.tmpdir=${ES_TMPDIR}
## heap dumps
# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError
# specify an alternative path for heap dumps; ensure the directory exists and
# has sufficient space
-XX:HeapDumpPath=data
# specify an alternative path for JVM fatal error logs
-XX:ErrorFile=logs/hs_err_pid%p.log
## JDK 8 GC logging
8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:logs/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=32
8:-XX:GCLogFileSize=64m
# JDK 9+ GC logging
9-:-Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m
# due to internationalization enhancements in JDK 9 Elasticsearch need to set the provider to COMPAT otherwise
# time/date parsing will break in an incompatible way for some date patterns and locals
9-:-Djava.locale.providers=COMPAT
```
统一修改linux系统的es的内存权限。
elasticsearch用户拥有的内存权限太小,至少需要262144;
> 切换到root用户
执行命令:
sysctl -w vm.max_map_count=262144
查看结果:
sysctl -a|grep vm.max_map_count
显示:
vm.max_map_count = 262144
创建非root用户。
```
建立用户 useradd es
建立用户密码 passwd es
```
将es文件夹内的config文件和logs文件夹的文件权限赋予es.
chmod 777 config/*
chmod 777 logs/* 很多问题是因为es用户没有权限导致的。尽可能多的赋予es用户。
全部以es用户启动
> nohup ./elasticsearch > console.out &
如图 集群节点建立完成
![Image text](http://output-mingbo.oss-cn-beijing.aliyuncs.com/imgs/es-cluster15710408203718.png)
如果head显示无法连接,但是es确正确启动(1)
## 索引数据
> es是一个分布式的文档(document)存储引擎,可以实时存储并检索出数据结构--序列化的JSON文档,通常,我们认为对象(object)和文档(documnet)是等价相同的。
```json
检索文档 GET /megacorp/employee/1
{
"_index" : "megacorp",
"_type" : "employee",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"first_name" : "John",
"last_name" : "Smith",
"age" : 25,
"about" : "I love to go rock climbing",
"interests": [ "sports", "music" ]
}
}
```
|名称 | 备注|
| -----: | :----: |
|_index|索引名称 类似mysql数据库的表 索引这个名字必须是全部小写,不能以下划线开头,不能包含逗号。|
|_type|索引对象类型 类似每个对象都属于一个类class 可以是大写或小写,不能包含下划线或逗号|
|_id|索引主键 它与 _index 和 _type 组合时,就可以在ELasticsearch中唯一标识一个文档|
|_version|索引数据版本|
ID
- 使用自己的ID
- ES可以自增ID
_version
每次修改数据都会version都会自增1.使用乐观锁来做数据安全。
数据如何存储到某个主分片
> shard = hash(routing) % number_of_primary_shards
routing是任意字符串,默认_id。通过hash生成一个数字。除以主分片数量取余,这个余数就是存储的分片。这就是为什么创建索引时主分片数量就确定了,一旦值改变之前的路由值就无效了,文档就找不到了。
所有的新建,索引,删除请求都是些操作,必须再主分片中操作成功才可以复制到复制分片。
1. 客户端给 Node 1 发送新建、索引或删除请求。
2. 节点使用文档的 _id 确定文档属于分片 0 。它转发请求到 Node 3 ,分片 0 位于这个节点上。
3. Node 3 在主分片上执行请求,如果成功,它转发请求到相应的位于 Node 1 和 Node 2的复制节点上。当所有的复制节点报告成功, Node 3 报告成功到请求的节点,请求的节点再报告给客户端。
这表明必须主分片和复制分片全部成功,客户端猜得到相应。
replication默认为sync。
你可以设置replication 为 async,这样es在主分片执行成功后就会相应客户端。但是es依旧会转发请求给复制节点。
默认主分片在尝试写入时需要规定数量quorum或过半的分片(可以是主节点或复制节点)可用。这是防止数据被写入到错的网络分区。
> int( (primary + number_of_replicas) / 2 ) + 1
consistency允许值 one,all,默认的quorum或过半分片.
number_of_replicas是在索引中的的设置,用来定义复制分片的数量.但如果你只有2个节点,那你的活动分片不够规定数量,也就不能索引或删除任何文档。
## 搜索
es搜索可以做
1. 在类似于 gender 或者 age 这样的字段上使用结构化查询, join_date 这样的字段上使用排序,就像SQL的结构化查询一
样。
2. 全文检索,可以使用所有字段来匹配关键字,然后按照关联性(relevance)排序返回结果。
3. 或者结合以上两条。
|概念|解释|
| -----: | :----:|
|映射(Mapping)|数据在每个字段中的解释说明|
|分析(Analysis)|全文是如何处理的可以被搜索的|
|领域特定语言查询(Query DSL)|Elasticsearch使用的灵活的、强大的查询语言|
搜索相应体
```json
{
"hits": {
"total": 14,
"hits": [
{
"_index": "us",
"_type": "tweet",
"_id": "7",
"_score": 1,
"_source": {
"date": "2014-09-17",
"name": "John Smith",
"tweet": "The Query DSL is really powerful and flexible",
"user_id": 2
}
}
],
"max_score": 1
},
"took": 4,
"_shards": {
"failed": 0,
"successful": 10,
"total": 10
},
"timed_out": false
}
```
hits
响应中最重要的部分是 hits ,它包含了 total 字段来表示匹配到的文档总数, hits 数组还包含了匹配到的前10条数据。
hits 数组中的每个结果都包含 _index 、 _type 和文档的 _id 字段,被加入到 _source 字段中这意味着在搜索结果中我们将
可以直接使用全部文档。这不像其他搜索引擎只返回文档ID,需要你单独去获取文档。
每个节点都有一个 _score 字段,这是相关性得分(relevance score),它衡量了文档与查询的匹配程度。默认的,返回的结
果中关联性最大的文档排在首位;这意味着,它是按照 _score 降序排列的。这种情况下,我们没有指定任何查询,所以所有
文档的相关性是一样的,因此所有结果的 _score 都是取得一个中间值 1
max_score 指的是所有文档匹配查询中 _score 的最大值。
took
请求时间
shards
_shards 节点告诉我们参与查询的分片数( total 字段),有多少是成功的( successful字段),有多少的是失败的( failed 字段)。
timeout
是否超时
## 数据同步
无论是使用什么中间件,都是根据具体的情形和公司的情况来选择不同的方案。
在我们公司的情况下,我们使用es只是作为负责复杂搜索的方案,并不作为增删改的持久化方案,依旧采用通过mysql数据库同步数据到es。本身可以通过binlog日志同步,但是我们是使用阿里云的RDS,数据同步需要额外收费,所以我们选择的方案,采用另外一个中件件logstash来做数据同步。
在上面参考文献中下载logstash文件,安装在lunux服务器中,修改logstash.conf
```xml
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
jdbc {
#需要同步的数据库
type => "uat"
jdbc_connection_string => "jdbc:mysql://xx.xx.xx.xx:3306/fungo_games_uat"
jdbc_user => "xxx"
jdbc_password => "xxxx"
#本地jar包
jdbc_driver_library => "/opt/ELK/logstash-7.2.0/mysqltool/mysql-connector-java-8.0.15.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "5000"
#获取到记录的SQL查询语句
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
lowercase_column_names => false
record_last_run => true
last_run_metadata_path => "/opt/ELK/logstash-7.2.0/config/station_parameter.txt"
clean_run => false
statement => "SELECT * FROM t_cmm_post where updated_at >= :sql_last_value order by updated_at desc "
#定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "*/1 * * * *"
}
}
filter {
}
output {
if [type] == "uat" {
elasticsearch {
hosts => "xx.xx.xx.xx:9200"
index => "uat-cloudcmmpost"
document_type => "CmmPost"
document_id => "%{post_id}"
}
}
}
```
注意
需要将mysql的连接包下载指定位置,jdbc_driver_library 来指定位置,
安装logstash相关插件
安装jdbc的插件
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
启动数据同步
直接 在bin文件内 ./logstash 启动 如果报错 可能需要删除data里面的 .lock 文件
## 备注
### 1 阿里云的解决思路
阿里云上面也是采用单个主机上搭建3个es节点的集群。性能分别是1核2G的配置.因为无法登录到logstash服务的机器上,所有 我们无法直接通过链接MYSQL服务的形式,就不能像我们上面展示那样通过sql语句去同步数据,这时候阿里云上有一个DTS服务,可以解决数据传输和同步。
### 2 附属内容
elasticsearch-head 无法连接elasticsearch的原因和解决
https://blog.****.net/fst438060684/article/details/80936201
### 3 ES比mysql快
> 为什么ES比mysql快
> mysql只有term dictionary这一层,是以b-tree排序方式存储在磁盘上。检索一个term需要 若干次random access磁盘操作。但是es在此基础上添加term index来加速检索。term index以树的形式缓存内存中,减少磁盘读取次数。