文章目录
一、数据模型
1.1 PacificA 算法
ES 的数据副本模型基于主从模式,参考了 PacificA 算法。
PacificA 是一个通用、抽象的框架,而不是具体的算法,主要由两部分组成:
- 存储管理:负责数据的读取和更新,采用多副本保证数据的可靠性和可用性。
- 配置管理:对配置信息进行管理,通过 Paxos 维护配置信息的一致性。
基本结构包括:
- Replica Group:副本组,包括主和从副本。
- Configuration:配置信息,维护了副本组的信息,包括主副本是谁、副本位于哪个节点等。
- Configuration Version:配置的版本号。
- Serial Number:写操作序列号,每个主副本维护。
- Prepared List:写操作的准备序列。
- Committted List:写操作的提交序列。
副本策略
分片副本使用主从模式,写操作写主副本,主副本故障从从副本中选取新主副本。
数据写入流程:
- 写请求主副本节点,节点创建 UpdateRequest,并分配Serial Number,插入到 Prepared List。
- 主副本将 带着 SN 的UpdateRequest广播给从副本节点,从节点也插入 Prepared List ,然后返回 ACK。
- 主副本收到所有从副本的 ACK 后,将 UpdateRequest 放入 Committted List,表示提交。
- 同样把提交信息通知到从副本节点。
配置管理
存在一个全局配置管理器对所有副本组的配置进行管理,通过一致性算法和版本号解决一致性以及并发问题。
错误检测
主要应用于检测主副本情况,采用租约的机制:
- 如果主副本在租约期限内,未收到从副本节点的租约回复,则认为从副本异常,向全局配置管理器汇报,移除该副本,并且自己不再作为主副本。
- 如果从副本在租约期限内,未收到主副本节点的租约请求,则认为主副本异常,向管理器汇报,移除主副本,并自己提升为主副本。
1.2 ES 数据副本模型
ES 数据副本模型参照 PacificA 算法实现:
- Master 负责维护索引元信息,类似配置管理器的角色。
- 集群状态中的 routing_table 存储了所有索引、索引的shard、主分片、位于哪个节点等信息,类似副本组的功能。
- ES 中有 Sequence Number 和 Checkpoint,类似 PacificA 的 Serial Number 和 Committed Point。
ES 一个索引会拆成多个分片,每个分片又有多个副本,对于文档的添加删除都是同步写的。
1.2.1 写入模型
写入过程会遵循以下基本流程:
- 请求到达协调节点,协调节点对操作进行验证合法性,失败则拒绝操作,然后会请求到对应主分片所在节点,每个shard都会在master结点里维护一个in-sync group。 每次写操作,都是先发给对应in-sync group里的主片(如果指定 routing 会根据该参数选择)。
- 主分片在本地执行操作,对操作合法性进行判断。(因为主分片本地会先执行,可能存在响应写入成功之前读取到更新的数据)。
- 如果执行成功会并行转发操作到所有副分片(主分片会维护需要同步的副分片列表)。
- 主分片等待所有副分片成功执行并返回ACK后,响应协调节点写入成功。
同步写入的过程,可能会出现一些异常情况:
- 主分片自身错误,会报告 Master,Master 提升一个副分片为主分片,并将对应操作交给副分片执行。
- 主分片同步转发操作到副分片,由于网络、硬件等问题,副分片没有返回 ACK,主分片会发送消息到 Master 剔除该副分片,并重新建立一个副分片。
- 同样主分片会根据副分片的响应来判断自己的状态,如果一个主分片存在分区或者其他异常情况,但仍继续处理读写操作,会收到副分片的拒绝响应,这时候会询问 Master 自己的身份来判断是否需要降级。
1.2.2 读取模型
读取过程遵循以下基本流程:
- 协调节点将读请求转发到相关的分片。
- 从分片中选取某个副本,可以是主副本也可以是从副本。
- 获取到数据到协调节点后,合并结果并给客户端响应。
在某个副本出现异常情况无法提供读能力,协调节点可以选择另一个副本读取,或者返回部分结果,可以在 _shards 字段分辨是否有副本读取失败。
"_shards" : {
"total" : 3,
"successful" : 3,
"skipped" : 0,
"failed" : 0
},
1.2.3 一致性保障
-
Master 节点会维护一个 in-sync group,里面包含所有同步了最新数据的副本,每个分片有一个唯一标识 Allocation ID,如果某分片主副本异常,会从这些分片中选取新的主副本,来保障主副本有最新的数据。
-
通过 Primary Terms 和 Squence Numbers 来校验分布式场景的数据的一致性,以及进行数据的矫正,类似 Raft 的 terms 和 index。
- Primary Terms 在每次主副本变更的时候递增。
- Squence Numbers 由主副本分配,写操作递增。
-
通过 global checkpoint 来降低历史数据恢复的复杂度,在全局保证一致的数据检查点会向前推进,在进行数据恢复的时候只需要校验检查点之后的数据。
-
每个文档会有一个自带字段 _version 作为乐观锁来保障并发写入的安全性。
二、ES 数据写入
ES 对于文档的写操作,单个处理称为 Index 请求,批量写入称为 Bulk 请求,逻辑相同,会被统一封装为 BulkRequest。
ES 对文档的操作主要包括几种类型:
enum OpType {
/** 索引操作,类似 Upsert */
INDEX(0),
/** 创建文档操作,如果文档存在,请求失败*/
CREATE(1),
/** 更新文档 */
UPDATE(2),
/** 删除文档 */
DELETE(3);
}
另外 Index 和 Bulk 的 API 都有可选参数指定,包括:
- version:文档的版本号,用于乐观锁。
- version_type:与 version 同时传入,用于控制版本的比较方式,可选以下参数:
- internal:当前版本和传入版本一样则写入;
- external:当前版本小于传入版本则写入;
- op_type:操作类型,如上四种。
- routing:路由到具体分片的分片键。
- wait_for_active_shards:用于控制写一致性,指定数量的副本可用才能执行写入,否则重试至超时,默认1,即主分片可用。
- refresh:写入刷盘的策略。
- timeout:超时事件,默认1min。
- pipeline:指定事先创建的pipeline,用于数据处理加工。
2.1 写入详细流程
写入大体流程是写请求到协调节点,然后转发至主分片,后主分片转发到副分片,写入成功后协调节点响应客户端。
2.1.1 协调节点流程
协调节点负责创建索引,转发请求到主分片,等待响应后回复客户端。
-
参数校验:对基础参数校验合法性。
-
处理 pipeline:如果有声明一些 pipeline 的处理器,会对数据进行加工。
-
自动创建索引:如果配置允许自动创建索引,会选出不存在的索引进行自动创建。
- 发现来一种比较有意思的写法,等待所有索引创建完的最后一个回调做一些后续操作。
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
// 定义一个原子整型
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
// 循环需要创建的索引
for (String index : autoCreateIndices) {
// 向 Master 请求创建索引
createIndex(index, bulkRequest.timeout(), minNodeVersion,
// 注册异步监听等待 Master 创建结果
new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
// 在所有索引都创建完成最后一个回调的响应处理后续流程
if (counter.decrementAndGet() == 0) {
// do something
}
}
@Override
public void onFailure(Exception e) {
// do something
if (counter.decrementAndGet() == 0) {
// do something
}
}
});
}
}
- 请求预处理:对于请求的基本信息进行检查,例如文档id生成,routing 处理等。
- 检测集群状态:集群异常则取消写入。
- 构建基于 shard 的请求:通过 shardId 合并请求。
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
-
路由:通过 routing 来计算目标 shard;
- 一般情况的计算公式:shard_num = hash(_routing) % num_primary_shards;
- 为了解决部分数据倾斜,可以设置 index.routing_partition_size 参数,思想是通过 routing 计算一组分片,再由 id 选择组内的一个分片,公式为:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
- 转发请求并等待响应:获取写操作主分片对应节点,如果本节点则执行,否则转发到对应节点。
2.1.2 主分片节点流程
-
写请求基础校验:包括是否主分片、主分片是否迁移,含有副分片数量是否满足 wait_for_active_shards 参数等。
-
写 lucene 和 translog,并根据配置的事物日志刷新策略来刷盘。
-
并发写每个副分片,并等待响应。
- 如果全部写入成功并返回ack,则响应协调节点成功。
- 如果有写入失败,主分片就上报 Master,Master 更新集群状态,将失败 shard 从 in_sync_allocations 中移除,并更改 shard 状态为 UNASSIGNED。
三、ES 数据读取
ES 的读取主要分为 GET 和 Search 两种操作
- GET 是会指定 _id 通过正排索引的搜索;
- Search 是一些复杂场景通过倒排索引的检索;
3.1 GET 流程
GET 请求也可以指定参数来定制一些特殊逻辑:
- realtime:默认为 true,默认 GET 会获取最新的数据,不受索引刷新影响,如果文档更新了但是没有刷盘则会触发一次 refresh。
- source filtering :默认情况 true,会返回文档全部内容,可以设置false,不返回文档内容,或者通过 _source_include 和 source_exclude 指定返回哪些字段。
- stored Fields:指定返回哪些mapping中设置为 stored 的字段。
- _source:返回 doc 原始内容,不反悔一些元信息。
- routing:分片键。
- preference:优先从某个分片获取数据。
- refresh:是否在读取之前进行刷盘操作。
- version:指定读取版本,如果最新版本不符合,返回 409。
GET 流程就只要包括两个步骤
- 协调节点对于读请求进行路由转发,后等待全部完成读取后响应客户端,对于批量处理场景,部分失败,不会影响整体返回,并在 resp 里标注。
- 数据节点对于读请求进行响应,包括数据读取,过滤等操作返回协调节点。
3.2 Search 流程
Search 流程主要会经过两个节点,协调节点和数据节点,协调节点负责数据获取加工,搜索任务又会分为两个阶段, query then fetch;数据节点则执行真正的数据读取。
-
协调节点:
- Query 阶段:遍历所有分片发送查询请求收集需要的返回结果,对结果进行排序筛选等。
- Fetch 阶段:通过文档的 id 获取文档的全部内容。
-
数据节点:
- 底层还是通过调用 lucene 进行分词,查询等,ES 负责打分,聚合等加工操作。