系统环境: vm12 下的centos 7.2
当前安装版本: elasticsearch-2.4.0.tar.gz
Java操作es集群的步骤: 1:配置集群对象信息; 2:创建客户端; 3:查看集群信息。下面先说明几个相关概念:
1:集群名称
默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错。
2:嗅探功能
通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以连接到其他节点。
3:查询类型SearchType.QUERY_THEN_FETCH
es 查询共有4种查询类型
QUERY_AND_FETCH:
主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。
这种查询方式存在数据量和排序两个问题:一是主节点会汇总所有分片返回的数据,数据量会比较大;二是各个分片上的打分规则可能不一致。
QUERY_THEN_FETCH:
主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。
这种方式解决了数据量问题,但是排序问题依然存在。它是es的默认查询方式。
DFS_QUERY_AND_FETCH 和 DFS_QUERY_THEN_FETCH:
将各个分片的规则统一起来进行打分,解决了排序问题。但是DFS_QUERY_AND_FETCH仍然存在数据量问题;DFS_QUERY_THEN_FETCH两种问题都解决了,但是效率是最差的。
1, 获取client, 两种方式获取
方式一:
/**
 * Way 1: builds the transport client from a settings map and a host name.
 *
 * @throws Exception if the host name cannot be resolved
 */
@Before
public void before() throws Exception {
    // Cluster-level settings are collected in a plain map first.
    Map<String, String> clusterConfig = new HashMap<String, String>();
    clusterConfig.put("cluster.name", "elasticsearch_wenbronk");

    Settings.Builder settingsBuilder = Settings.builder().put(clusterConfig);

    // 9300 is the transport port used by this example.
    InetAddress host = InetAddress.getByName("www.wenbronk.com");
    InetSocketTransportAddress node = new InetSocketTransportAddress(host, 9300);

    client = TransportClient.builder().settings(settingsBuilder).build().addTransportAddress(node);
}
方式二:
/**
 * Way 2: builds the transport client from a {@code Settings} object and an explicit IP address.
 *
 * @throws Exception declared for symmetry with the other set-up method
 */
@Before
public void before11() throws Exception {
    // Alternative: create the client with the default cluster name ("elasticsearch").
    // client = TransportClient.builder().build()
    //         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));

    // Describe the cluster through a Settings builder, one option at a time.
    Settings.Builder config = Settings.settingsBuilder();
    config.put("cluster.name", "elasticsearch_wenbronk"); // cluster name to join
    // config.put("client.transport.sniff", true); // sniffing: the author could never connect with it on, cause unknown
    // config.put("network.host", "192.168.50.37");
    // Skip cluster-name validation: the client connects even when the name does not match.
    config.put("client.transport.ignore_cluster_name", true);
    // The two settings below raised errors for the author.
    // NOTE(review): probably because the value has no time unit (e.g. "5s") - confirm.
    // config.put("client.transport.nodes_sampler_interval", 5); // how often connected nodes are sampled, default 5s
    // config.put("client.transport.ping_timeout", 5);           // how long to wait for a ping reply, default 5s
    Settings settings = config.build();

    InetSocketAddress endpoint = new InetSocketAddress("192.168.50.37", 9300);
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(endpoint));

    System.out.println("success connect");
}
PS: 官网给的2种方式单独使用都不能用, 需要合起来才能用, 浪费老子一下午...
其他参数的意义:
代码 (JavaESTest, 文档的索引/获取/更新/删除及批量操作):
package com.wenbronk.javaes;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.fastjson.JSONObject;
/**
* 使用java API操作elasticSearch
*
* @author 231
*
*/
public class JavaESTest {
private TransportClient client;
private IndexRequest source;
/**
* 获取连接, 第一种方式
* @throws Exception
*/
// @Before
public void before() throws Exception {
Map<String, String> map = new HashMap<String, String>();
map.put( "cluster.name" , "elasticsearch_wenbronk" );
Settings.Builder settings = Settings.builder().put(map);
client = TransportClient.builder().settings(settings).build()
.addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName( "www.wenbronk.com" ), Integer.parseInt( "9300" )));
}
/**
* 查看集群信息
*/
@Test
public void testInfo() {
List<DiscoveryNode> nodes = client.connectedNodes();
for (DiscoveryNode node : nodes) {
System.out.println(node.getHostAddress());
}
}
/**
* 组织json串, 方式1,直接拼接
*/
public String createJson1() {
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}" ;
return json;
}
/**
* 使用map创建json
*/
public Map<String, Object> createJson2() {
Map<String,Object> json = new HashMap<String, Object>();
json.put( "user" , "kimchy" );
json.put( "postDate" , new Date());
json.put( "message" , "trying out elasticsearch" );
return json;
}
/**
* 使用fastjson创建
*/
public JSONObject createJson3() {
JSONObject json = new JSONObject();
json.put( "user" , "kimchy" );
json.put( "postDate" , new Date());
json.put( "message" , "trying out elasticsearch" );
return json;
}
/**
* 使用es的帮助类
*/
public XContentBuilder createJson4() throws Exception {
// 创建json对象, 其中一个创建json的方式
XContentBuilder source = XContentFactory.jsonBuilder()
.startObject()
.field( "user" , "kimchy" )
.field( "postDate" , new Date())
.field( "message" , "trying to out ElasticSearch" )
.endObject();
return source;
}
/**
* 存入索引中
* @throws Exception
*/
@Test
public void test1() throws Exception {
XContentBuilder source = createJson4();
// 存json入索引中
IndexResponse response = client.prepareIndex( "twitter" , "tweet" , "1" ).setSource(source).get();
// // 结果获取
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
long version = response.getVersion();
boolean created = response.isCreated();
System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);
}
/**
* get API 获取指定文档信息
*/
@Test
public void testGet() {
// GetResponse response = client.prepareGet("twitter", "tweet", "1")
// .get();
GetResponse response = client.prepareGet( "twitter" , "tweet" , "1" )
.setOperationThreaded( false ) // 线程安全
.get();
System.out.println(response.getSourceAsString());
}
/**
* 测试 delete api
*/
@Test
public void testDelete() {
DeleteResponse response = client.prepareDelete( "twitter" , "tweet" , "1" )
.get();
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
long version = response.getVersion();
System.out.println(index + " : " + type + ": " + id + ": " + version);
}
/**
* 测试更新 update API
* 使用 updateRequest 对象
* @throws Exception
*/
@Test
public void testUpdate() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index( "twitter" );
updateRequest.type( "tweet" );
updateRequest.id( "1" );
updateRequest.doc(XContentFactory.jsonBuilder()
.startObject()
// 对没有的字段添加, 对已有的字段替换
.field( "gender" , "male" )
.field( "message" , "hello" )
.endObject());
UpdateResponse response = client.update(updateRequest).get();
// 打印
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
long version = response.getVersion();
System.out.println(index + " : " + type + ": " + id + ": " + version);
}
/**
* 测试update api, 使用client
* @throws Exception
*/
@Test
public void testUpdate2() throws Exception {
// 使用Script对象进行更新
// UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
// .setScript(new Script("hits._source.gender = \"male\""))
// .get();
// 使用XContFactory.jsonBuilder() 进行更新
// UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
// .setDoc(XContentFactory.jsonBuilder()
// .startObject()
// .field("gender", "malelelele")
// .endObject()).get();
// 使用updateRequest对象及script
// UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
// .script(new Script("ctx._source.gender=\"male\""));
// UpdateResponse response = client.update(updateRequest).get();
// 使用updateRequest对象及documents进行更新
UpdateResponse response = client.update( new UpdateRequest( "twitter" , "tweet" , "1" )
.doc(XContentFactory.jsonBuilder()
.startObject()
.field( "gender" , "male" )
.endObject()
)).get();
System.out.println(response.getIndex());
}
/**
* 测试update
* 使用updateRequest
* @throws Exception
* @throws InterruptedException
*/
@Test
public void testUpdate3() throws InterruptedException, Exception {
UpdateRequest updateRequest = new UpdateRequest( "twitter" , "tweet" , "1" )
.script( new Script( "ctx._source.gender=\"male\"" ));
UpdateResponse response = client.update(updateRequest).get();
}
/**
* 测试upsert方法
* @throws Exception
*
*/
@Test
public void testUpsert() throws Exception {
// 设置查询条件, 查找不到则添加生效
IndexRequest indexRequest = new IndexRequest( "twitter" , "tweet" , "2" )
.source(XContentFactory.jsonBuilder()
.startObject()
.field( "name" , "214" )
.field( "gender" , "gfrerq" )
.endObject());
// 设置更新, 查找到更新下面的设置
UpdateRequest upsert = new UpdateRequest( "twitter" , "tweet" , "2" )
.doc(XContentFactory.jsonBuilder()
.startObject()
.field( "user" , "wenbronk" )
.endObject())
.upsert(indexRequest);
client.update(upsert).get();
}
/**
* 测试multi get api
* 从不同的index, type, 和id中获取
*/
@Test
public void testMultiGet() {
MultiGetResponse multiGetResponse = client.prepareMultiGet()
.add( "twitter" , "tweet" , "1" )
.add( "twitter" , "tweet" , "2" , "3" , "4" )
.add( "anothoer" , "type" , "foo" )
.get();
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
System.out.println(sourceAsString);
}
}
}
/**
* bulk 批量执行
* 一次查询可以update 或 delete多个document
*/
@Test
public void testBulk() throws Exception {
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex( "twitter" , "tweet" , "1" )
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field( "user" , "kimchy" )
.field( "postDate" , new Date())
.field( "message" , "trying out Elasticsearch" )
.endObject()));
bulkRequest.add(client.prepareIndex( "twitter" , "tweet" , "2" )
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field( "user" , "kimchy" )
.field( "postDate" , new Date())
.field( "message" , "another post" )
.endObject()));
BulkResponse response = bulkRequest.get();
System.out.println(response.getHeaders());
}
/**
* 使用bulk processor
* @throws Exception
*/
@Test
public void testBulkProcessor() throws Exception {
// 创建BulkPorcessor对象
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {
public void beforeBulk( long paramLong, BulkRequest paramBulkRequest) {
// TODO Auto-generated method stub
}
// 执行出错时执行
public void afterBulk( long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
// TODO Auto-generated method stub
}
public void afterBulk( long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
// TODO Auto-generated method stub
}
})
// 1w次请求执行一次bulk
.setBulkActions( 10000 )
// 1gb的数据刷新一次bulk
.setBulkSize( new ByteSizeValue( 1 , ByteSizeUnit.GB))
// 固定5s必须刷新一次
.setFlushInterval(TimeValue.timeValueSeconds( 5 ))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests( 1 )
// 设置退避, 100ms后执行, 最大请求3次
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis( 100 ), 3 ))
.build();
// 添加单次请求
bulkProcessor.add( new IndexRequest( "twitter" , "tweet" , "1" ));
bulkProcessor.add( new DeleteRequest( "twitter" , "tweet" , "2" ));
// 关闭
bulkProcessor.awaitClose( 10 , TimeUnit.MINUTES);
// 或者
bulkProcessor.close();
}
}
test2代码 (JavaESTest2, search API):
package com.wenbronk.javaes;
import java.net.InetSocketAddress;
import org.apache.lucene.queryparser.xml.FilterBuilderFactory;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;
import org.junit.Before;
import org.junit.Test;
/**
* 使用java API操作elasticSearch
* search API
* @author 231
*
*/
public class JavaESTest2 {
private TransportClient client;
/**
* 获取client对象
*/
@Before
public void testBefore() {
Builder builder = Settings.settingsBuilder();
builder.put( "cluster.name" , "wenbronk_escluster" );
// .put("client.transport.ignore_cluster_name", true);
Settings settings = builder.build();
org.elasticsearch.client.transport.TransportClient.Builder transportBuild = TransportClient.builder();
TransportClient client1 = transportBuild.settings(settings).build();
client = client1.addTransportAddress(( new InetSocketTransportAddress( new InetSocketAddress( "192.168.50.37" , 9300 ))));
System.out.println( "success connect to escluster" );
}
/**
* 测试查询
*/
@Test
public void testSearch() {
// SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1");
// SearchResponse response = searchRequestBuilder.setTypes("type1", "type2")
// .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
// .setQuery(QueryBuilders.termQuery("user", "test"))
// .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))
// .setFrom(0).setSize(2).setExplain(true)
// .execute().actionGet();
SearchResponse response = client.prepareSearch()
.execute().actionGet();
// SearchHits hits = response.getHits();
// for (SearchHit searchHit : hits) {
// for(Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) {
// SearchHitField next = iterator.next();
// System.out.println(next.getValues());
// }
// }
System.out.println(response);
}
/**
* 测试scroll api
* 对大量数据的处理更有效
*/
@Test
public void testScrolls() {
QueryBuilder queryBuilder = QueryBuilders.termQuery( "twitter" , "tweet" );
SearchResponse response = client.prepareSearch( "twitter" )
.addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)
.setScroll( new TimeValue( 60000 ))
.setQuery(queryBuilder)
.setSize( 100 ).execute().actionGet();
while ( true ) {
for (SearchHit hit : response.getHits().getHits()) {
System.out.println( "i am coming" );
}
SearchResponse response2 = client.prepareSearchScroll(response.getScrollId())
.setScroll( new TimeValue( 60000 )).execute().actionGet();
if (response2.getHits().getHits().length == 0 ) {
System.out.println( "oh no=====" );
break ;
}
}
}
/**
* 测试multiSearch
*/
@Test
public void testMultiSearch() {
QueryBuilder qb1 = QueryBuilders.queryStringQuery( "elasticsearch" );
SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize( 1 );
QueryBuilder qb2 = QueryBuilders.matchQuery( "user" , "kimchy" );
SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize( 1 );
MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)
.execute().actionGet();
long nbHits = 0 ;
for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
SearchResponse response = item.getResponse();
nbHits = response.getHits().getTotalHits();
SearchHit[] hits = response.getHits().getHits();
System.out.println(nbHits);
}
}
/**
* 测试聚合查询
*/
@Test
public void testAggregation() {
SearchResponse response = client.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery()) // 先使用query过滤掉一部分
.addAggregation(AggregationBuilders.terms( "term" ).field( "user" ))
.addAggregation(AggregationBuilders.dateHistogram( "agg2" ).field( "birth" )
.interval(DateHistogramInterval.YEAR))
.execute().actionGet();
Aggregation aggregation2 = response.getAggregations().get( "term" );
Aggregation aggregation = response.getAggregations().get( "agg2" );
// SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();
}
/**
* 测试terminate
*/
@Test
public void testTerminateAfter() {
SearchResponse response = client.prepareSearch( "twitter" ).setTerminateAfter( 1000 ).get();
if (response.isTerminatedEarly()) {
System.out.println( "ternimate" );
}
}
/**
* 过滤查询: 大于gt, 小于lt, 小于等于lte, 大于等于gte
*/
@Test
public void testFilter() {
SearchResponse response = client.prepareSearch( "twitter" )
.setTypes( "" )
.setQuery(QueryBuilders.matchAllQuery()) //查询所有
.setSearchType(SearchType.QUERY_THEN_FETCH)
// .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)
// .includeLower(true).includeUpper(true))
// .setPostFilter(FilterBuilderFactory .rangeFilter("age").gte(18).lte(22))
.setExplain( true ) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面
.get();
}
/**
* 分组查询
*/
@Test
public void testGroupBy() {
client.prepareSearch( "twitter" ).setTypes( "tweet" )
.setQuery(QueryBuilders.matchAllQuery())
.setSearchType(SearchType.QUERY_THEN_FETCH)
.addAggregation(AggregationBuilders.terms( "user" )
.field( "user" ).size( 0 ) // 根据user进行分组
// size(0) 也是10
).get();
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://www.cnblogs.com/wenbronk/p/6386043.html