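To make things concrete: now that the source code can be debugged, let's first get familiar with how Elasticsearch is operated. The most basic interface is the REST API, driven by URLs. Elasticsearch also ships Java clients that wrap requests and responses, and they are quite convenient once you understand them.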
URL
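1. Create an index (of course, if you just import data directly, Elasticsearch will also create the corresponding mappings for you automatically)
indexName: test, which you can think of roughly as a database
type: document, roughly a table
properties: the table's metadata (field definitions)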
curl -X PUT "localhost:9200/test" -H 'Content-Type: application/json' -d'
{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"document" : {
"properties" : {
"user" : { "type" : "text" },
"post_date" : { "type" : "text" },
"message" : { "type" : "text" }
}
}
}
}'
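For comparison with the Java clients discussed later, here is a minimal sketch of the same create-index call from plain Java. It assumes Java 11+ and the JDK's built-in java.net.http client, not an Elasticsearch client, and the class name CreateIndexSketch is hypothetical. createIndexRequest only builds the request (same method, path, header and body as the curl command); main actually sends it and needs a node on localhost:9200.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateIndexSketch {
    // Same body as the curl command: one shard, type "document" with three text fields
    static final String BODY = "{"
            + "\"settings\":{\"number_of_shards\":1},"
            + "\"mappings\":{\"document\":{\"properties\":{"
            + "\"user\":{\"type\":\"text\"},"
            + "\"post_date\":{\"type\":\"text\"},"
            + "\"message\":{\"type\":\"text\"}}}}}";

    // Builds (but does not send) the PUT /{index} request
    static HttpRequest createIndexRequest(String baseUrl, String index) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = createIndexRequest("http://localhost:9200", "test");
        // Sending requires a running node on localhost:9200
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```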
2. Index (import) data
If no _id is specified, use a POST request (Elasticsearch generates the _id):
curl -X POST "localhost:9200/test/document" -H 'Content-Type: application/json' -d'
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}'
If you specify the _id, use a PUT request; here the _id is 1:
curl -X PUT "localhost:9200/test/document/1" -H 'Content-Type: application/json' -d'
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}'
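The POST-versus-PUT rule above can be captured in one small helper. This is a sketch assuming Java 11+ and java.net.http; indexRequest and IndexDocumentSketch are hypothetical names, and the id is not URL-encoded, so it assumes ids made of URL-safe characters.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class IndexDocumentSketch {
    // Hypothetical helper: POST /{index}/{type} when id is null (Elasticsearch generates one),
    // PUT /{index}/{type}/{id} when the caller supplies the id
    static HttpRequest indexRequest(String baseUrl, String index, String type, String id, String json) {
        HttpRequest.BodyPublisher body = HttpRequest.BodyPublishers.ofString(json);
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .header("Content-Type", "application/json");
        if (id == null) {
            return builder.uri(URI.create(baseUrl + "/" + index + "/" + type)).POST(body).build();
        }
        // Note: id is used as-is, without URL encoding
        return builder.uri(URI.create(baseUrl + "/" + index + "/" + type + "/" + id)).PUT(body).build();
    }
}
```

Passing null for the id gives the first curl command above; passing "1" gives the second.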
3. Delete an index
curl -X DELETE "localhost:9200/test"
4. Search. The query syntax is extensive, so learn it gradually; the simplest query matches all documents:
curl -X POST "localhost:9200/_search" -H 'Content-Type: application/json' -d'
{
"query" : {
"match_all" : {}
}
}'
Reference: "Elasticsearch 5.x Source Code Analysis (2): TransportClient and RestClient"
RestClient 9200
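Later versions (starting with 5.6) provide RestHighLevelClient, which wraps the JSON data for you: requests are built through various builders, and responses are parsed into the corresponding response classes. However, not every API is wrapped.
Maven pom file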
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.3</version>
</dependency>
</dependencies>
LowLevelClient
Simple initialization
static RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http"));
static RestClient client = restClientBuilder.build();
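For reference, the request timeouts and the underlying HTTP client's I/O threads and socket options can also be configured. Set these callbacks on the builder before calling build():
restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
requestConfigBuilder.setConnectTimeout(10000); // timeout (ms) for establishing a connection to the server
requestConfigBuilder.setSocketTimeout(30000); // max inactivity (ms) while waiting for response data
requestConfigBuilder.setConnectionRequestTimeout(10000); // timeout (ms) for obtaining a connection from the connection pool
return requestConfigBuilder;
}
});
// I/O reactor configuration (dispatcher threads and socket options)
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig.custom()
.setIoThreadCount(100) // number of I/O dispatcher threads (default: number of available processors)
.setConnectTimeout(10000)
.setSoTimeout(10000)
.build());
}
});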
RestHighLevelClient
The high-level client just adds a layer of wrapping on top of the original low-level client:
static RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost("127.0.0.1", 9200, "http"));
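static RestClient client = restClientBuilder.build(); // separate low-level client, only needed if you also call it directly
static RestHighLevelClient highLevelClient = new RestHighLevelClient(restClientBuilder); // 6.x takes the builder; getLowLevelClient() exposes its internal client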
Search logic
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test"); // index name
searchRequest.types("document"); // type, matching the mapping created above
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery()); // the query, i.e. the request entity, built through a wrapper
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = highLevelClient.search(searchRequest); // no need to build the JSON string by hand
If you're curious, here is a quick look at how highLevelClient does the wrapping.
/**
 * Request::search (a Java 8 method reference) converts the SearchRequest into the matching HTTP request,
 * i.e. the parameters HttpPost.METHOD_NAME, endpoint, params and entity.
 *
 * SearchResponse::fromXContent parses the returned JSON into a SearchResponse object.
 */
public final SearchResponse search(SearchRequest searchRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(searchRequest, Request::search, SearchResponse::fromXContent, emptySet(), headers);
}
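This code makes heavy use of Java 8 features, namely functional programming, somewhat like passing function pointers in C++. CheckedFunction is a functional interface declared with the @FunctionalInterface annotation.
// performRequest converts the request and hands it to the original low-level RestClient, i.e. it is a plain HTTP request.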
protected final <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
Set<Integer> ignores, Header... headers) throws IOException {
return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
}
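To make the convert, send, parse flow easier to follow in one place, here is a self-contained toy version. It only mirrors the shape of performRequestAndParseEntity: the "HTTP call" is a plain string, performLowLevel returns a canned JSON body instead of doing real I/O, and a regex stands in for a real JSON parser. All names except CheckedFunction are hypothetical.

```java
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConverterPipelineSketch {

    // Same shape as Elasticsearch's CheckedFunction
    @FunctionalInterface
    interface CheckedFunction<T, R, E extends Exception> {
        R apply(T t) throws E;
    }

    // Hypothetical stand-in for the low-level client: remembers the request line
    // and returns a canned JSON body instead of doing real HTTP
    static String lastRequestLine;

    static String performLowLevel(String requestLine) throws IOException {
        lastRequestLine = requestLine;
        return "{\"hits\":{\"total\":3}}";
    }

    // Mirrors performRequestAndParseEntity: convert -> send -> parse
    static <Req, Resp> Resp performRequestAndParseEntity(Req request,
            CheckedFunction<Req, String, IOException> requestConverter,
            CheckedFunction<String, Resp, IOException> entityParser) throws IOException {
        String requestLine = requestConverter.apply(request);
        String entity = performLowLevel(requestLine);
        return entityParser.apply(entity);
    }

    // Plays the role of Request::search
    static String searchRequestLine(String index) {
        return "POST /" + index + "/_search";
    }

    // Plays the role of SearchResponse::fromXContent (regex instead of a real JSON parser)
    static Long parseTotalHits(String json) throws IOException {
        Matcher m = Pattern.compile("\"total\":(\\d+)").matcher(json);
        if (!m.find()) {
            throw new IOException("no hits.total in response: " + json);
        }
        return Long.parseLong(m.group(1));
    }

    // Plays the role of RestHighLevelClient#search: it only passes method references along
    public static Long search(String index) throws IOException {
        return performRequestAndParseEntity(index,
                ConverterPipelineSketch::searchRequestLine,
                ConverterPipelineSketch::parseTotalHits);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(search("test"));  // prints 3
        System.out.println(lastRequestLine); // prints POST /test/_search
    }
}
```

The design point is that search itself contains no conversion or parsing logic; every API method can reuse the same generic pipeline by passing a different converter and parser pair.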
PS: here is the CheckedFunction interface. It uses generics to define the parameter type, the return type and the exception type.
@FunctionalInterface
public interface CheckedFunction<T, R, E extends Exception> {
R apply(T t) throws E;
}
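The reason for a custom interface rather than java.util.function.Function is that Function.apply declares no checked exceptions, so a method reference to a parser that throws IOException (like SearchResponse::fromXContent) cannot be assigned to it. A small self-contained demonstration; parseLength is a hypothetical parser:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class CheckedFunctionDemo {
    @FunctionalInterface
    public interface CheckedFunction<T, R, E extends Exception> {
        R apply(T t) throws E;
    }

    // Hypothetical parser that throws a checked exception
    static Integer parseLength(byte[] body) throws IOException {
        if (body.length == 0) {
            throw new IOException("empty body");
        }
        return new String(body, StandardCharsets.UTF_8).length();
    }

    // Compiles: apply declares "throws E", and E is bound to IOException
    static final CheckedFunction<byte[], Integer, IOException> PARSER = CheckedFunctionDemo::parseLength;

    // Would NOT compile, because Function.apply cannot throw IOException:
    // static final Function<byte[], Integer> BROKEN = CheckedFunctionDemo::parseLength;

    // With a plain Function, the checked exception must be wrapped by hand
    static final Function<byte[], Integer> WRAPPED = body -> {
        try {
            return parseLength(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };
}
```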
Next, the logic of parseEntity:
protected final <Resp> Resp parseEntity(final HttpEntity entity,
final CheckedFunction<XContentParser, Resp, IOException> entityParser) throws IOException {
if (entity == null) {
throw new IllegalStateException("Response body expected but not returned");
}
if (entity.getContentType() == null) {
throw new IllegalStateException("Elasticsearch didn't return the [Content-Type] header, unable to parse response body");
}
XContentType xContentType = XContentType.fromMediaTypeOrFormat(entity.getContentType().getValue());
if (xContentType == null) {
throw new IllegalStateException("Unsupported Content-Type: " + entity.getContentType().getValue());
}
// Build a parser; for a JSON response this is the JSON parser. registry was constructed earlier from the default entry list, i.e. the corresponding builders.
try (XContentParser parser = xContentType.xContent().createParser(registry, entity.getContent())) {
return entityParser.apply(parser); // the actual parsing step, i.e. the SearchResponse::fromXContent seen earlier
}
}
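The Content-Type checks at the top of parseEntity can be isolated into a small function. This is a simplified, hypothetical model: the Format enum mirrors the four values of XContentType, only the media types are recognized (the name of the real fromMediaTypeOrFormat suggests it also accepts short format names), and the exception messages are copied from the code above.

```java
import java.util.Locale;

public class ContentTypeCheckSketch {
    // Simplified stand-in for XContentType
    enum Format { JSON, SMILE, YAML, CBOR }

    // Hypothetical: mirrors the checks parseEntity performs before creating a parser
    static Format formatOf(String contentTypeHeader) {
        if (contentTypeHeader == null) {
            throw new IllegalStateException(
                    "Elasticsearch didn't return the [Content-Type] header, unable to parse response body");
        }
        // Drop parameters such as "; charset=UTF-8" and compare case-insensitively
        String mediaType = contentTypeHeader.split(";")[0].trim().toLowerCase(Locale.ROOT);
        switch (mediaType) {
            case "application/json":  return Format.JSON;
            case "application/smile": return Format.SMILE;
            case "application/yaml":  return Format.YAML;
            case "application/cbor":  return Format.CBOR;
            default:
                throw new IllegalStateException("Unsupported Content-Type: " + contentTypeHeader);
        }
    }
}
```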
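I hadn't read much Java 8 code before, so this functional style took real effort at first: the logic kept jumping around instead of reading in a straight line.
TransportClient 9300
This one is not as simple as RestClient: it sends serialized data directly over TCP. For example, IndexRequest has the corresponding readFrom and writeTo methods.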
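The writeTo/readFrom idea is that each request writes its fields to a binary stream in a fixed order and reads them back in exactly the same order. Below is a minimal sketch of that contract using java.io Data streams as a stand-in for Elasticsearch's StreamOutput/StreamInput; DocRequest and its fields are hypothetical, and the presence flag for the optional id is in the spirit of StreamOutput#writeOptionalString.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class WireFormatSketch {
    // Hypothetical request, loosely modeled on IndexRequest's writeTo/readFrom pair
    static final class DocRequest {
        String index;
        String id;     // optional: may be null
        String source; // document JSON

        DocRequest() {
        }

        DocRequest(String index, String id, String source) {
            this.index = index;
            this.id = id;
            this.source = source;
        }

        // Fields go out in a fixed order; readFrom must read them back in the same order
        void writeTo(DataOutput out) throws IOException {
            out.writeUTF(index);
            out.writeBoolean(id != null); // presence flag for the optional field
            if (id != null) {
                out.writeUTF(id);
            }
            out.writeUTF(source); // writeUTF caps each string at 65535 encoded bytes
        }

        void readFrom(DataInput in) throws IOException {
            index = in.readUTF();
            id = in.readBoolean() ? in.readUTF() : null;
            source = in.readUTF();
        }
    }

    static byte[] serialize(DocRequest request) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            request.writeTo(out);
        }
        return bytes.toByteArray();
    }

    static DocRequest deserialize(byte[] data) throws IOException {
        DocRequest request = new DocRequest();
        request.readFrom(new DataInputStream(new ByteArrayInputStream(data)));
        return request;
    }
}
```

Because there are no field names on the wire, both sides must agree on the exact field order and version; this is one reason the transport protocol is more tightly coupled to the server version than the JSON-based REST API.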
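See the official documentation: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html
Initialization (PreBuiltTransportClient comes from the org.elasticsearch.client:transport artifact, not from the REST client dependency above)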
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));
// on shutdown
client.close();
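// Compared with RestClient, TransportClient has built-in sniffing (off by default): when enabled, it maintains the cluster node list in the background. The low-level REST client needs the separate sniffer module for this.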
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
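TransportClient sniffingClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300)); // seed node; the rest are discovered
Create an index
Adapted from the blog post "Java Client: TransportClient Operations Explained".
Set the field types. The parameters all correspond to the standard REST (URL) API; the client just wraps them.
CreateIndexRequestBuilder cib = client.admin().indices().prepareCreate("article"); // index name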
XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties") // define the custom fields
.startObject("author")
.field("type", "text") // field data type ("string" was removed in 6.x; use text or keyword)
.endObject()
.startObject("title")
.field("type", "text")
.endObject()
.startObject("content")
.field("type", "text")
.endObject()
.startObject("price")
.field("type", "text")
.endObject()
.startObject("view")
.field("type", "text")
.endObject()
.startObject("tag")
.field("type", "text")
.endObject()
.startObject("date")
.field("type", "date") // Date type
.field("format", "yyyy-MM-dd HH:mm:ss") // Date format
.endObject()
.endObject()
.endObject();
cib.addMapping("content", mapping); // mapping type name
CreateIndexResponse res = cib.execute().actionGet();
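Note: the official API docs describe the exact usage; here are a few pitfalls I ran into.
1. In many deployments the cluster sits behind a load balancer (LB) such as LVS, even though the client itself already does simple round-robin load balancing. When going through an LB, make sure connections are kept alive. I once hit a problem where the session was not preserved: by the time the next request arrived, the LB had already closed the session, and the client kept throwing NoNodeAvailableException ("None of the configured nodes are available"). Most likely, once the session was dropped, requests were routed to other nodes with which no connection had been established, so they failed.
2. If you use custom settings, remember to register them. When Elasticsearch parses settings, it converts the nested JSON into a map-like structure (nested objects become dotted keys), so if a plugin or a modified source tree uses settings that have not each been registered, Elasticsearch reports an error. Plugins register their settings through Plugin#getSettings().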
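A simplified, hypothetical model of the settings pitfall described in item 2: nested maps are flattened into dotted keys, and any key that is not in the registered set is rejected. In real Elasticsearch the registered set is built from Setting objects (plugins contribute theirs via Plugin#getSettings()); here it is just a Set of strings, and the error text only imitates the beginning of Elasticsearch's message.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class SettingsRegistrySketch {
    // Flattens {"client":{"transport":{"sniff":true}}} into {"client.transport.sniff": true}
    static Map<String, Object> flatten(Map<String, ?> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, Object> flat) {
        for (Map.Entry<String, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) e.getValue();
                flattenInto(key, child, flat);
            } else {
                flat.put(key, e.getValue());
            }
        }
    }

    // Rejects any flattened key that was never registered
    static void validate(Map<String, Object> flat, Set<String> registered) {
        for (String key : flat.keySet()) {
            if (!registered.contains(key)) {
                throw new IllegalArgumentException("unknown setting [" + key + "]");
            }
        }
    }
}
```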