hbase分页应用场景及分页思路与代码实现

时间:2022-01-22 15:01:04

转自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=7030&extra=page=1

可以带着下面问题来阅读
1.hbasef分页什么情况下比较有用?
2.hbase分页的思路是什么?
3.hbase分页代码与sql分页代码有什么区别和联系?

一、hbase分页应用场景:

一、应用场景
hbase到底需不需要分页,hbase的数据量肯定不少,肯定是需要分页的。很多人认为数量量如此大,怎么会展示。这个从客户角度来讲,我们做的系统,不可能会给机器看的。这里面我们可以对其进行统计分析,这样利于我们决策。
比如我们:平台中有一个场景是要做用户历史订单数据的查询,并且需要支持分页。这里只是举了一个场景,后面大家可以根据自己的经验。下面给大家讲讲分页的思路。

二、hbase分页思路:

hbase通过scan来扫描表,通过startKey,stopKey来确定范围,hbase官方提供了一个PageFilter来支持一次scan可以返回多少条数据即每页的行数。假如一页是10条,这样是第一页还好,但是第二页呢,如果不改变PageFilter的pageSize,那返回的还是第一页的数据,如果改变pageSize为20,则返回了第一页10多余的数据,在客户端要过滤掉,性能不好。那怎么办呢,方法就是在查询下一页时,指定下一页的startKey,这样PageFilter每次就不会返回多余的记录,stopKey可以不用变,那现在问题是,怎么得到下一页的startKey(即下一页第一行的rowkey)呢?,有两种方法来取每一页的startKey

1.  上一页的最后一行记录的rowkey作为下一页的startKey。
2.   在每次scan时多取一条记录,即把下一页第一条行页取出来,把该行的rowkey做为下一页的startKey。

这两种方法,都要注意,hbase scan时是包含startKey的,如果是采用第一种,则要在记录多取一条,排除第一条。第二种页是多取一条,但是排除最后一条,用来做下一页的startKey。还有需要注意的是在计算是否有下一页时,可以根据返回的条数来判断。

startKey怎么取没有问题了。但是怎么存储呢,有同学可能会想到存到session,但是如果你的服务是rest api型的,就没有session的概念了。那还有两种选择:
1. 是存到客户端,让客户端每次请求时把startKey再传回来,这样需要依赖客户端,如果客户端是远程,或者是开放平台的情况下,可能不合适。
2. 存在服务端,存在服务端需要注意并发访问的情况。比如scan同一个表,一个访问第2页,一个访问第3页,服务端就需要对每一个table的scan 存每一页的startKey,需要为同一个查询条件包含pageSize,因为pageSize不一样,startKey也会不一样,
在服务crash情况下,从起后都从第一页开始。

我自己是采用第二种方案,存在服务端.

----------------------------------------------------------------------------------------------------------------------------------------------------
三、代码实现

  1. import java.io.IOException;
  2. import java.util.LinkedHashMap;
  3. import java.util.LinkedList;
  4. import java.util.List;
  5. import java.util.Map;
  6. import org.apache.commons.lang.StringUtils;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.hbase.HBaseConfiguration;
  9. import org.apache.hadoop.hbase.client.Get;
  10. import org.apache.hadoop.hbase.client.HTableInterface;
  11. import org.apache.hadoop.hbase.client.HTablePool;
  12. import org.apache.hadoop.hbase.client.Result;
  13. import org.apache.hadoop.hbase.client.ResultScanner;
  14. import org.apache.hadoop.hbase.client.Scan;
  15. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
  16. import org.apache.hadoop.hbase.filter.Filter;
  17. import org.apache.hadoop.hbase.filter.FilterList;
  18. import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
  19. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  20. import org.apache.hadoop.hbase.util.Bytes;
  21. public class HBaseUtils {
  22. private static Configuration config = null;
  23. private static HTablePool tp = null;
  24. static {
  25. // 加载集群配置
  26. config = HBaseConfiguration.create();
  27. config.set("hbase.zookeeper.quorum", "xx.xx.xx");
  28. config.set("hbase.zookeeper.property.clientPort", "2181");
  29. // 创建表池(可伟略提高查询性能,具体说明请百度或官方API)
  30. tp = new HTablePool(config, 10);
  31. }
  32. /*
  33. * 获取hbase的表
  34. */
  35. public static HTableInterface getTable(String tableName) {
  36. if (StringUtils.isEmpty(tableName))
  37. return null;
  38. return tp.getTable(getBytes(tableName));
  39. }
  40. /* 转换byte数组 */
  41. public static byte[] getBytes(String str) {
  42. if (str == null)
  43. str = "";
  44. return Bytes.toBytes(str);
  45. }
  46. /**
  47. * 查询数据
  48. * @param tableKey 表标识
  49. * @param queryKey 查询标识
  50. * @param startRow 开始行
  51. * @param paramsMap 参数集合
  52. * @return 结果集
  53. */
  54. public static TBData getDataMap(String tableName, String startRow,
  55. String stopRow, Integer currentPage, Integer pageSize)
  56. throws IOException {
  57. List<Map<String, String>> mapList = null;
  58. mapList = new LinkedList<Map<String, String>>();
  59. ResultScanner scanner = null;
  60. // 为分页创建的封装类对象,下面有给出具体属性
  61. TBData tbData = null;
  62. try {
  63. // 获取最大返回结果数量
  64. if (pageSize == null || pageSize == 0L)
  65. pageSize = 100;
  66. if (currentPage == null || currentPage == 0)
  67. currentPage = 1;
  68. // 计算起始页和结束页
  69. Integer firstPage = (currentPage - 1) * pageSize;
  70. Integer endPage = firstPage + pageSize;
  71. // 从表池中取出HBASE表对象
  72. HTableInterface table = getTable(tableName);
  73. // 获取筛选对象
  74. Scan scan = getScan(startRow, stopRow);
  75. // 给筛选对象放入过滤器(true标识分页,具体方法在下面)
  76. scan.setFilter(packageFilters(true));
  77. // 缓存1000条数据
  78. scan.setCaching(1000);
  79. scan.setCacheBlocks(false);
  80. scanner = table.getScanner(scan);
  81. int i = 0;
  82. List<byte[]> rowList = new LinkedList<byte[]>();
  83. // 遍历扫描器对象, 并将需要查询出来的数据row key取出
  84. for (Result result : scanner) {
  85. String row = toStr(result.getRow());
  86. if (i >= firstPage && i < endPage) {
  87. rowList.add(getBytes(row));
  88. }
  89. i++;
  90. }
  91. // 获取取出的row key的GET对象
  92. List<Get> getList = getList(rowList);
  93. Result[] results = table.get(getList);
  94. // 遍历结果
  95. for (Result result : results) {
  96. Map<byte[], byte[]> fmap = packFamilyMap(result);
  97. Map<String, String> rmap = packRowMap(fmap);
  98. mapList.add(rmap);
  99. }
  100. // 封装分页对象
  101. tbData = new TBData();
  102. tbData.setCurrentPage(currentPage);
  103. tbData.setPageSize(pageSize);
  104. tbData.setTotalCount(i);
  105. tbData.setTotalPage(getTotalPage(pageSize, i));
  106. tbData.setResultList(mapList);
  107. } catch (IOException e) {
  108. e.printStackTrace();
  109. } finally {
  110. closeScanner(scanner);
  111. }
  112. return tbData;
  113. }
  114. private static int getTotalPage(int pageSize, int totalCount) {
  115. int n = totalCount / pageSize;
  116. if (totalCount % pageSize == 0) {
  117. return n;
  118. } else {
  119. return ((int) n) + 1;
  120. }
  121. }
  122. // 获取扫描器对象
  123. private static Scan getScan(String startRow, String stopRow) {
  124. Scan scan = new Scan();
  125. scan.setStartRow(getBytes(startRow));
  126. scan.setStopRow(getBytes(stopRow));
  127. return scan;
  128. }
  129. /**
  130. * 封装查询条件
  131. */
  132. private static FilterList packageFilters(boolean isPage) {
  133. FilterList filterList = null;
  134. // MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)
  135. filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
  136. Filter filter1 = null;
  137. Filter filter2 = null;
  138. filter1 = newFilter(getBytes("family1"), getBytes("column1"),
  139. CompareOp.EQUAL, getBytes("condition1"));
  140. filter2 = newFilter(getBytes("family2"), getBytes("column1"),
  141. CompareOp.LESS, getBytes("condition2"));
  142. filterList.addFilter(filter1);
  143. filterList.addFilter(filter2);
  144. if (isPage) {
  145. filterList.addFilter(new FirstKeyOnlyFilter());
  146. }
  147. return filterList;
  148. }
  149. private static Filter newFilter(byte[] f, byte[] c, CompareOp op, byte[] v) {
  150. return new SingleColumnValueFilter(f, c, op, v);
  151. }
  152. private static void closeScanner(ResultScanner scanner) {
  153. if (scanner != null)
  154. scanner.close();
  155. }
  156. /**
  157. * 封装每行数据
  158. */
  159. private static Map<String, String> packRowMap(Map<byte[], byte[]> dataMap) {
  160. Map<String, String> map = new LinkedHashMap<String, String>();
  161. for (byte[] key : dataMap.keySet()) {
  162. byte[] value = dataMap.get(key);
  163. map.put(toStr(key), toStr(value));
  164. }
  165. return map;
  166. }
  167. /* 根据ROW KEY集合获取GET对象集合 */
  168. private static List<Get> getList(List<byte[]> rowList) {
  169. List<Get> list = new LinkedList<Get>();
  170. for (byte[] row : rowList) {
  171. Get get = new Get(row);
  172. get.addColumn(getBytes("family1"), getBytes("column1"));
  173. get.addColumn(getBytes("family1"), getBytes("column2"));
  174. get.addColumn(getBytes("family2"), getBytes("column1"));
  175. list.add(get);
  176. }
  177. return list;
  178. }
  179. /**
  180. * 封装配置的所有字段列族
  181. */
  182. private static Map<byte[], byte[]> packFamilyMap(Result result) {
  183. Map<byte[], byte[]> dataMap = null;
  184. dataMap = new LinkedHashMap<byte[], byte[]>();
  185. dataMap.putAll(result.getFamilyMap(getBytes("family1")));
  186. dataMap.putAll(result.getFamilyMap(getBytes("family2")));
  187. return dataMap;
  188. }
  189. private static String toStr(byte[] bt) {
  190. return Bytes.toString(bt);
  191. }
  192. public static void main(String[] args) throws IOException {
  193. // 拿出row key的起始行和结束行
  194. // #<0<9<:
  195. String startRow = "aaaa#";
  196. String stopRow = "aaaa:";
  197. int currentPage = 1;
  198. int pageSize = 20;
  199. // 执行hbase查询
  200. getDataMap("table", startRow, stopRow, currentPage, pageSize);
  201. }
  202. }
  203. class TBData {
  204. private Integer currentPage;
  205. private Integer pageSize;
  206. private Integer totalCount;
  207. private Integer totalPage;
  208. private List<Map<String, String>> resultList;
  209. public Integer getCurrentPage() {
  210. return currentPage;
  211. }
  212. public void setCurrentPage(Integer currentPage) {
  213. this.currentPage = currentPage;
  214. }
  215. public Integer getPageSize() {
  216. return pageSize;
  217. }
  218. public void setPageSize(Integer pageSize) {
  219. this.pageSize = pageSize;
  220. }
  221. public Integer getTotalCount() {
  222. return totalCount;
  223. }
  224. public void setTotalCount(Integer totalCount) {
  225. this.totalCount = totalCount;
  226. }
  227. public Integer getTotalPage() {
  228. return totalPage;
  229. }
  230. public void setTotalPage(Integer totalPage) {
  231. this.totalPage = totalPage;
  232. }
  233. public List<Map<String, String>> getResultList() {
  234. return resultList;
  235. }
  236. public void setResultList(List<Map<String, String>> resultList) {
  237. this.resultList = resultList;
  238. }
  239. }

复制代码