elasticsearch-1.3.0 之索引代码粗略梳理

时间:2022-09-17 23:53:15

elasticsearch-1.3.0

发送请求

创建

[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer?pretty
{
"acknowledged" : true
}

索引

[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}'
{
"_index" : "customer",
"_type" : "external",
"_id" : "1",
"_version" : 1,
"created" : true
}
[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}'
{
"_index" : "customer",
"_type" : "external",
"_id" : "1",
"_version" : 2,
"created" : false
}

这里先跟踪下索引的流程,netty的bootstrap暂且不管,从HttpRequestHandler的messageReceived说起

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
HttpRequest request = (HttpRequest) e.getMessage();
// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumalation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest));
super.messageReceived(ctx, e);
}

这里的dispatchRequest启示就是NettyHttpServerTransport

NettyHttpServerTransport

void dispatchRequest(HttpRequest request, HttpChannel channel) {
httpServerAdapter.dispatchRequest(request, channel);
}

Dispatcher,static class Dispatcher implements HttpServerAdapter

public void dispatchRequest(HttpRequest request, HttpChannel channel) {
server.internalDispatchRequest(request, channel);
}

HttpServer

public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
if (request.rawPath().startsWith("/_plugin/")) {
RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
filterChain.continueProcessing(request, channel);
return;
}
restController.dispatchRequest(request, channel);
}

RestController的dispatchRequest()主要是executeHandler()

try {
executeHandler(request, channel);
} catch (Throwable e) {

executeHandler方法中不同的handler处理请求,这里的handler是RestIndexAction,继承自

final RestHandler handler = getHandler(request);
if (handler != null) {
handler.handleRequest(request, channel);
}

在BaseRestHandler中

public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {
handleRequest(request, channel, usefulHeaders.length == 0 ? client : new HeadersCopyClient(client, request, usefulHeaders));
}
protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

实现父类在BaseRestHandler的handleRequest方法,最后调用NodeClient的index方法

client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {

NodeClient的父类AbstractClient中index的实现

public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
execute(IndexAction.INSTANCE, request, listener);
}

NodeClient中的execute方法实现

public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);
transportAction.execute(request, listener);//TransportIndexAction extends TransportShardReplicationOperationAction
}

这里的transportAction是TransportIndexAction,

其中TransportShardReplicationOperationAction是TransportIndexAction的父类,中TransportShardReplicationOperationAction继承自TransportAction,TransportAction中execute的实现

public void execute(Request request, ActionListener<Response> listener) {
if (request.listenerThreaded()) {
listener = new ThreadedActionListener<>(threadPool, listener, logger);
}
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
try {
doExecute(request, listener);
} catch (Throwable e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}

直接调用TransportIndexAction的doExecute

 protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
request.beforeLocalFork(); // we fork on another thread...
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
} @Override
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
try {
innerExecute(request, listener);
} catch (Throwable e1) {
listener.onFailure(e1);
}
} else {
listener.onFailure(e);
}
}
});
} else {
innerExecute(request, listener);
}
}

这里走 innerExecute(request, listener);

private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
super.doExecute(request, listener);
}

这里的super就是TransportShardReplicationOperationAction了,TransportShardReplicationOperationAction中doExecute的实现

protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncShardOperationAction(request, listener).start();
}

主要两个方法,一个是获取shard,另一个是shardOperationOnPrimary;

其中shard后边再说,shardOperationOnPrimary在TransportIndexAction实现

protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final IndexRequest request = shardRequest.request; // validate, if routing is required, that we got routing
IndexMetaData indexMetaData = clusterState.metaData().index(request.index());
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
throw new RoutingMissingException(request.index(), request.type(), request.id());
}
} IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
boolean created;
Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), index.docMapper(), indexService.indexUUID());
}
indexShard.index(index);
version = index.version();
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), create.docMapper(), indexService.indexUUID());
}
indexShard.create(create);
version = create.version();
op = create;
created = true;
}
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_index").force(false));
} catch (Throwable e) {
// ignore
}
} // update the version on the request, so it will be used for the replicas
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); assert request.versionType().validateVersionForWrites(request.version()); IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version, created);
return new PrimaryResponse<>(shardRequest.request, response, op);
}

走request.opType() == IndexRequest.OpType.INDEX分支,主要是indexShard.prepareIndex,indexShard.index(index)这里IndexShard是InternalIndexShard,的index实现

public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
writeAllowed(index.origin());
index = indexingService.preIndex(index);
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
engine.index(index);
index.endTime(System.nanoTime());
} catch (RuntimeException ex) {
indexingService.failedIndex(index);
throw ex;
}
indexingService.postIndex(index);
return index.parsedDoc();
}

indexingService对应ShardIndexingService, engine是InternalEngine,InternalEngine的index()

public void index(Index index) throws EngineException {
final IndexWriter writer;
try (InternalLock _ = readLock.acquire()) {
writer = currentIndexWriter();
try (Releasable r = throttle.acquireThrottle()) {
innerIndex(index, writer);
}
dirty = true;
possibleMergeNeeded = true;
flushNeeded = true;
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine(t, "index");
throw new IndexFailedEngineException(shardId, index, t);
}
checkVersionMapRefresh();
}

最终在InternalEngine的innerIndex方法中调用lunece的IndexWriter的,依据是不是存在有版本,来通过 writer.addDocuments或者updateDocument方法添加或者更新索引

添加add索引

if (index.docs().size() > 1) {
writer.addDocuments(index.docs(), index.analyzer());
} else {
writer.addDocument(index.docs().get(0), index.analyzer());
}

更新update索引

if (index.docs().size() > 1) {
writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else {
writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
}

最后Translog

Translog.Location translogLocation = translog.add(new Translog.Index(index));

具体代码

private void innerIndex(Index index, IndexWriter writer) throws IOException {
synchronized (dirtyLock(index.uid())) {
final long currentVersion;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
} long updatedVersion;
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
index.created(true);
if (index.docs().size() > 1) {
writer.addDocuments(index.docs(), index.analyzer());
} else {
writer.addDocument(index.docs().get(0), index.analyzer());
}
} else {
if (versionValue != null) {
index.created(versionValue.delete()); // we have a delete which is not GC'ed...
}
if (index.docs().size() > 1) {
writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else {
writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
}
}
Translog.Location translogLocation = translog.add(new Translog.Index(index)); versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); indexingService.postIndexUnderLock(index);
}
}

link

分布式搜索Elasticsearch源码分析之二------索引过程源码概要分析

elasticsearch-1.3.0 之索引代码粗略梳理的更多相关文章

  1. Elasticsearch 7&period;4&period;0官方文档操作

    官方文档地址 https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html 1.0.0 设置Elasticsea ...

  2. Linux下&comma;非Docker启动Elasticsearch 6&period;3&period;0&comma;安装ik分词器插件&comma;以及使用Kibana测试Elasticsearch&comma;

    Linux下,非Docker启动Elasticsearch 6.3.0 查看java版本,需要1.8版本 java -version yum -y install java 创建用户,因为elasti ...

  3. ElasticSearch基本操作(安装,索引的创建和删除,映射)

    ElasticSearch基于Lucene的搜索服务器,支持分布式,提供REST接口,可用于云计算,可以实现实时搜索,开源免费.这时很官方的一句话,在使用之前,我们简单的介绍一下安装过程.在官网下载之 ...

  4. Elasticsearch-基础介绍及索引原理分析(转载)

    最近在参与一个基于Elasticsearch作为底层数据框架提供大数据量(亿级)的实时统计查询的方案设计工作,花了些时间学习Elasticsearch的基础理论知识,整理了一下,希望能对Elastic ...

  5. Elasticsearch-基础介绍及索引原理分析

    介绍 Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,一个建立在全文搜索引擎 Apache Lucene(TM) 基础上的搜索引擎.当然 Elasticsearch 并不仅仅是 L ...

  6. elasticsearch简介和倒排序索引介绍

    介绍 我们为什么要用搜索引擎?我们的所有数据在数据库里面都有,而且 Oracle.SQL Server 等数据库里也能提供查询检索或者聚类分析功能,直接通过数据库查询不就可以了吗?确实,我们大部分的查 ...

  7. lucene&amp&semi;solr学习——创建和查询索引&lpar;代码篇&rpar;

    1. Lucene的下载 Lucene是开发全文检索功能的工具包,从官网下载Lucene4.10.3并解压. 官网:http://lucene.apache.org/ 版本:lucene7.7.0 ( ...

  8. 【拆分版】Docker-compose构建Elasticsearch 7&period;1&period;0集群

    写在前边 搞了两三天了,一直有个问题困扰着我,ES集群中配置怎么能正确映射到主机上,这边经常报ClusterFormationFailureHelper master not discovered o ...

  9. elasticsearch——海量文档高性能索引系统

    elasticsearch elasticsearch是一个高性能高扩展性的索引系统,底层基于apache lucene. 可结合kibana工具进行可视化. 概念: index 索引: 类似SQL中 ...

随机推荐

  1. 常用jQuery代码

    返回元素的html包括它本身 <div class="test"><p>hello,你好!</p></div> <script ...

  2. Android学习笔记之树形菜单的应用&period;&period;&period;

    PS:终于考完试了,总算是解脱了...可以正式上手项目开发了.... 学习内容: 1.掌握如何使用树形菜单...   对知识点进行一下补充...居然忘记了去学习树形菜单...不过在这里补上... Ex ...

  3. Tencent 的电话面试

    Tencent的实习生招聘投了简历.然后,万万没想到昨晚腾讯IEG直接给我电话了.当时就惊呆了,我都没有找人内推,就直接电话面试了. 就为昨晚的电话面试写写感想吧!问的挺多的,基本上简历上写了的都问到 ...

  4. mysql index的长度限制

    在InnoDB Storage Engine中单独一个列的最大的索引长度为767bytes,utf8字符集中,一个字符占3个字节,所以如果列的类型为char,那么要想在此列上建立索引,此列最多只能有2 ...

  5. Director Scene Layer and Sprite

    Scenes Director Layers Multiple Layers Example: Sprites References Scenes A scene (implemented with ...

  6. Swift - 短信发送功能的实现

    使用MessageUI.framework框架可以实现短信发送功能,步骤如下: (1)首先判断设备是否有发送短信功能 (2)如果设备允许发送短信,创建一个MFMessageComposeViewCon ...

  7. nginx的五种负载算法模式

    nginx 负载均衡5种配置方式 1.轮询(默认) 每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉,能自动剔除. 2.weight  指定轮询几率,weight和访问比率成正比 ...

  8. 使用前端技术和MySQL&plus;PHP制作自己的一个个人博客网站

    源代码地址:https://github.com/YauCheun/BlogCode 我的博客网站地址:http://www.yublog.fun/ 制作前景: 想拥有一个自己独自开发的一个小型博客网 ...

  9. Python装饰器、内置函数之金兰契友

    装饰器:装饰器的实质就是一个闭包,而闭包又是嵌套函数的一种.所以也可以理解装饰器是一种特殊的函数.因为程序一般都遵守开放封闭原则,软件在设计初期不可能把所有情况都想到,所以一般软件都支持功能上的扩展, ...

  10. (一)java基础

    注:本栏均为学习笔记 一.java标识符 标识符是用来命名的. 规则:字母数字下划线$组成,且不能以数字开头.不能使用java中的关键字. 一般:项目名.包名全部小写 变量名.方法名首字母小写,驼峰命 ...