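Elasticsearch Source Code Analysis: The Client

Date: 2023-01-11 08:32:52

To make things concrete: now that the source code can be debugged, the first step is to get familiar with how Elasticsearch is operated. The most basic interface is the URL (REST) API. Elasticsearch also wraps requests and responses in its own classes, which are fairly convenient once you know them a little.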

URL

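1. Create an index [of course, if you import data directly, Elasticsearch will automatically create the corresponding mappings for you]
indexName: test, which can be thought of as roughly a database
type: document, roughly a table
properties: the table's metadata (its field definitions)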
curl -X PUT "localhost:9200/test" -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "number_of_shards" : 1
    },
    "mappings" : {
        "document" : {
            "properties" : {
                "user" : { "type" : "text" },
                "post_date" : { "type" : "text" },
                "message" : { "type" : "text" }
            }
        }
    }
}'
2. Import data
If no _id is specified, the request must be a POST
curl -X POST "localhost:9200/test/document" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}'
If an _id is specified, the request is a PUT; here the _id is 1
curl -X PUT "localhost:9200/test/document/1" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}'
3. Delete the index
curl -X DELETE "localhost:9200/test"
4. The query syntax is extensive, so learn it gradually. The simplest query matches everything:
curl -X POST "localhost:9200/_search" -H 'Content-Type: application/json' -d'
{
    "query" : {
        "match_all" : {}
    }
}'
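
The POST-versus-PUT rule from step 2 can be sketched in Java as follows. This is only an illustration: IndexEndpoint and forDocument are hypothetical names, not part of any Elasticsearch client, and the sketch only builds the HTTP method and path without sending anything.

```java
// Hypothetical helper, not part of any Elasticsearch client library.
final class IndexEndpoint {
    final String method;
    final String path;

    private IndexEndpoint(String method, String path) {
        this.method = method;
        this.path = path;
    }

    /** Picks the HTTP method and path for indexing one document; id may be null. */
    static IndexEndpoint forDocument(String index, String type, String id) {
        if (id == null || id.isEmpty()) {
            // No _id given: POST to /{index}/{type} and let Elasticsearch generate the id.
            return new IndexEndpoint("POST", "/" + index + "/" + type);
        }
        // _id given: PUT to /{index}/{type}/{id}.
        return new IndexEndpoint("PUT", "/" + index + "/" + type + "/" + id);
    }

    public static void main(String[] args) {
        IndexEndpoint generated = forDocument("test", "document", null);
        IndexEndpoint explicit = forDocument("test", "document", "1");
        System.out.println(generated.method + " " + generated.path);
        System.out.println(explicit.method + " " + explicit.path);
    }
}
```

The two calls in main correspond to the two curl commands in step 2.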

Reference: "Elasticsearch 5.x 源码分析(2)TransportClient和RestClient" (Elasticsearch 5.x source code analysis, part 2: TransportClient and RestClient)

RestClient 9200

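Later versions provide RestHighLevelClient, which wraps the JSON handling: requests are assembled with a range of builders, and responses are parsed into the corresponding response objects. It does not wrap every request type, however.
Maven pom file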

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.2.3</version>
        </dependency>
    </dependencies>

LowLevelClient

Simple initialization
static RestClientBuilder restClientBuilder = RestClient.builder(
            new HttpHost("127.0.0.1", 9200, "http"));
static RestClient client = restClientBuilder.build();

For reference, the I/O threads and the socket connection parameters can be configured:
// builder is a RestClientBuilder, e.g. the restClientBuilder created above
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(10000);   // timeout for establishing a connection to the server
                requestConfigBuilder.setSocketTimeout(30000);   // timeout while waiting for the response data
                requestConfigBuilder.setConnectionRequestTimeout(10000);  // timeout for obtaining a connection from the connection pool
                return requestConfigBuilder;
            }
        });
// I/O thread configuration
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setDefaultIOReactorConfig(
                        IOReactorConfig.custom()
                        .setIoThreadCount(100)
                        .setConnectTimeout(10000)
                        .setSoTimeout(10000)
                        .build());
            }
        });

RestHighLevelClient

The high-level client is simply a wrapper around the original low-level client.
static RestClientBuilder restClientBuilder = RestClient.builder(
            new HttpHost("127.0.0.1", 9200, "http"));
static RestClient client = restClientBuilder.build();
static RestHighLevelClient highLevelClient = new RestHighLevelClient(restClientBuilder);

Query logic
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices("test"); // index name
searchRequest.types("document");  // type, matching the mapping created in the URL section
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery()); // the query, i.e. the request entity, built through a wrapper
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = highLevelClient.search(searchRequest); // no need to build the JSON string by hand

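If you are curious, take a quick look at how highLevelClient does the wrapping.

    /**
     * Request::search is a Java 8 method reference. It converts the SearchRequest into the
     * corresponding HTTP request, i.e. the parameters HttpPost.METHOD_NAME, endpoint, params and entity.
     *
     * SearchResponse::fromXContent parses the returned JSON into a SearchResponse object.
     */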
    public final SearchResponse search(SearchRequest searchRequest, Header... headers) throws IOException {
        return performRequestAndParseEntity(searchRequest, Request::search, SearchResponse::fromXContent, emptySet(), headers);
    }

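This code makes heavy use of Java 8 features, in particular functional programming, which is similar to passing function pointers in C++. CheckedFunction is a functional interface declared with the @FunctionalInterface annotation.

// performRequest ends up calling the original RestClient's performRequest, which issues the plain HTTP request.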
protected final <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
                                                                            CheckedFunction<Req, Request, IOException> requestConverter,
                                                                            CheckedFunction<XContentParser, Resp, IOException> entityParser,
                                                                            Set<Integer> ignores, Header... headers) throws IOException {
        return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
}

P.S. Look at the structure of CheckedFunction: it uses generics to define the parameter type, the return type and the exception type.
@FunctionalInterface
public interface CheckedFunction<T, R, E extends Exception> {
    R apply(T t) throws E;
}
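
To see why a separate interface is useful here: java.util.function.Function cannot declare a checked exception, whereas CheckedFunction lets a method reference that throws IOException be passed around directly. The following is a minimal sketch of that idea; the interface is redeclared so the example compiles on its own, and CheckedFunctionDemo, parsePort and convert are hypothetical names made up for illustration.

```java
import java.io.IOException;

// Same shape as the interface quoted above, redeclared so the sketch compiles on its own.
@FunctionalInterface
interface CheckedFunction<T, R, E extends Exception> {
    R apply(T t) throws E;
}

final class CheckedFunctionDemo {
    /** Hypothetical parser that reports bad input with a checked exception. */
    static Integer parsePort(String text) throws IOException {
        try {
            return Integer.valueOf(text.trim());
        } catch (NumberFormatException e) {
            throw new IOException("not a port number: " + text, e);
        }
    }

    /** Applies any converter; the checked exception type E propagates to the caller. */
    static <T, R, E extends Exception> R convert(T input, CheckedFunction<T, R, E> converter) throws E {
        return converter.apply(input);
    }

    public static void main(String[] args) {
        try {
            // Method reference, analogous to SearchResponse::fromXContent in the source above.
            Integer port = convert(" 9200 ", CheckedFunctionDemo::parsePort);
            System.out.println("parsed port: " + port);
            convert("abc", CheckedFunctionDemo::parsePort); // throws IOException
        } catch (IOException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```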

Now look at the logic of parseEntity

protected final <Resp> Resp parseEntity(final HttpEntity entity,
                                      final CheckedFunction<XContentParser, Resp, IOException> entityParser) throws IOException {
        if (entity == null) {
            throw new IllegalStateException("Response body expected but not returned");
        }
        if (entity.getContentType() == null) {
            throw new IllegalStateException("Elasticsearch didn't return the [Content-Type] header, unable to parse response body");
        }
        XContentType xContentType = XContentType.fromMediaTypeOrFormat(entity.getContentType().getValue());
        if (xContentType == null) {
            throw new IllegalStateException("Unsupported Content-Type: " + entity.getContentType().getValue());
        }
        // Build a parser; in practice this is the JSON parser. The registry was constructed earlier from the default entry list, i.e. the parsers for the various builders.
        try (XContentParser parser = xContentType.xContent().createParser(registry, entity.getContent())) {
            return entityParser.apply(parser);  // the actual parsing, i.e. the SearchResponse::fromXContent seen earlier
        }
        }
    }
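
Putting the pieces together, the flow is: convert the request, send it, validate the response body, then hand it to the parser. The sketch below imitates that flow with plain strings instead of real HTTP. Everything in it (RequestPipelineSketch, Body, parseBody, performAndParse, the fake transport) is a hypothetical stand-in chosen for illustration and is not the RestHighLevelClient implementation.

```java
import java.io.IOException;
import java.util.function.Function;

final class RequestPipelineSketch {
    @FunctionalInterface
    interface CheckedFunction<T, R, E extends Exception> {
        R apply(T t) throws E;
    }

    /** Hypothetical stand-in for an HTTP response body plus its Content-Type header. */
    static final class Body {
        final String contentType;
        final String content;

        Body(String contentType, String content) {
            this.contentType = contentType;
            this.content = content;
        }
    }

    /** Same order of checks as parseEntity: body present, Content-Type present and supported, then parse. */
    static <R> R parseBody(Body body, CheckedFunction<String, R, IOException> parser) throws IOException {
        if (body == null) {
            throw new IllegalStateException("Response body expected but not returned");
        }
        if (body.contentType == null) {
            throw new IllegalStateException("No Content-Type, unable to parse response body");
        }
        if (!body.contentType.startsWith("application/json")) {
            throw new IllegalStateException("Unsupported Content-Type: " + body.contentType);
        }
        return parser.apply(body.content);
    }

    /** Converts the request, sends it through the given transport, then parses the response. */
    static <Q, R> R performAndParse(Q request,
                                    CheckedFunction<Q, String, IOException> requestConverter,
                                    Function<String, Body> transport,
                                    CheckedFunction<String, R, IOException> parser) throws IOException {
        String wireRequest = requestConverter.apply(request);
        Body body = transport.apply(wireRequest);
        return parseBody(body, parser);
    }

    public static void main(String[] args) throws IOException {
        // Fake transport that echoes the request line back inside a JSON string.
        Function<String, Body> fakeTransport =
                req -> new Body("application/json", "{\"echo\":\"" + req + "\"}");
        String parsed = performAndParse("test",
                index -> "POST /" + index + "/_search",
                fakeTransport,
                content -> content);
        System.out.println(parsed);
    }
}
```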

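I had not read much Java 8 code before, so this functional style was quite taxing at first: the flow keeps jumping around instead of following a single line.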

TcpClient 9300

This client is not as simple as RestClient: it sends serialized data directly over TCP. In IndexRequest, for example, you will find the corresponding readFrom and writeTo methods.
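
The writeTo/readFrom pattern can be illustrated with the JDK's own data streams. This is only a sketch of the idea: SimpleIndexRequest is a hypothetical class, and the real IndexRequest works with Elasticsearch's own stream classes and carries many more fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical request type used only to illustrate the writeTo/readFrom pattern. */
final class SimpleIndexRequest {
    String index;
    String type;
    String id;

    /** Writes the fields in a fixed order. */
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(index);
        out.writeUTF(type);
        out.writeUTF(id);
    }

    /** Reads the fields back in exactly the order writeTo wrote them. */
    void readFrom(DataInputStream in) throws IOException {
        index = in.readUTF();
        type = in.readUTF();
        id = in.readUTF();
    }

    /** Serializes a request to bytes and rebuilds a copy, as the two ends of a TCP connection would. */
    static SimpleIndexRequest roundTrip(SimpleIndexRequest original) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            original.writeTo(out);
        }
        SimpleIndexRequest copy = new SimpleIndexRequest();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            copy.readFrom(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        SimpleIndexRequest request = new SimpleIndexRequest();
        request.index = "test";
        request.type = "document";
        request.id = "1";
        SimpleIndexRequest copy = roundTrip(request);
        System.out.println(copy.index + "/" + copy.type + "/" + copy.id);
    }
}
```

The key constraint is that both sides agree on the field order, which is why the two methods always appear as a pair.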

See the official documentation: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/transport-client.html

Initialization

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));

// on shutdown

client.close();
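// Compared with the RestClient shown above, TransportClient has a built-in sniffing feature, which is disabled by default. When enabled, the client maintains the list of cluster nodes in the background.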

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

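Creating an index

The following is quoted from the blog post "JAVA客户端之TransportClient操作详解" (a detailed guide to TransportClient operations in the Java client).

Set the field types. The parameters all correspond to those of the standard URL API; the builder just wraps them.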
// article (the index name) and content (the type name) are String variables defined elsewhere
CreateIndexRequestBuilder cib = client.admin().indices().prepareCreate(article);
XContentBuilder mapping = XContentFactory.jsonBuilder()
        .startObject()
            .startObject("properties") // define the custom fields
                .startObject("author")
                    // set the data type; the quoted post used "string", which 6.x no longer accepts, so "text" is used as in the URL section
                    .field("type", "text")
                .endObject()
                .startObject("title")
                    .field("type", "text")
                .endObject()
                .startObject("content")
                    .field("type", "text")
                .endObject()
                .startObject("price")
                    .field("type", "text")
                .endObject()
                .startObject("view")
                    .field("type", "text")
                .endObject()
                .startObject("tag")
                    .field("type", "text")
                .endObject()
                .startObject("date")
                    .field("type", "date")  // set the date type
                    .field("format", "yyyy-MM-dd HH:mm:ss") // set the date format
                .endObject()
            .endObject()
        .endObject();
cib.addMapping(content, mapping);

CreateIndexResponse res = cib.execute().actionGet();

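Note: the official API documentation shows the detailed usage. Here are a few problems I ran into before.
1. A cluster is often fronted by a load balancer such as an LB or LVS, even though the client already does simple round-robin load balancing on its own. When attaching an LB, be sure to keep long-lived connections. I once hit exactly this problem: the connection session was not preserved, so by the time the next request arrived the LB had already dropped the session, and the client kept reporting Node Not Available errors. The likely cause is that, with the session dropped, the request was routed to another node where no connection had been established, hence the error.
2. If you have custom settings, remember to register them. When Elasticsearch parses settings it converts the JSON data into a nested map, so if a plugin or a direct source change does not register each corresponding setting one by one, an error is raised.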
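
Point 2 can be pictured with a small sketch. It assumes that the nested map is walked into dotted keys and that every key must appear in a set of registered names. SettingsCheckSketch, flatten and validate are hypothetical names, and the error text is illustrative rather than the exact message Elasticsearch produces.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class SettingsCheckSketch {
    /** Walks a nested map and collects dotted keys, e.g. {client={transport={sniff=true}}} becomes client.transport.sniff. */
    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> nested, Map<String, Object> flat) {
        for (Map.Entry<String, Object> entry : nested.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                flattenInto(key, (Map<String, Object>) entry.getValue(), flat);
            } else {
                flat.put(key, entry.getValue());
            }
        }
    }

    /** Rejects any key that was not registered beforehand. */
    static void validate(Map<String, Object> nested, Set<String> registeredKeys) {
        for (String key : flatten(nested).keySet()) {
            if (!registeredKeys.contains(key)) {
                throw new IllegalArgumentException("unknown setting [" + key + "]");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> transport = new LinkedHashMap<>();
        transport.put("sniff", true);
        Map<String, Object> client = new LinkedHashMap<>();
        client.put("transport", transport);
        Map<String, Object> settings = new LinkedHashMap<>();
        settings.put("client", client);

        validate(settings, Collections.singleton("client.transport.sniff")); // registered, passes
        try {
            validate(settings, Collections.<String>emptySet()); // nothing registered, fails
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```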