Elasticsearch Ramblings (4): Java Client - the Native esClient

Date: 2022-05-30 03:50:41

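The previous post (Elasticsearch Ramblings (3): Java Client - Jest) covered Jest, a third-party client that calls the REST API over HTTP (httpClient). This post briefly introduces Elasticsearch's native Java client.

For details, see: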

http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/


Below is a very simple example that covers the following:

1) Add 1000 user objects (one index request per object);

2) Query by name;

package com.fox.c1;

import java.io.IOException;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

/**
 * Minimal wrapper around the native Elasticsearch Java client.
 * User is a plain bean with id, name and age (not shown in this post).
 *
 * @author huangfox
 * @date 2014-02-10 15:27:43
 */
public class ESClient {

    private Client client;

    /** Connects to a node on the transport port (9300), not the HTTP port. */
    public void init() {
        client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress(
                        "localhost", 9300));
    }

    public void close() {
        client.close();
    }

    /**
     * Indexes 1000 users into index "users", type "user",
     * sending one index request per user.
     */
    public void createIndex() {
        for (int i = 0; i < 1000; i++) {
            User user = new User();
            user.setId(Long.valueOf(i));
            user.setName("huang fox " + i);
            user.setAge(i % 100);
            client.prepareIndex("users", "user").setSource(generateJson(user))
                    .execute().actionGet();
        }
    }

    /**
     * Converts a user into a JSON document.
     *
     * @param user the user to serialize
     * @return the JSON string
     */
    private String generateJson(User user) {
        try {
            XContentBuilder contentBuilder = XContentFactory.jsonBuilder()
                    .startObject();
            // Write id and age as numbers rather than strings, so that the
            // range filter on "age" used in search() compares numerically.
            contentBuilder.field("id", user.getId());
            contentBuilder.field("name", user.getName());
            contentBuilder.field("age", user.getAge());
            return contentBuilder.endObject().string();
        } catch (IOException e) {
            // Fail loudly instead of indexing an empty document.
            throw new IllegalStateException(
                    "Failed to build JSON for user " + user.getId(), e);
        }
    }

    public static void main(String[] args) {
        ESClient client = new ESClient();
        client.init();
        client.createIndex();
        client.close();
    }
}

  

Two points are worth noting here:

1) NodeClient and TransportClient

Instantiating a node based client is the simplest way to get a Client that can execute operations against elasticsearch.

The TransportClient connects remotely to an elasticsearch cluster using the transport module. It does not join the cluster, but simply gets one or more initial transport addresses and communicates with them in round robin fashion on each action (though most actions will probably be "two hop" operations).

2) Generating the JSON document

There are different ways of generating a JSON document:

  • Manually (aka do it yourself) using native byte[] or as a String
  • Using Map that will be automatically converted to its JSON equivalent
  • Using a third party library to serialize your beans such as Jackson
  • Using built-in helpers XContentFactory.jsonBuilder()
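As a rough illustration of the first option in the list above (building the JSON by hand as a String), the sketch below serializes the three user fields from this post. The class name `ManualJson` and the `escape` helper are hypothetical and not part of Elasticsearch; id and age are written as JSON numbers, and the escaping only covers quotes, backslashes and control characters. The built-in `XContentFactory.jsonBuilder()` or a library such as Jackson takes care of this for you.

```java
final class ManualJson {

    // Escapes the characters that may not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds the document by plain string concatenation.
    static String userJson(long id, String name, int age) {
        return "{\"id\":" + id
                + ",\"name\":\"" + escape(name) + "\""
                + ",\"age\":" + age + "}";
    }

    public static void main(String[] args) {
        // prints {"id":1,"name":"huang fox 1","age":1}
        System.out.println(userJson(1L, "huang fox 1", 1));
    }
}
```

The resulting String could be passed to `setSource(...)` in the same way as the output of `generateJson(user)`.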

------------------------------------------------

search

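// Method of ESClient. Additional imports needed:
//   org.elasticsearch.action.search.SearchResponse
//   org.elasticsearch.action.search.SearchType
//   org.elasticsearch.index.query.FilterBuilders
//   org.elasticsearch.index.query.QueryBuilders
//   org.elasticsearch.search.SearchHits
public void search() {
    SearchResponse response = client.prepareSearch("users")
            .setTypes("user")
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.termQuery("name", "fox")) // Query
            .setFilter(FilterBuilders.rangeFilter("age").from(20).to(30)) // Filter
            .setFrom(0).setSize(60).setExplain(true).execute().actionGet();
    SearchHits hits = response.getHits();
    // Total number of matching documents, not just the ones on this page.
    System.out.println(hits.getTotalHits());
    for (int i = 0; i < hits.getHits().length; i++) {
        System.out.println(hits.getHits()[i].getSourceAsString());
    }
}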
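To make it easier to check what this search should return, here is a small in-memory model of the same request, with no Elasticsearch involved. It is only a sketch under several assumptions: the class `SearchModel` and its methods are hypothetical, "name" is assumed to be analyzed into lower-case whitespace-separated tokens, "age" is assumed to be indexed as a number, and both range bounds are treated as inclusive. Real hits are ordered by score, which the model ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SearchModel {

    static final class Doc {
        final long id;
        final String name;
        final int age;

        Doc(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    // The same 1000 documents that createIndex() produces.
    static List<Doc> sampleDocs() {
        List<Doc> docs = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            docs.add(new Doc(i, "huang fox " + i, i % 100));
        }
        return docs;
    }

    // Term query: true if one of the name tokens equals the term exactly.
    static boolean hasTerm(Doc doc, String term) {
        return Arrays.asList(doc.name.toLowerCase().split("\\s+")).contains(term);
    }

    // Query plus range filter, both bounds inclusive.
    static List<Doc> matches(List<Doc> docs, String term, int ageFrom, int ageTo) {
        List<Doc> out = new ArrayList<>();
        for (Doc d : docs) {
            if (hasTerm(d, term) && d.age >= ageFrom && d.age <= ageTo) {
                out.add(d);
            }
        }
        return out;
    }

    // Paging as in setFrom(from).setSize(size).
    static List<Doc> page(List<Doc> all, int from, int size) {
        int start = Math.min(from, all.size());
        int end = Math.min(start + size, all.size());
        return all.subList(start, end);
    }

    public static void main(String[] args) {
        List<Doc> all = matches(sampleDocs(), "fox", 20, 30);
        System.out.println(all.size());               // prints 110
        System.out.println(page(all, 0, 60).size());  // prints 60
    }
}
```

Under these assumptions the total hit count is 110 (11 ages, each shared by 10 users), while a page with from=0 and size=60 carries only 60 of them.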

  

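These two posts made simple use of esClient and Jest; I have not yet reached a conclusion about how the two compare.

More features to cover later:

index: create, update, delete

search: query (queryParser), filter, sort, paging, highlight, facet

multiSearch

cache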


keywords

1) Elasticsearch Java API vs. REST API

2) Elasticsearch nodeBuilder vs. transportClient

One supplementary slide:

[Slide image: Elasticsearch Ramblings (4): Java Client - the Native esClient]


Addendum, 2014-06-30

TransportClient

The transport client allows to create a client that is not part of the cluster, but simply connects to one or more nodes directly by adding their respective addresses using addTransportAddress(org.elasticsearch.common.transport.TransportAddress).

The addTransportAddress method

Adds a transport address that will be used to connect to. The Node this transport address represents will be used if it is possible to connect to it. If it is unavailable, it will be automatically connected to once it is up. In order to get the list of all the current connected nodes, please see connectedNodes().

As the text above shows, several transportAddress entries can be added to a transportClient. What is the point of adding more than one?

When one ES service (corresponding to one transportAddress) becomes unavailable, the client automatically discovers the nodes that are currently available (the current connected nodes), as the following code shows:

TransportClientNodesService

int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
    index = 0;
    randomNodeGenerator.set(0);
}
RetryListener<Response> retryListener = new RetryListener<Response>(callback, listener, nodes, index);
try {
    callback.doWithNode(nodes.get((index) % nodes.size()), retryListener);
} catch (ElasticsearchException e) {
    if (e.unwrapCause() instanceof ConnectTransportException) {
        retryListener.onFailure(e);
    } else {
        throw e;
    }
}

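The retryListener makes sure that, when connecting to the chosen node fails, the request is retried against the list of currently available nodes.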
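The retry idea can be sketched in plain Java as follows. This is a simplified, synchronous model and not the actual `RetryListener`, which works through asynchronous callbacks; `FailoverSketch`, `NodeUnavailableException` and `executeWithRetry` are hypothetical names, and nodes are represented by plain strings. Only connection-level failures move on to the next node, mirroring the `ConnectTransportException` check in the excerpt; any other exception propagates to the caller.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FailoverSketch {

    // Stand-in for a connection-level failure (hypothetical type).
    static final class NodeUnavailableException extends RuntimeException {
        NodeUnavailableException(String node) {
            super("cannot connect to " + node);
        }
    }

    // Tries each node at most once, starting at startIndex and wrapping around.
    static <R> R executeWithRetry(List<String> nodes, int startIndex,
                                  Function<String, R> action) {
        for (int attempt = 0; attempt < nodes.size(); attempt++) {
            String node = nodes.get(Math.floorMod(startIndex + attempt, nodes.size()));
            try {
                return action.apply(node);
            } catch (NodeUnavailableException e) {
                // Connection failure: fall through and try the next node.
            }
        }
        throw new IllegalStateException("No node available");
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-a", "node-b", "node-c");
        // node-b is "down", so a request that starts there is served by node-c.
        String servedBy = executeWithRetry(nodes, 1, node -> {
            if (node.equals("node-b")) {
                throw new NodeUnavailableException(node);
            }
            return node;
        });
        System.out.println(servedBy); // prints node-c
    }
}
```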

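index is an auto-incrementing int (per client instance); nodes.get((index) % nodes.size()) spreads requests evenly across the nodes. Note that this has nothing to do with how an index is sharded.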
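The node selection itself can be reproduced with nothing but an `AtomicInteger`. The sketch below follows the excerpt's logic, including the reset when the counter overflows into negative values; the class name `RoundRobinSketch`, the string node names and the second constructor (added only so the overflow case can be demonstrated) are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    private final AtomicInteger counter;
    private final List<String> nodes;

    RoundRobinSketch(List<String> nodes) {
        this(nodes, 0);
    }

    // Lets the caller choose the initial counter value.
    RoundRobinSketch(List<String> nodes, int start) {
        this.nodes = nodes;
        this.counter = new AtomicInteger(start);
    }

    // Picks the next node; one shared counter per client instance.
    String next() {
        int index = counter.incrementAndGet();
        if (index < 0) {
            // The int overflowed past Integer.MAX_VALUE: start over at 0.
            index = 0;
            counter.set(0);
        }
        return nodes.get(index % nodes.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(
                Arrays.asList("node-a", "node-b", "node-c"));
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next()); // node-b, node-c, node-a, node-b
        }
    }
}
```

Because the counter is incremented before it is used, the first request goes to the second node in the list; over many requests each node still receives the same share.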

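In theory, a Client can be given some or all of the nodes in the ES cluster and then "picks" one node by round robin; from that point the client can communicate with the ES cluster and carry out the requested operations.

As for the concrete operations:

Index: sharding (shard routing strategy) -> pick the concrete node (master) to index on -> sync to the corresponding slave node

Search: pick a node from the replica set (load-balancing strategy) -> dispatch the request -> merge the result sets

and so on; these will be analyzed in later posts.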
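Until then, the "sharding strategy" step of the Index path can be pictured with the hash-modulo rule that Elasticsearch documents for routing: shard = hash(routing) % number_of_primary_shards, where the routing value defaults to the document id. The sketch below is only an illustration of that rule. It substitutes Java's `String.hashCode()` for Elasticsearch's own hash function, so the shard numbers it produces will not match a real cluster, and `RoutingSketch` and `shardFor` are hypothetical names.

```java
final class RoutingSketch {

    // Maps a routing value to a shard number in [0, numberOfPrimaryShards).
    // Assumption: String.hashCode() stands in for Elasticsearch's hash function.
    static int shardFor(String routing, int numberOfPrimaryShards) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        int shards = 5;
        // The same routing value always lands on the same shard.
        System.out.println(shardFor("1", shards)); // prints 4 ("1".hashCode() == 49)
        System.out.println(shardFor("a", shards)); // prints 2 ("a".hashCode() == 97)
    }
}
```

This also shows why the two mechanisms are independent: the round robin only decides which node receives the client's request, while the routing rule decides which shard stores the document.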