JAVA整合 influxdb2.0 简单使用
//从时序数据库查值
Map<String, List<Map<String, Object>>> stringListMap = selectService.selectData(nameList);
@Service
public class SelectServiceImpl implements SelectService {
@Value("${}")
private String org;
@Value("${}")
private String bucket;
@Resource
private InfluxDBClient influxDBClient;
private static final String MEASUREMENT = "environment_test";
@Override
public Map<String, List<Map<String, Object>>> selectData(List<String> stringList) {
StringBuilder sql1 = new StringBuilder();
for (int i = 0; i < stringList.size(); i++) {
if (i == stringList.size() - 1) {
sql1.append('"').append(stringList.get(i)).append('"');
} else {
sql1.append('"').append(stringList.get(i)).append('"').append(" or r[\"_field\"] == ");
}
}
String sql2 = "|> range(start: -1m)";
String sql = "from(bucket: \"%s\")\n" + sql2 +
" |> filter(fn: (r) => r[\"_measurement\"] == \"%s\")\n" +
" |> filter(fn: (r) => r[\"_field\"] == " + sql1 + ")\n" +
" |> sort(columns:[\"valueTime\"]) " +
" |> yield()";
return select(sql);
}
@Override
public Map<String, List<Map<String, Object>>> selectHistoryData(List<String> stringList) {
StringBuilder sql1 = new StringBuilder();
for (int i = 0; i < stringList.size(); i++) {
if (i == stringList.size() - 1) {
sql1.append('"').append(stringList.get(i)).append('"');
} else {
sql1.append('"').append(stringList.get(i)).append('"').append(" or r[\"_field\"] == ");
}
}
String sql2 = "|> range(start: -23h)";
String sql = " from(bucket: \"%s\")\n" + sql2 +
" |> filter(fn: (r) => r[\"_measurement\"] == \"%s\")\n" +
" |> filter(fn: (r) => r[\"_field\"] == " + sql1 + ")\n" +
" |> sort(columns:[\"valueTime\"]) " +
" |> aggregateWindow(every: 1h, fn: mean, createEmpty: true)\n" +
" |> yield(name: \"mean\")";
return select(sql);
}
private Map<String, List<Map<String, Object>>> select(String sql){
String flux = String.format(sql, bucket, MEASUREMENT);
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux, org);
Map<String, List<Map<String, Object>>> resulMap = new HashMap<>();
if (CollectionUtil.isNotEmpty(tables)) {
for (FluxTable table : tables) {
List<FluxRecord> records = table.getRecords();
List<Map<String, Object>> mapList = new ArrayList<>();
for (FluxRecord fluxRecord : records) {
Map<String, Object> map = new HashMap<>();
map.put("value", fluxRecord.getValue());
map.put("valueTime", fluxRecord.getTime());//Instant
map.put("field", fluxRecord.getField());
mapList.add(map);
}
String name = (String) mapList.get(0).get("field");
resulMap.put(name, mapList);
}
}
return resulMap;
}
}