Elasticsearch笔记五之java操作es

时间:2024-05-27 21:03:02

Java操作es集群步骤1:配置集群对象信息;2:创建客户端;3:查看集群信息

1:集群名称

默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错。

2:嗅探功能

通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以连接到其他节点。

3:查询类型SearchType.QUERY_THEN_FETCH

Es中一共有四种查询类型。

QUERY_AND_FETCH:

主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。

这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。

QUERY_THEN_FETCH:

主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。

这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式。

DFS_QUERY_AND_FETCH和DFS_QUERY_THEN_FETCH:

这两种方式和前面两种的区别在于将各个分片的规则统一起来进行打分。解决了排序问题但是DFS_QUERY_AND_FETCH仍然存在数据量问题,DFS_QUERY_THEN_FETCH两种噢乖你问题都解决但是效率是最差的。

特点:

一个交互两次,一个交互一次;一个统一打分规则一个不统一;一个分片返回详细数据一个分片返回id。

4:分页压力

我们通过curl和java查询时都可以指定分页,但是页数越往后服务器的压力会越大。大多数搜索引擎都不会提供非常大的页数搜索,原因有两个一是用户习惯一般不会看页数大的搜索结果因为越往后越不准确,二是服务器压力。

比如分片是5分页单位是10查询第10000到10010条记录,es需要在所有分片上进行查询,每个分片会产生10010条排序后的数据然后返回给主节点,主节点接收5个分片的数据一共是50050条然后再进行汇总最后再取其中的10000到10010条数据返回给客户端,这样一来看似只请求了10条数据但实际上es要汇总5万多条数据,所以页码越大服务器的压力就越大。

5:超时timeout

查询时如果数据量很大,可以指定超时时间即到达此时间后无论查询的结果是什么都会返回并且关闭连接,这样用户体验较好缺点是查询出的数据可能不完整,Java和curl都可以指定超时时间。

6:maven依赖

[java] view plain copy
  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch</artifactId>
  4. <version>1.4.4</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.fasterxml.jackson.core</groupId>
  8. <artifactId>jackson-databind</artifactId>
  9. <version>2.1.3</version>
  10. </dependency>

以下是java代码

[java] view plain copy
  1. package elasticsearch;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.concurrent.ExecutionException;
  7. import online.elasticsearch.bean.Student;
  8. import org.elasticsearch.ElasticsearchException;
  9. import org.elasticsearch.action.bulk.BulkItemResponse;
  10. import org.elasticsearch.action.bulk.BulkRequestBuilder;
  11. import org.elasticsearch.action.bulk.BulkResponse;
  12. import org.elasticsearch.action.delete.DeleteRequest;
  13. import org.elasticsearch.action.delete.DeleteResponse;
  14. import org.elasticsearch.action.get.GetResponse;
  15. import org.elasticsearch.action.index.IndexRequest;
  16. import org.elasticsearch.action.index.IndexResponse;
  17. import org.elasticsearch.action.search.SearchResponse;
  18. import org.elasticsearch.action.search.SearchType;
  19. import org.elasticsearch.action.update.UpdateRequest;
  20. import org.elasticsearch.action.update.UpdateResponse;
  21. import org.elasticsearch.client.transport.TransportClient;
  22. import org.elasticsearch.cluster.node.DiscoveryNode;
  23. import org.elasticsearch.common.collect.ImmutableList;
  24. import org.elasticsearch.common.settings.ImmutableSettings;
  25. import org.elasticsearch.common.settings.Settings;
  26. import org.elasticsearch.common.text.Text;
  27. import org.elasticsearch.common.transport.InetSocketTransportAddress;
  28. import org.elasticsearch.common.transport.TransportAddress;
  29. import org.elasticsearch.common.xcontent.XContentBuilder;
  30. import org.elasticsearch.common.xcontent.XContentFactory;
  31. import org.elasticsearch.index.query.FilterBuilders;
  32. import org.elasticsearch.index.query.MatchQueryBuilder.Operator;
  33. import org.elasticsearch.index.query.QueryBuilders;
  34. import org.elasticsearch.search.SearchHit;
  35. import org.elasticsearch.search.SearchHits;
  36. import org.elasticsearch.search.aggregations.Aggregation;
  37. import org.elasticsearch.search.aggregations.AggregationBuilders;
  38. import org.elasticsearch.search.aggregations.Aggregations;
  39. import org.elasticsearch.search.aggregations.bucket.terms.Terms;
  40. import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
  41. import org.elasticsearch.search.aggregations.metrics.sum.Sum;
  42. import org.elasticsearch.search.highlight.HighlightField;
  43. import org.elasticsearch.search.sort.SortOrder;
  44. import org.junit.Before;
  45. import org.junit.Test;
  46. import com.fasterxml.jackson.core.JsonProcessingException;
  47. import com.fasterxml.jackson.databind.ObjectMapper;
  48. public class elastaicTest {
  49. TransportClient transportClient;
  50. //索引库名
  51. String index = "shb01";
  52. //类型名称
  53. String type = "stu";
  54. @Before
  55. public void before()
  56. {
  57. /**
  58. * 1:通过 setting对象来指定集群配置信息
  59. */
  60. Settings setting = ImmutableSettings.settingsBuilder()
  61. .put("cluster.name", "shb01")//指定集群名称
  62. .put("client.transport.sniff", true)//启动嗅探功能
  63. .build();
  64. /**
  65. * 2:创建客户端
  66. * 通过setting来创建,若不指定则默认链接的集群名为elasticsearch
  67. * 链接使用tcp协议即9300
  68. */
  69. transportClient = new TransportClient(setting);
  70. TransportAddress transportAddress = new InetSocketTransportAddress("192.168.79.131", 9300);
  71. transportClient.addTransportAddresses(transportAddress);
  72. /**
  73. * 3:查看集群信息
  74. * 注意我的集群结构是:
  75. *   131的elasticsearch.yml中指定为主节点不能存储数据,
  76. *   128的elasticsearch.yml中指定不为主节点只能存储数据。
  77. * 所有控制台只打印了192.168.79.128,只能获取数据节点
  78. *
  79. */
  80. ImmutableList<DiscoveryNode> connectedNodes = transportClient.connectedNodes();
  81. for(DiscoveryNode node : connectedNodes)
  82. {
  83. System.out.println(node.getHostAddress());
  84. }
  85. }
  86. /**
  87. * 通过prepareGet方法获取指定文档信息
  88. */
  89. @Test
  90. public void testGet() {
  91. GetResponse getResponse = transportClient.prepareGet(index, type, "1").get();
  92. System.out.println(getResponse.getSourceAsString());
  93. }
  94. /**
  95. * prepareUpdate更新索引库中文档,如果文档不存在则会报错
  96. * @throws IOException
  97. *
  98. */
  99. @Test
  100. public void testUpdate() throws IOException
  101. {
  102. XContentBuilder source = XContentFactory.jsonBuilder()
  103. .startObject()
  104. .field("name", "will")
  105. .endObject();
  106. UpdateResponse updateResponse = transportClient
  107. .prepareUpdate(index, type, "6").setDoc(source).get();
  108. System.out.println(updateResponse.getVersion());
  109. }
  110. /**
  111. * 通过prepareIndex增加文档,参数为json字符串
  112. */
  113. @Test
  114. public void testIndexJson()
  115. {
  116. String source = "{\"name\":\"will\",\"age\":18}";
  117. IndexResponse indexResponse = transportClient
  118. .prepareIndex(index, type, "3").setSource(source).get();
  119. System.out.println(indexResponse.getVersion());
  120. }
  121. /**
  122. * 通过prepareIndex增加文档,参数为Map<String,Object>
  123. */
  124. @Test
  125. public void testIndexMap()
  126. {
  127. Map<String, Object> source = new HashMap<String, Object>(2);
  128. source.put("name", "Alice");
  129. source.put("age", 16);
  130. IndexResponse indexResponse = transportClient
  131. .prepareIndex(index, type, "4").setSource(source).get();
  132. System.out.println(indexResponse.getVersion());
  133. }
  134. /**
  135. * 通过prepareIndex增加文档,参数为javaBean
  136. *
  137. * @throws ElasticsearchException
  138. * @throws JsonProcessingException
  139. */
  140. @Test
  141. public void testIndexBean() throws ElasticsearchException, JsonProcessingException
  142. {
  143. Student stu = new Student();
  144. stu.setName("Fresh");
  145. stu.setAge(22);
  146. ObjectMapper mapper = new ObjectMapper();
  147. IndexResponse indexResponse = transportClient
  148. .prepareIndex(index, type, "5").setSource(mapper.writeValueAsString(stu)).get();
  149. System.out.println(indexResponse.getVersion());
  150. }
  151. /**
  152. * 通过prepareIndex增加文档,参数为XContentBuilder
  153. *
  154. * @throws IOException
  155. * @throws InterruptedException
  156. * @throws ExecutionException
  157. */
  158. @Test
  159. public void testIndexXContentBuilder() throws IOException, InterruptedException, ExecutionException
  160. {
  161. XContentBuilder builder = XContentFactory.jsonBuilder()
  162. .startObject()
  163. .field("name", "Avivi")
  164. .field("age", 30)
  165. .endObject();
  166. IndexResponse indexResponse = transportClient
  167. .prepareIndex(index, type, "6")
  168. .setSource(builder)
  169. .execute().get();
  170. //.execute().get();和get()效果一样
  171. System.out.println(indexResponse.getVersion());
  172. }
  173. /**
  174. * 通过prepareDelete删除文档
  175. *
  176. */
  177. @Test
  178. public void testDelete()
  179. {
  180. String id = "9";
  181. DeleteResponse deleteResponse = transportClient.prepareDelete(index,
  182. type, id).get();
  183. System.out.println(deleteResponse.getVersion());
  184. //删除所有记录
  185. transportClient.prepareDeleteByQuery(index).setTypes(type)
  186. .setQuery(QueryBuilders.matchAllQuery()).get();
  187. }
  188. /**
  189. * 删除索引库,不可逆慎用
  190. */
  191. @Test
  192. public void testDeleteeIndex()
  193. {
  194. transportClient.admin().indices().prepareDelete("shb01","shb02").get();
  195. }
  196. /**
  197. * 求索引库文档总数
  198. */
  199. @Test
  200. public void testCount()
  201. {
  202. long count = transportClient.prepareCount(index).get().getCount();
  203. System.out.println(count);
  204. }
  205. /**
  206. * 通过prepareBulk执行批处理
  207. *
  208. * @throws IOException
  209. */
  210. @Test
  211. public void testBulk() throws IOException
  212. {
  213. //1:生成bulk
  214. BulkRequestBuilder bulk = transportClient.prepareBulk();
  215. //2:新增
  216. IndexRequest add = new IndexRequest(index, type, "10");
  217. add.source(XContentFactory.jsonBuilder()
  218. .startObject()
  219. .field("name", "Henrry").field("age", 30)
  220. .endObject());
  221. //3:删除
  222. DeleteRequest del = new DeleteRequest(index, type, "1");
  223. //4:修改
  224. XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("name", "jack_1").field("age", 19).endObject();
  225. UpdateRequest update = new UpdateRequest(index, type, "2");
  226. update.doc(source);
  227. bulk.add(del);
  228. bulk.add(add);
  229. bulk.add(update);
  230. //5:执行批处理
  231. BulkResponse bulkResponse = bulk.get();
  232. if(bulkResponse.hasFailures())
  233. {
  234. BulkItemResponse[] items = bulkResponse.getItems();
  235. for(BulkItemResponse item : items)
  236. {
  237. System.out.println(item.getFailureMessage());
  238. }
  239. }
  240. else
  241. {
  242. System.out.println("全部执行成功!");
  243. }
  244. }
  245. /**
  246. * 通过prepareSearch查询索引库
  247. * setQuery(QueryBuilders.matchQuery("name", "jack"))
  248. * setSearchType(SearchType.QUERY_THEN_FETCH)
  249. *
  250. */
  251. @Test
  252. public void testSearch()
  253. {
  254. SearchResponse searchResponse = transportClient.prepareSearch(index)
  255. .setTypes(type)
  256. .setQuery(QueryBuilders.matchAllQuery()) //查询所有
  257. //.setQuery(QueryBuilders.matchQuery("name", "tom").operator(Operator.AND)) //根据tom分词查询name,默认or
  258. //.setQuery(QueryBuilders.multiMatchQuery("tom", "name", "age")) //指定查询的字段
  259. //.setQuery(QueryBuilders.queryString("name:to* AND age:[0 TO 19]")) //根据条件查询,支持通配符大于等于0小于等于19
  260. //.setQuery(QueryBuilders.termQuery("name", "tom"))//查询时不分词
  261. .setSearchType(SearchType.QUERY_THEN_FETCH)
  262. .setFrom(0).setSize(10)//分页
  263. .addSort("age", SortOrder.DESC)//排序
  264. .get();
  265. SearchHits hits = searchResponse.getHits();
  266. long total = hits.getTotalHits();
  267. System.out.println(total);
  268. SearchHit[] searchHits = hits.hits();
  269. for(SearchHit s : searchHits)
  270. {
  271. System.out.println(s.getSourceAsString());
  272. }
  273. }
  274. /**
  275. * 多索引,多类型查询
  276. * timeout
  277. */
  278. @Test
  279. public void testSearchsAndTimeout()
  280. {
  281. SearchResponse searchResponse = transportClient.prepareSearch("shb01","shb02").setTypes("stu","tea")
  282. .setQuery(QueryBuilders.matchAllQuery())
  283. .setSearchType(SearchType.QUERY_THEN_FETCH)
  284. .setTimeout("3")
  285. .get();
  286. SearchHits hits = searchResponse.getHits();
  287. long totalHits = hits.getTotalHits();
  288. System.out.println(totalHits);
  289. SearchHit[] hits2 = hits.getHits();
  290. for(SearchHit h : hits2)
  291. {
  292. System.out.println(h.getSourceAsString());
  293. }
  294. }
  295. /**
  296. * 过滤,
  297. * lt 小于
  298. * gt 大于
  299. * lte 小于等于
  300. * gte 大于等于
  301. *
  302. */
  303. @Test
  304. public void testFilter()
  305. {
  306. SearchResponse searchResponse = transportClient.prepareSearch(index)
  307. .setTypes(type)
  308. .setQuery(QueryBuilders.matchAllQuery()) //查询所有
  309. .setSearchType(SearchType.QUERY_THEN_FETCH)
  310. //              .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)
  311. //                      .includeLower(true).includeUpper(true))
  312. .setPostFilter(FilterBuilders.rangeFilter("age").gte(18).lte(22))
  313. .setExplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面
  314. .get();
  315. SearchHits hits = searchResponse.getHits();
  316. long total = hits.getTotalHits();
  317. System.out.println(total);
  318. SearchHit[] searchHits = hits.hits();
  319. for(SearchHit s : searchHits)
  320. {
  321. System.out.println(s.getSourceAsString());
  322. }
  323. }
  324. /**
  325. * 高亮
  326. */
  327. @Test
  328. public void testHighLight()
  329. {
  330. SearchResponse searchResponse = transportClient.prepareSearch(index)
  331. .setTypes(type)
  332. //.setQuery(QueryBuilders.matchQuery("name", "Fresh")) //查询所有
  333. .setQuery(QueryBuilders.queryString("name:F*"))
  334. .setSearchType(SearchType.QUERY_THEN_FETCH)
  335. .addHighlightedField("name")
  336. .setHighlighterPreTags("<font color='red'>")
  337. .setHighlighterPostTags("</font>")
  338. .get();
  339. SearchHits hits = searchResponse.getHits();
  340. System.out.println("sum:" + hits.getTotalHits());
  341. SearchHit[] hits2 = hits.getHits();
  342. for(SearchHit s : hits2)
  343. {
  344. Map<String, HighlightField> highlightFields = s.getHighlightFields();
  345. HighlightField highlightField = highlightFields.get("name");
  346. if(null != highlightField)
  347. {
  348. Text[] fragments = highlightField.fragments();
  349. System.out.println(fragments[0]);
  350. }
  351. System.out.println(s.getSourceAsString());
  352. }
  353. }
  354. /**
  355. * 分组
  356. */
  357. @Test
  358. public void testGroupBy()
  359. {
  360. SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)
  361. .setQuery(QueryBuilders.matchAllQuery())
  362. .setSearchType(SearchType.QUERY_THEN_FETCH)
  363. .addAggregation(AggregationBuilders.terms("group_age")
  364. .field("age").size(0))//根据age分组,默认返回10,size(0)也是10
  365. .get();
  366. Terms terms = searchResponse.getAggregations().get("group_age");
  367. List<Bucket> buckets = terms.getBuckets();
  368. for(Bucket bt : buckets)
  369. {
  370. System.out.println(bt.getKey() + " " + bt.getDocCount());
  371. }
  372. }
  373. /**
  374. * 聚合函数,本例之编写了sum,其他的聚合函数也可以实现
  375. *
  376. */
  377. @Test
  378. public void testMethod()
  379. {
  380. SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)
  381. .setQuery(QueryBuilders.matchAllQuery())
  382. .setSearchType(SearchType.QUERY_THEN_FETCH)
  383. .addAggregation(AggregationBuilders.terms("group_name").field("name")
  384. .subAggregation(AggregationBuilders.sum("sum_age").field("age")))
  385. .get();
  386. Terms terms = searchResponse.getAggregations().get("group_name");
  387. List<Bucket> buckets = terms.getBuckets();
  388. for(Bucket bt : buckets)
  389. {
  390. Sum sum = bt.getAggregations().get("sum_age");
  391. System.out.println(bt.getKey() + "  " + bt.getDocCount() + " "+ sum.getValue());
  392. }
  393. }
  394. }