目录
项目需求
数据从Mysql 迁移到 Es, Es查询数据默认fetch Size最大为10000条,如果查询超过1万条,需要通过scroll形式进行查询
要求
- 安全问题考虑,查询需要连接ES-ip:9200,不可使用第三方j a r
- 由于目前项目的查询方式是基于Mysql,为了减少改动,暂时使用sql进行查询
- 需要将结果以stream的形式进行返回,避免内存占用过大,以及瞬时的网络带宽问题
普通策略
- 进行第一次访问,然后取columns, rows 和 cursor
- 转化我们的第一次结果为map形式的json
- 拿第一次的corsor id 进行第二次访问
- 用第一次记录的columns 组装第二次的rows结果为map形式的json
- 以此类推
升级策略:使用迭代器模式
- 将我们访问封装到我们的迭代器中
- 第一次访问以迭代器的构造函数访问,初始化我们的columns, 供后续使用, 将第一次的结果转化为map 形式的json供迭代器使用 hasNext里面进行后续的多次访问
迭代器模式组成
- iterator 抽象迭代器:负责定义访问和遍历元素的接口
- concreteIterator 具体迭代器:实现迭代器接口,完成容器元素的遍历
- aggreate抽象容器:容器角色负责提供创建决堤迭代器的角色接口,提供一个类似createiterator()这样的方法, 在Java中一般是 iterator()方法
- concreteaggreate 具体容器:实现容器接口定义的方法
代码实现
查询实体
/**
* 第一次查询用query和fetchSize, 后续用返回结果中的游标cursor
*/
@JsonIgnoreProperties
public class EsSqlQuery {
/**
* 调用ES的查询的sql
*/
private String query;
/**
* 取出的条数
*/
private Long fetchSize;
/**
* ES返回的游标
*/
private String cursor;
public EsSqlQuery(String cursor) {
this.cursor = cursor;
}
public EsSqlQuery(String query, Long fetchSize) {
this.query = query;
this.fetchSize = fetchSize;
}
public String getQuery() {
return query;
}
public void setQuery(String query) {
this.query = query;
}
public Long getFetchSize() {
return fetchSize;
}
public void setFetchSize(Long fetchSize) {
this.fetchSize = fetchSize;
}
public String getCursor() {
return cursor;
}
public void setCursor(String cursor) {
this.cursor = cursor;
}
}
返回实体
public class EsSqlResult {
private List<Map<String,String>> columns;
private List<List<Object>> rows;
private String cursor;
public List<Map<String, String>> getColumns() {
return columns;
}
public void setColumns(List<Map<String, String>> columns) {
this.columns = columns;
}
public List<List<Object>> getRows() {
return rows;
}
public void setRows(List<List<Object>> rows) {
this.rows = rows;
}
public String getCursor() {
return cursor;
}
public void setCursor(String cursor) {
this.cursor = cursor;
}
}
实现类
@Component
public class EsQueryProcessor {
// 1.用stream返回,节省内存
public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) {
return StreamSupport.stream(Spliterators
.spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false);
}
// 2.用迭代器模式
private class ScrollIterator implements Iterator<Map<String, Object>> {
/**
* 游标
*/
private String scrollId;
/**
* 字段名集合
*/
private List<String> columns;
/**
* 迭代元素
*/
Iterator<Map<String, Object>> iterator;
/**
* 模拟访问次数
*/
private int i = 1;
// 2.1构造函数进行第一次查询, 初始化后续需要使用的columns和iterator,scrollId
public ScrollIterator(String query, Long fetchSize) {
// 模拟根据query和fetchSize从ES第一次获取数据
String jsonName = "es1.json";
EsSqlResult result = this.getEsSqlResult(jsonName);
columns = CollStreamUtil.toList(result.getColumns(), u -> u.get("name"));
this.scrollId = result.getCursor();
this.iterator = convert(columns, result).iterator();
}
// 2.2根据 scrollId 是否为null进行后续访问,直到scrollId为null
@Override
public boolean hasNext() {
return iterator.hasNext() || scrollNext();
}
private boolean scrollNext() {
if (iterator == null || this.scrollId == null) {
return false;
}
i++;
// 模拟根据query和fetchSize从ES第i次获取数据
String jsonName = "es" + i + ".json";
EsSqlResult result = this.getEsSqlResult(jsonName);
this.scrollId = result.getCursor();
this.iterator = convert(columns, result).iterator();
return iterator.hasNext();
}
@Override
public Map<String, Object> next() {
return iterator.next();
}
// 模拟从ES获取查询结果
private EsSqlResult getEsSqlResult(String jsonName) {
if (StrUtil.isBlank(jsonName)) {
return null;
}
EsSqlResult result = null;
List<File> fileList = FileUtil.loopFiles(ResourceUtil.getResource("json").getFile());
for (File file : fileList) {
if (jsonName.equals(file.getName())) {
String json = FileUtil.readString(file, Charset.forName("UTF-8"));
result = JSONObject.parseObject(json, EsSqlResult.class);
}
}
return result;
}
}
// 3.返回结果传统一点 List<Map>
private List<Map<String, Object>> convert(List<String> columns, EsSqlResult result) {
List<Map<String, Object>> results = new ArrayList<>();
for (List<Object> row : result.getRows()) {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < columns.size(); i++) {
map.put(columns.get(i), row.get(i));
}
results.add(map);
}
return results;
}
}
代码测试
@RestController
public class EsController {
@Autowired
private EsService esService;
@GetMapping("/es")
public Boolean suggestRequirement() {
return esService.query();
}
}
public class EsService {
@Autowired
private EsQueryProcessor processor;
public Boolean query() {
Stream<Map<String, Object>> mapStream = processor.scrollEsStream(null, null);
mapStream.forEach(x -> System.out.println(x));
return true;
}
}
mock的ES返回结果json数据
第一次返回结果
{
"cursor": "safajfkajskjkasg",
"columns": [
{
"name": "username",
"type": "String"
},
{
"name": "age",
"type": "String"
}
],
"rows": [
[
"zhao",
"11"
],
[
"qian",
"22"
]
]
}
第二次返回结果
{
"cursor": "safajfkajskjkasg",
"rows": [
[
"sun",
"33"
],
[
"li",
"44"
]
]
}
第三次返回结果
{
"rows": [
[
"zhou",
"55"
],
[
"wu",
"66"
]
]
}