ElasticSearch 数据读写

时间:2022-12-24 07:53:08

一、数据模型

1.1 PacificA 算法

ES 的数据副本模型基于主从模式,参考了 PacificA 算法。

PacificA 是一个通用、抽象的框架,而不是具体的算法,主要由两部分组成:

  • 存储管理:负责数据的读取和更新,采用多副本保证数据的可靠性和可用性。
  • 配置管理:对配置信息进行管理,通过 Paxos 维护配置信息的一致性。

基本结构包括:

  1. Replica Group:副本组,包括主和从副本。
  2. Configuration:配置信息,维护了副本组的信息,包括主副本是谁、副本位于哪个节点等。
  3. Configuration Version:配置的版本号。
  4. Serial Number:写操作序列号,每个主副本维护。
  5. Prepared List:写操作的准备序列。
  6. Committted List:写操作的提交序列。

副本策略

分片副本使用主从模式,写操作写主副本,主副本故障从从副本中选取新主副本。

数据写入流程:

  1. 写请求主副本节点,节点创建 UpdateRequest,并分配Serial Number,插入到 Prepared List。
  2. 主副本将 带着 SN 的UpdateRequest广播给从副本节点,从节点也插入 Prepared List ,然后返回 ACK。
  3. 主副本收到所有从副本的 ACK 后,将 UpdateRequest 放入 Committted List,表示提交。
  4. 同样把提交信息通知到从副本节点。

配置管理

存在一个全局配置管理器对所有副本组的配置进行管理,通过一致性算法和版本号解决一致性以及并发问题。

错误检测

主要应用于检测主副本情况,采用租约的机制:

  • 如果副本在租约期限内,未收到从副本节点的租约回复,则认为从副本异常,向全局配置管理器汇报,移除该副本,并且自己不再作为主副本。
  • 如果副本在租约期限内,未收到主副本节点的租约请求,则认为主副本异常,向管理器汇报,移除主副本,并自己提升为主副本。

1.2 ES 数据副本模型

ES 数据副本模型参照 PacificA 算法实现:

  • Master 负责维护索引元信息,类似配置管理器的角色。
  • 集群状态中的 routing_table 存储了所有索引、索引的shard、主分片、位于哪个节点等信息,类似副本组的功能。
  • ES 中有 Sequence Number 和 Checkpoint,类似 PacificA 的 Serial Number 和 Committed Point。

ES 一个索引会拆成多个分片,每个分片又有多个副本,对于文档的添加删除都是同步写的。

1.2.1 写入模型

写入过程会遵循以下基本流程:

  1. 请求到达协调节点,协调节点对操作进行验证合法性,失败则拒绝操作,然后会请求到对应主分片所在节点,每个shard都会在master结点里维护一个in-sync group。 每次写操作,都是先发给对应in-sync group里的主片(如果指定 routing 会根据该参数选择)。
  2. 主分片在本地执行操作,对操作合法性进行判断。(因为主分片本地会先执行,可能存在响应写入成功之前读取到更新的数据)。
  3. 如果执行成功会并行转发操作到所有副分片(主分片会维护需要同步的副分片列表)。
  4. 主分片等待所有副分片成功执行并返回ACK后,响应协调节点写入成功。

同步写入的过程,可能会出现一些异常情况:

  1. 主分片自身错误,会报告 Master,Master 提升一个副分片为主分片,并将对应操作交给副分片执行。
  2. 主分片同步转发操作到副分片,由于网络、硬件等问题,副分片没有返回 ACK,主分片会发送消息到 Master 剔除该副分片,并重新建立一个副分片。
  3. 同样主分片会根据副分片的响应来判断自己的状态,如果一个主分片存在分区或者其他异常情况,但仍继续处理读写操作,会收到副分片的拒绝响应,这时候会询问 Master 自己的身份来判断是否需要降级。

1.2.2 读取模型

读取过程遵循以下基本流程:

  1. 协调节点将读请求转发到相关的分片。
  2. 从分片中选取某个副本,可以是主副本也可以是从副本。
  3. 获取到数据到协调节点后,合并结果并给客户端响应。

在某个副本出现异常情况无法提供读能力,协调节点可以选择另一个副本读取,或者返回部分结果,可以在 _shards 字段分辨是否有副本读取失败。

  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },

1.2.3 一致性保障

  1. Master 节点会维护一个 in-sync group,里面包含所有同步了最新数据的副本,每个分片有一个唯一标识 Allocation ID,如果某分片主副本异常,会从这些分片中选取新的主副本,来保障主副本有最新的数据。

  2. 通过 Primary Terms 和 Squence Numbers 来校验分布式场景的数据的一致性,以及进行数据的矫正,类似 Raft 的 terms 和 index。

    • Primary Terms 在每次主副本变更的时候递增。
    • Squence Numbers 由主副本分配,写操作递增。
  3. 通过 global checkpoint 来降低历史数据恢复的复杂度,在全局保证一致的数据检查点会向前推进,在进行数据恢复的时候只需要校验检查点之后的数据。

  4. 每个文档会有一个自带字段 _version 作为乐观锁来保障并发写入的安全性。

二、ES 数据写入

ES 对于文档的写操作,单个处理称为 Index 请求,批量写入称为 Bulk 请求,逻辑相同,会被统一封装为 BulkRequest。

ES 对文档的操作主要包括几种类型:

enum OpType {
        /** 索引操作,类似 Upsert */
        INDEX(0),
        /** 创建文档操作,如果文档存在,请求失败*/
        CREATE(1),
        /** 更新文档 */
        UPDATE(2),
        /** 删除文档  */
        DELETE(3);
}

另外 Index 和 Bulk 的 API 都有可选参数指定,包括:

  • version:文档的版本号,用于乐观锁。
  • version_type:与 version 同时传入,用于控制版本的比较方式,可选以下参数:
    • internal:当前版本和传入版本一样则写入;
    • external:当前版本小于传入版本则写入;
  • op_type:操作类型,如上四种。
  • routing:路由到具体分片的分片键。
  • wait_for_active_shards:用于控制写一致性,指定数量的副本可用才能执行写入,否则重试至超时,默认1,即主分片可用。
  • refresh:写入刷盘的策略。
  • timeout:超时事件,默认1min。
  • pipeline:指定事先创建的pipeline,用于数据处理加工。

2.1 写入详细流程

写入大体流程是写请求到协调节点,然后转发至主分片,后主分片转发到副分片,写入成功后协调节点响应客户端。

2.1.1 协调节点流程

协调节点负责创建索引,转发请求到主分片,等待响应后回复客户端。

  1. 参数校验:对基础参数校验合法性。

  2. 处理 pipeline:如果有声明一些 pipeline 的处理器,会对数据进行加工。

  3. 自动创建索引:如果配置允许自动创建索引,会选出不存在的索引进行自动创建。

    • 发现来一种比较有意思的写法,等待所有索引创建完的最后一个回调做一些后续操作。
if (autoCreateIndices.isEmpty()) {
                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
            } else {
                // 定义一个原子整型
                final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                // 循环需要创建的索引
                for (String index : autoCreateIndices) {
                	// 向 Master 请求创建索引
                    createIndex(index, bulkRequest.timeout(), minNodeVersion,
                    	// 注册异步监听等待 Master 创建结果
                        new ActionListener<CreateIndexResponse>() {
                        @Override
                        public void onResponse(CreateIndexResponse result) {
                            // 在所有索引都创建完成最后一个回调的响应处理后续流程
                            if (counter.decrementAndGet() == 0) {
                                // do something
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            // do something
                            if (counter.decrementAndGet() == 0) {
                                // do something
                            }
                        }
                    });
                }
            }
  1. 请求预处理:对于请求的基本信息进行检查,例如文档id生成,routing 处理等。
  2. 检测集群状态:集群异常则取消写入。
  3. 构建基于 shard 的请求:通过 shardId 合并请求。
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
  1. 路由:通过 routing 来计算目标 shard;

    • 一般情况的计算公式:shard_num = hash(_routing) % num_primary_shards;
    • 为了解决部分数据倾斜,可以设置 index.routing_partition_size 参数,思想是通过 routing 计算一组分片,再由 id 选择组内的一个分片,公式为:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
  1. 转发请求并等待响应:获取写操作主分片对应节点,如果本节点则执行,否则转发到对应节点。

2.1.2 主分片节点流程

  1. 写请求基础校验:包括是否主分片、主分片是否迁移,含有副分片数量是否满足 wait_for_active_shards 参数等。

  2. 写 lucene 和 translog,并根据配置的事物日志刷新策略来刷盘。

  3. 并发写每个副分片,并等待响应。

    • 如果全部写入成功并返回ack,则响应协调节点成功。
    • 如果有写入失败,主分片就上报 Master,Master 更新集群状态,将失败 shard 从 in_sync_allocations 中移除,并更改 shard 状态为 UNASSIGNED。

三、ES 数据读取

ES 的读取主要分为 GET 和 Search 两种操作

  • GET 是会指定 _id 通过正排索引的搜索;
  • Search 是一些复杂场景通过倒排索引的检索;

3.1 GET 流程

GET 请求也可以指定参数来定制一些特殊逻辑:

  • realtime:默认为 true,默认 GET 会获取最新的数据,不受索引刷新影响,如果文档更新了但是没有刷盘则会触发一次 refresh。
  • source filtering :默认情况 true,会返回文档全部内容,可以设置false,不返回文档内容,或者通过 _source_include 和 source_exclude 指定返回哪些字段。
  • stored Fields:指定返回哪些mapping中设置为 stored 的字段。
  • _source:返回 doc 原始内容,不反悔一些元信息。
  • routing:分片键。
  • preference:优先从某个分片获取数据。
  • refresh:是否在读取之前进行刷盘操作。
  • version:指定读取版本,如果最新版本不符合,返回 409。

GET 流程就只要包括两个步骤

  1. 协调节点对于读请求进行路由转发,后等待全部完成读取后响应客户端,对于批量处理场景,部分失败,不会影响整体返回,并在 resp 里标注。
  2. 数据节点对于读请求进行响应,包括数据读取,过滤等操作返回协调节点。

3.2 Search 流程

Search 流程主要会经过两个节点,协调节点和数据节点,协调节点负责数据获取加工,搜索任务又会分为两个阶段, query then fetch;数据节点则执行真正的数据读取。

  1. 协调节点:

    • Query 阶段:遍历所有分片发送查询请求收集需要的返回结果,对结果进行排序筛选等。
    • Fetch 阶段:通过文档的 id 获取文档的全部内容。
  2. 数据节点:

    • 底层还是通过调用 lucene 进行分词,查询等,ES 负责打分,聚合等加工操作。