本文所说的java客户端主要只是讲解一下用于插数据的client的原理,我们都知道往ES发数据有三种protocol分别是node、http和transport;其实对于其他client而言最终都是使用的http;而java是可以使用node和transport的,node方式一般很少用,所以我们只探究transport client,那么我们且来看看吧。
发送端例子
对于java client的数据发送(这里以bulk为例),写过的人都知道,其实是很简单的,因为大部分事情都已经被client做掉了,那么我们先给出例子感知一下:
client初始化
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "myClusterName")
.put("client.transport.sniff", true).build();
client=new TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress("host1",9300))
.addTransportAddress(new InetSocketTransportAddress("host2",9300));
bulk数据发送
对于数据的发送ES提供了两种方式:
第一种bulk api:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
可以看到这种方式是由client端自己添加数据,然后调用BulkResponse
来完成数据的发送。
bulkResponse = bulkRequest.get();
第二种叫做Bulk Processor:
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... }
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... }
})
.setBulkActions(10000)
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.build();
初始化bulk processor之后,客户端只需要往bulkProcessor添加数据即可bulkProcessor.add(new
,你可以先配置好bulk的size、interval等,其他的事情就交给processor自己去做吧。
IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
两种方式各有利弊,第一种要自己控制bulk size和interval,但是有利于对发送失败的处理;而第二种简单易用,用户只管add数据就好,但是对于使用回调函数来处理异常会不那么方便,如何选择就看使用场景的了。
下来才来正式分析client的原理:
client初始化
初始化的逻辑有点类似于之前一遍说的ES启动过程,也是用过Guice来依赖注入的,添加了一些有用的模块:最后一句才是实例化这个client,那我们来看看实例化的逻辑:
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
this.injector = injector;
nodesService = injector.getInstance(TransportClientNodesService.class);
proxy = injector.getInstance(TransportProxyClient.class);
查看了一下,对于client的一些调用其实最后都追溯到了TransportClientNodesService中,这个service负责客户端和服务端的连接,所以我们来重点看看。
client与server的连接
在TransportClientNodesService中维护了三个node list:
- listedNodes:client初始化时addTransportAddress配置的node;
- nodes:和服务端建立的node;
- filteredNodes:没有建立连接的node;
在client调用addTransportAddress时,就会往listedNodes中添加配置的node,之后会调用nodesSampler.sample();
这里补充一下nodesSampler实例会根据client.transport.sniff
的配置来决定使用哪种方式,分别是SniffNodesSampler和SimpleNodeSampler;
我们先来看看SniffNodesSampler,这意味着client会主动发现集群里的其他节点;在sample中client会去ping listedNodes和nodes中所有节点,默认ping的interval为5s;如果ping的node在nodes list里面,意味着是要真正建立连接的node,则创建fully connct;如果不在则创建light connect。(这里的light是指只创建ping连接,fully则会创建前一篇文章所说的那五种连接)。然后对这些node发送一个获取其state的请求,获取集群所有的dataNodes,对这些nodes经过再次确认后就放入nodes中。
再来看看SimpleNodeSampler咯;同样是ping listedNodes中的所有node,区别在于这里创建的都是light connect。对这些node发送一个TransportLivenessAction的请求,这个请求会返回一个自发现的node info,把这个返回结果中真实的node加入nodes,如果返回时空,仍然会加入nodes,因为可能目标node还没有完成初始化还获取不到信息。
可以看到两者的最大不同之处在于nodes列表里面的node,也就是SimpleNodeSampler让集群中的某些个配置的节点,专门用于接受用户请求。SniffNodesSampler的话,所有节点都会参与负载。
那么发送数据的时候会选择哪一个呢?其实都在execute方法里面:其实关键的就一句话DiscoveryNode
通过Round robin来生成发送的node,以达到负载均衡的效果。
node = nodes.get((index) % nodes.size());
bulkRequest
先说说上面的第一种方式,通过prepareBulk()创建了一个bulkRequest实例,add的时候实际上是添加了一个IndexRequest的实例,添加的这些request都放在了一个list里面。当执行get()的时候,其实最终是落到了doExecute()上,其会用其代理中的execute,这里面:就是调用了我上面说的获得发送node的方法,获取node之后将请求发过去:
transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
@Override
public Response newInstance() {
return action.newResponse();
}
});
Bulk Processor
Bulk Processor的数据发送逻辑和上面的bulkRequest是一样的,最后都落在了doExecute()上,所以这里就不重复了,就说说是怎么确定什么时候发送的。如果配置了bulk interval,那么会启动一个特定的scheduler,这里叫做scheduleWithFixedDelay,即使通过传入的interval来触发,时间到了就发送。另外的bulk size和bulk size是在每次add之后就会做一次判断:满足条件则发送。
总结
整个client大体就是这么些内容,我这边只是以bulk来说明的。对于server端收到数据后会做怎么样的事情,我们下一篇再讲。
好累,睡觉~~~