HBase query utility class: querying data by time range

Date: 2021-05-17 08:24:42

1. Requirement: air-quality monitoring data is stored in HBase. Query one month of data by time for citycode 110000 (Beijing). Each record is one day's measurement.

ID, CITYCODE, SO2, CO, NO2, O3, PM10, PM2_5, AQI, MEASURE, TIMEPOINT
13110000020141120, 110000, 31, 3.939, 141, 8, 368, 301, 351, 6, 2014-11-20
5110000020150108, 110000, 53, 3.305, 101, 12, 176, 143, 190, 4, 2015-01-08

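Row key design:

String yearMonth = "201411";   // year and month
String time = "20141120";      // year, month, and day
String citycode = "1100000";   // city code

// HBase has 20 partitions; a record is assigned to a partition as follows
int region = Math.abs((yearMonth + citycode).hashCode()) % 20; // result: 13

// the HBase row key is
String key = region + citycode + time; // "13110000020141120", the ID of the first sample record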

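The benefit of this key design: all of one city's data for a given month lands in the same partition, that is, it shares the same region prefix, so a query only needs to set startRowkey and endRowkey.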

For example, to query Beijing's data for November 2014, it is enough to set:

startRowkey: 13110000020141100

endRowkey:   13110000020141200
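The key scheme and the one-month scan bounds can be sketched as a small standalone class. This is only an illustration under the rules stated above (20 partitions, region from `hashCode()` of year-month plus city code); the class name `RowKeySketch` and the methods `region`, `rowKey`, and `monthBounds` are hypothetical names introduced here, not part of the original utility.

```java
import java.time.YearMonth;

/** Illustrative sketch of the row key scheme; names are hypothetical. */
final class RowKeySketch {
    static final int PARTITIONS = 20; // partition count stated in the text

    /** Partition prefix derived from year-month (yyyyMM) plus city code. */
    static int region(String yearMonth, String citycode) {
        return Math.abs((yearMonth + citycode).hashCode()) % PARTITIONS;
    }

    /** Full row key: region + citycode + yyyyMMdd. */
    static String rowKey(String citycode, String day) {
        return region(day.substring(0, 6), citycode) + citycode + day;
    }

    /** Returns {startRowkey, endRowkey} covering one whole month (yyyyMM). */
    static String[] monthBounds(String citycode, String yearMonth) {
        YearMonth ym = YearMonth.of(
                Integer.parseInt(yearMonth.substring(0, 4)),
                Integer.parseInt(yearMonth.substring(4, 6)));
        YearMonth next = ym.plusMonths(1);
        String nextStr = String.format("%04d%02d", next.getYear(), next.getMonthValue());
        // Both bounds carry the region of the queried month, so they share one prefix.
        String prefix = region(yearMonth, citycode) + citycode;
        return new String[] { prefix + yearMonth + "00", prefix + nextStr + "00" };
    }

    public static void main(String[] args) {
        System.out.println(rowKey("1100000", "20141120")); // 13110000020141120
        String[] b = monthBounds("1100000", "201411");
        System.out.println(b[0] + " .. " + b[1]);          // 13110000020141100 .. 13110000020141200
    }
}
```

One edge case worth checking before relying on this scheme: `Math.abs(Integer.MIN_VALUE)` is still negative, so a string whose hash happens to be exactly `Integer.MIN_VALUE` would produce a negative region.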


However, a query that spans several months, for example 2014-11-20 through 2015-01-08, has to fetch each month's data separately and merge the results.

The utility class methods are as follows.

A JavaBean that represents a time range:

public class TimeRange {

    private String startPoint ;
    private String endPoint ;

    public String getStartPoint() {
        return startPoint;
    }

    public void setStartPoint(String startPoint) {
        this.startPoint = startPoint;
    }

    public String getEndPoint() {
        return endPoint;
    }

    public void setEndPoint(String endPoint) {
        this.endPoint = endPoint;
    }

    public String toString() {
        return startPoint + " - " + endPoint ;
    }
}

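Utility method for computing the time ranges:

startStr = 20141120

endStr = 20150108

The resulting List<TimeRange> holds one range per month, with an inclusive start point and an exclusive end point:

20141120 - 201412
201412 - 201501
201501 - 20150109

Month boundaries are written as yyyyMM prefixes, and the last end point is the day after endStr so that 20150108 itself is included in the scan.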


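// Requires: java.text.DecimalFormat, java.text.SimpleDateFormat,
//           java.util.ArrayList, java.util.Calendar, java.util.List

/**
 * Computes the query time ranges, one per calendar month.
 * startStr and endStr are yyyyMMdd strings; endStr is inclusive.
 */
public static List<TimeRange> getCallLogRanges(String startStr, String endStr) {
    // Guard: if the end precedes the start, the loop below would never reach the end month
    if (startStr.compareTo(endStr) > 0) {
        throw new IllegalArgumentException("startStr must not be after endStr");
    }
    try {
        SimpleDateFormat sdfYMD = new SimpleDateFormat("yyyyMMdd");
        SimpleDateFormat sdfYM = new SimpleDateFormat("yyyyMM");
        DecimalFormat df00 = new DecimalFormat("00");

        List<TimeRange> list = new ArrayList<>();

        // Year-month prefixes of the start and end strings
        String startPrefix = startStr.substring(0, 6);
        String endPrefix = endStr.substring(0, 6);
        int endDay = Integer.parseInt(endStr.substring(6, 8));

        // Exclusive end point: the day after endStr, so that endStr itself is scanned
        String endPoint = endPrefix + df00.format(endDay + 1);

        // Calendar used to step through the months
        Calendar c = Calendar.getInstance();

        if (startPrefix.equals(endPrefix)) {
            // Start and end fall in the same year and month: a single range
            TimeRange range = new TimeRange();
            range.setStartPoint(startStr);
            range.setEndPoint(endPoint);
            list.add(range);
        } else {
            // 1. First month: from startStr to the beginning of the next month
            TimeRange range = new TimeRange();
            range.setStartPoint(startStr);

            c.setTime(sdfYMD.parse(startStr));
            c.add(Calendar.MONTH, 1);
            range.setEndPoint(sdfYM.format(c.getTime()));
            list.add(range);

            // 2. Following months, until the end month is reached
            while (true) {
                if (endStr.startsWith(sdfYM.format(c.getTime()))) {
                    // Last month: from its beginning to endPoint
                    range = new TimeRange();
                    range.setStartPoint(sdfYM.format(c.getTime()));
                    range.setEndPoint(endPoint);
                    list.add(range);
                    break;
                } else {
                    // Full intermediate month
                    range = new TimeRange();
                    range.setStartPoint(sdfYM.format(c.getTime()));

                    // Advance to the next month
                    c.add(Calendar.MONTH, 1);
                    range.setEndPoint(sdfYM.format(c.getTime()));
                    list.add(range);
                }
            }
        }
        return list;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}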
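As a cross-check of the splitting logic, the same month-by-month split can be sketched with `java.time.YearMonth` in place of `Calendar` and `SimpleDateFormat`. This is an alternative formulation, not the method shown above; the class `MonthSplitSketch` and its `split` method are hypothetical names, and ranges are returned as plain `{start, end}` string pairs rather than `TimeRange` objects to keep the example self-contained.

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

/** Illustrative alternative to the Calendar-based splitter; names are hypothetical. */
final class MonthSplitSketch {

    private static YearMonth toYearMonth(String day) {
        return YearMonth.of(Integer.parseInt(day.substring(0, 4)),
                            Integer.parseInt(day.substring(4, 6)));
    }

    private static String fmt(YearMonth ym) {
        return String.format("%04d%02d", ym.getYear(), ym.getMonthValue());
    }

    /** Splits [startDay, endDay] (yyyyMMdd, both inclusive) into per-month {start, exclusiveEnd} pairs. */
    static List<String[]> split(String startDay, String endDay) {
        if (startDay.compareTo(endDay) > 0) {
            throw new IllegalArgumentException("startDay must not be after endDay");
        }
        YearMonth last = toYearMonth(endDay);
        // Exclusive end of the final range: day + 1 as a string, used only as a sort bound
        String finalEnd = endDay.substring(0, 6)
                + String.format("%02d", Integer.parseInt(endDay.substring(6, 8)) + 1);

        List<String[]> ranges = new ArrayList<>();
        String start = startDay;
        for (YearMonth ym = toYearMonth(startDay); ym.isBefore(last); ym = ym.plusMonths(1)) {
            String next = fmt(ym.plusMonths(1)); // yyyyMM prefix of the following month
            ranges.add(new String[] { start, next });
            start = next;
        }
        ranges.add(new String[] { start, finalEnd });
        return ranges;
    }

    public static void main(String[] args) {
        for (String[] r : split("20141120", "20150108")) {
            System.out.println(r[0] + " - " + r[1]);
        }
        // prints:
        // 20141120 - 201412
        // 201412 - 201501
        // 201501 - 20150109
    }
}
```

The end points are only lexicographic bounds for row keys, so a value such as day 32 at the end of a 31-day month is harmless here even though it is not a real date.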

When querying HBase, iterate over the List<TimeRange> and run one scan per range:

// Requires: org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.HBaseConfiguration,
//           org.apache.hadoop.hbase.TableName, org.apache.hadoop.hbase.client.*,
//           org.apache.hadoop.hbase.util.Bytes, java.io.IOException, java.util.*

public List<Map<String, String>> getLogByContion(String citycode, List<TimeRange> ranges) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    List<Map<String, String>> list = new ArrayList<>();

    // Column family and the qualifiers to read
    byte[] f1 = Bytes.toBytes("f1");
    byte[] colCitycode = Bytes.toBytes("citycode");
    byte[] colSo2 = Bytes.toBytes("so2");
    byte[] colCo = Bytes.toBytes("co");
    byte[] colNo2 = Bytes.toBytes("no2");

    // try-with-resources closes the connection and table even if a scan fails
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(TableName.valueOf("hbase:airDay"))) {

        for (TimeRange range : ranges) {
            Scan scan = new Scan();
            // Start row is inclusive, stop row is exclusive
            // (setStartRow/setStopRow are deprecated in HBase 2.x in favor of withStartRow/withStopRow)
            scan.setStartRow(Bytes.toBytes(HbaseUtils.getStartRowkey(citycode, range.getStartPoint())));
            scan.setStopRow(Bytes.toBytes(HbaseUtils.getStopRowkey(citycode, range.getStartPoint(), range.getEndPoint())));

            try (ResultScanner rs = table.getScanner(scan)) {
                for (Result r : rs) {
                    Map<String, String> map = new HashMap<>();
                    map.put("rowkey", Bytes.toString(r.getRow()));
                    map.put("citycode", Bytes.toString(r.getValue(f1, colCitycode)));
                    map.put("so2", Bytes.toString(r.getValue(f1, colSo2)));
                    map.put("co", Bytes.toString(r.getValue(f1, colCo)));
                    map.put("no2", Bytes.toString(r.getValue(f1, colNo2)));
                    list.add(map);
                }
            }
        }
    }

    return list;
}



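// The region is derived from the year-month (first 6 characters) of the range's start point.
public static String getStartRowkey(String citycode, String time) {
    String yearMonth = time.substring(0, 6);
    int region = Math.abs((yearMonth + citycode).hashCode()) % 20;
    return region + citycode + time;
}

// The stop key uses the region of the start point's month, not the end point's:
// the end point may already be the next month's prefix, which hashes to a different region.
public static String getStopRowkey(String citycode, String starttime, String endtime) {
    String yearMonth = starttime.substring(0, 6);
    int region = Math.abs((yearMonth + citycode).hashCode()) % 20;
    return region + citycode + endtime;
}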
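To see why the stop key has to reuse the start month's region, the scan bounds for each per-month range can be computed without an HBase cluster. The sketch below assumes the same key scheme (20 partitions, region from year-month plus city code) and feeds it the three ranges from the 20141120 to 20150108 example; `ScanBoundsSketch` and `bounds` are hypothetical names used only for this illustration.

```java
/** Illustrative computation of per-range scan bounds; names are hypothetical. */
final class ScanBoundsSketch {

    static int region(String yearMonth, String citycode) {
        return Math.abs((yearMonth + citycode).hashCode()) % 20;
    }

    /** Returns {startRow, stopRow} for one {startPoint, endPoint} range. */
    static String[] bounds(String citycode, String startPoint, String endPoint) {
        // Both keys take the region of the start point's month, so they share one prefix
        // and the scan stays inside a single contiguous key range.
        String prefix = region(startPoint.substring(0, 6), citycode) + citycode;
        return new String[] { prefix + startPoint, prefix + endPoint };
    }

    public static void main(String[] args) {
        String[][] ranges = {
            { "20141120", "201412" },
            { "201412", "201501" },
            { "201501", "20150109" },
        };
        for (String[] r : ranges) {
            String[] b = bounds("1100000", r[0], r[1]);
            System.out.println(b[0] + " .. " + b[1]);
        }
    }
}
```

For the first range this gives a start row of 13110000020141120 and a stop row of 131100000201412: the stop row carries the November region prefix 13 even though its date part already belongs to December.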