Java 整合 InfluxDB 2.0 简单使用

时间:2025-04-14 10:11:29
//从时序数据库查值 Map<String, List<Map<String, Object>>> stringListMap = selectService.selectData(nameList); @Service public class SelectServiceImpl implements SelectService { @Value("${}") private String org; @Value("${}") private String bucket; @Resource private InfluxDBClient influxDBClient; private static final String MEASUREMENT = "environment_test"; @Override public Map<String, List<Map<String, Object>>> selectData(List<String> stringList) { StringBuilder sql1 = new StringBuilder(); for (int i = 0; i < stringList.size(); i++) { if (i == stringList.size() - 1) { sql1.append('"').append(stringList.get(i)).append('"'); } else { sql1.append('"').append(stringList.get(i)).append('"').append(" or r[\"_field\"] == "); } } String sql2 = "|> range(start: -1m)"; String sql = "from(bucket: \"%s\")\n" + sql2 + " |> filter(fn: (r) => r[\"_measurement\"] == \"%s\")\n" + " |> filter(fn: (r) => r[\"_field\"] == " + sql1 + ")\n" + " |> sort(columns:[\"valueTime\"]) " + " |> yield()"; return select(sql); } @Override public Map<String, List<Map<String, Object>>> selectHistoryData(List<String> stringList) { StringBuilder sql1 = new StringBuilder(); for (int i = 0; i < stringList.size(); i++) { if (i == stringList.size() - 1) { sql1.append('"').append(stringList.get(i)).append('"'); } else { sql1.append('"').append(stringList.get(i)).append('"').append(" or r[\"_field\"] == "); } } String sql2 = "|> range(start: -23h)"; String sql = " from(bucket: \"%s\")\n" + sql2 + " |> filter(fn: (r) => r[\"_measurement\"] == \"%s\")\n" + " |> filter(fn: (r) => r[\"_field\"] == " + sql1 + ")\n" + " |> sort(columns:[\"valueTime\"]) " + " |> aggregateWindow(every: 1h, fn: mean, createEmpty: true)\n" + " |> yield(name: \"mean\")"; return select(sql); } private Map<String, List<Map<String, Object>>> select(String sql){ String flux = String.format(sql, bucket, MEASUREMENT); QueryApi queryApi = influxDBClient.getQueryApi(); List<FluxTable> tables = queryApi.query(flux, org); Map<String, List<Map<String, 
Object>>> resulMap = new HashMap<>(); if (CollectionUtil.isNotEmpty(tables)) { for (FluxTable table : tables) { List<FluxRecord> records = table.getRecords(); List<Map<String, Object>> mapList = new ArrayList<>(); for (FluxRecord fluxRecord : records) { Map<String, Object> map = new HashMap<>(); map.put("value", fluxRecord.getValue()); map.put("valueTime", fluxRecord.getTime());//Instant map.put("field", fluxRecord.getField()); mapList.add(map); } String name = (String) mapList.get(0).get("field"); resulMap.put(name, mapList); } } return resulMap; } }