Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询

时间:2021-08-09 08:33:24
1、搭建环境

  新建JAVA项目,添加的包有:

   有关Hadoop的hadoop-core-0.20.204.0.jar

   有关Hbase的hbase-0.90.4.jar、hbase-0.90.4-tests.jar以及Hbase资源包中lib目录下的所有jar包

 

2、主要程序

 

Hbase基本使用示例:

  1. import java.io.IOException;   
  2. import java.util.ArrayList;   
  3. import java.util.List;   
  4.    
  5. import org.apache.hadoop.conf.Configuration;   
  6. import org.apache.hadoop.hbase.HBaseConfiguration;   
  7. import org.apache.hadoop.hbase.HColumnDescriptor;   
  8. import org.apache.hadoop.hbase.HTableDescriptor;   
  9. import org.apache.hadoop.hbase.KeyValue;   
  10. import org.apache.hadoop.hbase.MasterNotRunningException;   
  11. import org.apache.hadoop.hbase.ZooKeeperConnectionException;   
  12. import org.apache.hadoop.hbase.client.Delete;   
  13. import org.apache.hadoop.hbase.client.Get;   
  14. import org.apache.hadoop.hbase.client.HBaseAdmin;   
  15. import org.apache.hadoop.hbase.client.HTable;   
  16. import org.apache.hadoop.hbase.client.HTablePool;   
  17. import org.apache.hadoop.hbase.client.Put;   
  18. import org.apache.hadoop.hbase.client.Result;   
  19. import org.apache.hadoop.hbase.client.ResultScanner;   
  20. import org.apache.hadoop.hbase.client.Scan;   
  21. import org.apache.hadoop.hbase.filter.Filter;   
  22. import org.apache.hadoop.hbase.filter.FilterList;   
  23. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;   
  24. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;   
  25. import org.apache.hadoop.hbase.util.Bytes;   
  26.    
  27. public class HbaseTest {   
  28.    
  29.     public static Configuration configuration;   
  30.     static {   
  31.         configuration = HBaseConfiguration.create();   
  32.         configuration.set("hbase.zookeeper.property.clientPort""2181");   
  33.         configuration.set("hbase.zookeeper.quorum""192.168.1.100");   
  34.         configuration.set("hbase.master""192.168.1.100:600000");   
  35.     }   
  36.    
  37.     public static void main(String[] args) {   
  38.         // createTable("wujintao");   
  39.         // insertData("wujintao");   
  40.         // QueryAll("wujintao");   
  41.         // QueryByCondition1("wujintao");   
  42.         // QueryByCondition2("wujintao");   
  43.         //QueryByCondition3("wujintao");   
  44.         //deleteRow("wujintao","abcdef");   
  45.         deleteByCondition("wujintao","abcdef");   
  46.     }   
  47.    
  48.        
  49.     public static void createTable(String tableName) {   
  50.         System.out.println("start create table ......");   
  51.         try {   
  52.             HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);   
  53.             if (hBaseAdmin.tableExists(tableName)) {// 如果存在要创建的表,那么先删除,再创建   
  54.                 hBaseAdmin.disableTable(tableName);   
  55.                 hBaseAdmin.deleteTable(tableName);   
  56.                 System.out.println(tableName + " is exist,detele....");   
  57.             }   
  58.             HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);   
  59.             tableDescriptor.addFamily(new HColumnDescriptor("column1"));   
  60.             tableDescriptor.addFamily(new HColumnDescriptor("column2"));   
  61.             tableDescriptor.addFamily(new HColumnDescriptor("column3"));   
  62.             hBaseAdmin.createTable(tableDescriptor);   
  63.         } catch (MasterNotRunningException e) {   
  64.             e.printStackTrace();   
  65.         } catch (ZooKeeperConnectionException e) {   
  66.             e.printStackTrace();   
  67.         } catch (IOException e) {   
  68.             e.printStackTrace();   
  69.         }   
  70.         System.out.println("end create table ......");   
  71.     }   
  72.    
  73.        
  74.     public static void insertData(String tableName) {   
  75.         System.out.println("start insert data ......");   
  76.         HTablePool pool = new HTablePool(configuration, 1000);   
  77.         HTable table = (HTable) pool.getTable(tableName);   
  78.         Put put = new Put("112233bbbcccc".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值   
  79.         put.add("column1".getBytes(), null"aaa".getBytes());// 本行数据的第一列   
  80.         put.add("column2".getBytes(), null"bbb".getBytes());// 本行数据的第三列   
  81.         put.add("column3".getBytes(), null"ccc".getBytes());// 本行数据的第三列   
  82.         try {   
  83.             table.put(put);   
  84.         } catch (IOException e) {   
  85.             e.printStackTrace();   
  86.         }   
  87.         System.out.println("end insert data ......");   
  88.     }   
  89.    
  90.        
  91.     public static void dropTable(String tableName) {   
  92.         try {   
  93.             HBaseAdmin admin = new HBaseAdmin(configuration);   
  94.             admin.disableTable(tableName);   
  95.             admin.deleteTable(tableName);   
  96.         } catch (MasterNotRunningException e) {   
  97.             e.printStackTrace();   
  98.         } catch (ZooKeeperConnectionException e) {   
  99.             e.printStackTrace();   
  100.         } catch (IOException e) {   
  101.             e.printStackTrace();   
  102.         }   
  103.    
  104.     }   
  105.        
  106.      public static void deleteRow(String tablename, String rowkey)  {   
  107.         try {   
  108.             HTable table = new HTable(configuration, tablename);   
  109.             List list = new ArrayList();   
  110.             Delete d1 = new Delete(rowkey.getBytes());   
  111.             list.add(d1);   
  112.                
  113.             table.delete(list);   
  114.             System.out.println("删除行成功!");   
  115.                
  116.         } catch (IOException e) {   
  117.             e.printStackTrace();   
  118.         }   
  119.            
  120.    
  121.     }   
  122.    
  123.         
  124.      public static void deleteByCondition(String tablename, String rowkey)  {   
  125.             //目前还没有发现有效的API能够实现根据非rowkey的条件删除这个功能能,还有清空表全部数据的API操作   
  126.    
  127.     }   
  128.    
  129.    
  130.        
  131.     public static void QueryAll(String tableName) {   
  132.         HTablePool pool = new HTablePool(configuration, 1000);   
  133.         HTable table = (HTable) pool.getTable(tableName);   
  134.         try {   
  135.             ResultScanner rs = table.getScanner(new Scan());   
  136.             for (Result r : rs) {   
  137.                 System.out.println("获得到rowkey:" + new String(r.getRow()));   
  138.                 for (KeyValue keyValue : r.raw()) {   
  139.                     System.out.println("列:" + new String(keyValue.getFamily())   
  140.                             + "====值:" + new String(keyValue.getValue()));   
  141.                 }   
  142.             }   
  143.         } catch (IOException e) {   
  144.             e.printStackTrace();   
  145.         }   
  146.     }   
  147.    
  148.        
  149.     public static void QueryByCondition1(String tableName) {   
  150.    
  151.         HTablePool pool = new HTablePool(configuration, 1000);   
  152.         HTable table = (HTable) pool.getTable(tableName);   
  153.         try {   
  154.             Get scan = new Get("abcdef".getBytes());// 根据rowkey查询   
  155.             Result r = table.get(scan);   
  156.             System.out.println("获得到rowkey:" + new String(r.getRow()));   
  157.             for (KeyValue keyValue : r.raw()) {   
  158.                 System.out.println("列:" + new String(keyValue.getFamily())   
  159.                         + "====值:" + new String(keyValue.getValue()));   
  160.             }   
  161.         } catch (IOException e) {   
  162.             e.printStackTrace();   
  163.         }   
  164.     }   
  165.    
  166.        
  167.     public static void QueryByCondition2(String tableName) {   
  168.    
  169.         try {   
  170.             HTablePool pool = new HTablePool(configuration, 1000);   
  171.             HTable table = (HTable) pool.getTable(tableName);   
  172.             Filter filter = new SingleColumnValueFilter(Bytes   
  173.                     .toBytes("column1"), null, CompareOp.EQUAL, Bytes   
  174.                     .toBytes("aaa")); // 当列column1的值为aaa时进行查询   
  175.             Scan s = new Scan();   
  176.             s.setFilter(filter);   
  177.             ResultScanner rs = table.getScanner(s);   
  178.             for (Result r : rs) {   
  179.                 System.out.println("获得到rowkey:" + new String(r.getRow()));   
  180.                 for (KeyValue keyValue : r.raw()) {   
  181.                     System.out.println("列:" + new String(keyValue.getFamily())   
  182.                             + "====值:" + new String(keyValue.getValue()));   
  183.                 }   
  184.             }   
  185.         } catch (Exception e) {   
  186.             e.printStackTrace();   
  187.         }   
  188.    
  189.     }   
  190.    
  191.        
  192.     public static void QueryByCondition3(String tableName) {   
  193.    
  194.         try {   
  195.             HTablePool pool = new HTablePool(configuration, 1000);   
  196.             HTable table = (HTable) pool.getTable(tableName);   
  197.    
  198.             List<Filter> filters = new ArrayList<Filter>();   
  199.    
  200.             Filter filter1 = new SingleColumnValueFilter(Bytes   
  201.                     .toBytes("column1"), null, CompareOp.EQUAL, Bytes   
  202.                     .toBytes("aaa"));   
  203.             filters.add(filter1);   
  204.    
  205.             Filter filter2 = new SingleColumnValueFilter(Bytes   
  206.                     .toBytes("column2"), null, CompareOp.EQUAL, Bytes   
  207.                     .toBytes("bbb"));   
  208.             filters.add(filter2);   
  209.    
  210.             Filter filter3 = new SingleColumnValueFilter(Bytes   
  211.                     .toBytes("column3"), null, CompareOp.EQUAL, Bytes   
  212.                     .toBytes("ccc"));   
  213.             filters.add(filter3);   
  214.    
  215.             FilterList filterList1 = new FilterList(filters);   
  216.    
  217.             Scan scan = new Scan();   
  218.             scan.setFilter(filterList1);   
  219.             ResultScanner rs = table.getScanner(scan);   
  220.             for (Result r : rs) {   
  221.                 System.out.println("获得到rowkey:" + new String(r.getRow()));   
  222.                 for (KeyValue keyValue : r.raw()) {   
  223.                     System.out.println("列:" + new String(keyValue.getFamily())   
  224.                             + "====值:" + new String(keyValue.getValue()));   
  225.                 }   
  226.             }   
  227.             rs.close();   
  228.    
  229.         } catch (Exception e) {   
  230.             e.printStackTrace();   
  231.         }   
  232.    
  233.     }   
  234.    
  235. }  
Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询

Hbase数据获取示例:

  1. /* 
  2.  * Need Packages: 
  3.  * commons-codec-1.4.jar 
  4.  * 
  5.  * commons-logging-1.1.1.jar 
  6.  * 
  7.  * hadoop-0.20.2-core.jar 
  8.  * 
  9.  * hbase-0.90.2.jar 
  10.  * 
  11.  * log4j-1.2.16.jar 
  12.  * 
  13.  * zookeeper-3.3.2.jar 
  14.  * 
  15.  */  
  16.   
  17. import java.io.IOException;  
  18. import java.util.ArrayList;  
  19. import java.util.List;  
  20. import org.apache.hadoop.conf.Configuration;  
  21. import org.apache.hadoop.hbase.HBaseConfiguration;  
  22. import org.apache.hadoop.hbase.KeyValue;  
  23. import org.apache.hadoop.hbase.client.Get;  
  24. import org.apache.hadoop.hbase.client.HTable;  
  25. import org.apache.hadoop.hbase.client.Result;  
  26. import org.apache.hadoop.hbase.client.ResultScanner;  
  27. import org.apache.hadoop.hbase.client.Scan;  
  28. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;  
  29. import org.apache.hadoop.hbase.filter.FilterList;  
  30. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;  
  31. import org.apache.hadoop.hbase.util.Bytes;  
  32.   
  33. public class HbaseSelecter  
  34. {  
  35.     public static Configuration configuration = null;  
  36.     static  
  37.     {  
  38.         configuration = HBaseConfiguration.create();  
  39.         //configuration.set("hbase.master", "192.168.0.201:60000");  
  40.         configuration.set("hbase.zookeeper.quorum""idc01-hd-nd-03,idc01-hd-nd-04,idc01-hd-nd-05");  
  41.         //configuration.set("hbase.zookeeper.property.clientPort", "2181");  
  42.     }  
  43.   
  44.     public static void selectRowKey(String tablename, String rowKey) throws IOException  
  45.     {  
  46.         HTable table = new HTable(configuration, tablename);  
  47.         Get g = new Get(rowKey.getBytes());  
  48.         Result rs = table.get(g);  
  49.   
  50.         for (KeyValue kv : rs.raw())  
  51.         {  
  52.             System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  53.             System.out.println("Column Family: " + new String(kv.getFamily()));  
  54.             System.out.println("Column       :" + new String(kv.getQualifier()));  
  55.             System.out.println("value        : " + new String(kv.getValue()));  
  56.         }  
  57.     }  
  58.   
  59.     public static void selectRowKeyFamily(String tablename, String rowKey, String family) throws IOException  
  60.     {  
  61.         HTable table = new HTable(configuration, tablename);  
  62.         Get g = new Get(rowKey.getBytes());  
  63.         g.addFamily(Bytes.toBytes(family));  
  64.         Result rs = table.get(g);  
  65.         for (KeyValue kv : rs.raw())  
  66.         {  
  67.             System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  68.             System.out.println("Column Family: " + new String(kv.getFamily()));  
  69.             System.out.println("Column       :" + new String(kv.getQualifier()));  
  70.             System.out.println("value        : " + new String(kv.getValue()));  
  71.         }  
  72.     }  
  73.   
  74.     public static void selectRowKeyFamilyColumn(String tablename, String rowKey, String family, String column)  
  75.             throws IOException  
  76.     {  
  77.         HTable table = new HTable(configuration, tablename);  
  78.         Get g = new Get(rowKey.getBytes());  
  79.         g.addColumn(family.getBytes(), column.getBytes());  
  80.   
  81.         Result rs = table.get(g);  
  82.   
  83.         for (KeyValue kv : rs.raw())  
  84.         {  
  85.             System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  86.             System.out.println("Column Family: " + new String(kv.getFamily()));  
  87.             System.out.println("Column       :" + new String(kv.getQualifier()));  
  88.             System.out.println("value        : " + new String(kv.getValue()));  
  89.         }  
  90.     }  
  91.   
  92.     public static void selectFilter(String tablename, List<String> arr) throws IOException  
  93.     {  
  94.         HTable table = new HTable(configuration, tablename);  
  95.         Scan scan = new Scan();// 实例化一个遍历器  
  96.         FilterList filterList = new FilterList(); // 过滤器List  
  97.   
  98.         for (String v : arr)  
  99.         { // 下标0为列簇,1为列名,3为条件  
  100.             String[] wheres = v.split(",");  
  101.   
  102.             filterList.addFilter(new SingleColumnValueFilter(// 过滤器  
  103.                     wheres[0].getBytes(), wheres[1].getBytes(),  
  104.   
  105.                     CompareOp.EQUAL,// 各个条件之间是" and "的关系  
  106.                     wheres[2].getBytes()));  
  107.         }  
  108.         scan.setFilter(filterList);  
  109.         ResultScanner ResultScannerFilterList = table.getScanner(scan);  
  110.         for (Result rs = ResultScannerFilterList.next(); rs != null; rs = ResultScannerFilterList.next())  
  111.         {  
  112.             for (KeyValue kv : rs.list())  
  113.             {  
  114.                 System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  115.                 System.out.println("Column Family: " + new String(kv.getFamily()));  
  116.                 System.out.println("Column       :" + new String(kv.getQualifier()));  
  117.                 System.out.println("value        : " + new String(kv.getValue()));  
  118.             }  
  119.         }  
  120.     }  
  121.   
  122.     public static void main(String[] args) throws Exception  
  123.     {  
  124.         if(args.length < 2){  
  125.             System.out.println("Usage: HbaseSelecter table key");  
  126.             System.exit(-1);  
  127.         }  
  128.   
  129.         System.out.println("Table: " + args[0] + " , key: " + args[1]);  
  130.         selectRowKey(args[0], args[1]);  
  131.   
  132.         /* 
  133.         System.out.println("------------------------行键  查询----------------------------------"); 
  134.         selectRowKey("b2c", "yihaodian1002865"); 
  135.         selectRowKey("b2c", "yihaodian1003396"); 
  136.  
  137.         System.out.println("------------------------行键+列簇 查询----------------------------------"); 
  138.         selectRowKeyFamily("riapguh", "用户A", "user"); 
  139.         selectRowKeyFamily("riapguh", "用户B", "user"); 
  140.  
  141.         System.out.println("------------------------行键+列簇+列名 查询----------------------------------"); 
  142.         selectRowKeyFamilyColumn("riapguh", "用户A", "user", "user_code"); 
  143.         selectRowKeyFamilyColumn("riapguh", "用户B", "user", "user_code"); 
  144.  
  145.         System.out.println("------------------------条件 查询----------------------------------"); 
  146.         List<String> arr = new ArrayList<String>(); 
  147.         arr.add("dpt,dpt_code,d_001"); 
  148.         arr.add("user,user_code,u_0001"); 
  149.         selectFilter("riapguh", arr); 
  150.         */  
  151.     }  
  152. }  
Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询


Hbase 导出特定列 示例(小量数据):

  1. /* 
  2.  * Need Packages: 
  3.  * commons-codec-1.4.jar 
  4.  * 
  5.  * commons-logging-1.1.1.jar 
  6.  * 
  7.  * hadoop-0.20.2-core.jar 
  8.  * 
  9.  * hbase-0.90.2.jar 
  10.  * 
  11.  * log4j-1.2.16.jar 
  12.  * 
  13.  * zookeeper-3.3.2.jar 
  14.  * 
  15.  * Example: javac -classpath ./:/data/chenzhenjing/code/panama/lib/hbase-0.90.2.jar:/data/chenzhenjing/code/panama/lib/hadoop-core-0.20-append-for-hbase.jar:/data/chenzhenjing/code/panama/lib/commons-logging-1.0.4.jar:/data/chenzhenjing/code/panama/lib/commons-lang-2.4.jar:/data/chenzhenjing/code/panama/lib/commons-io-1.2.jar:/data/chenzhenjing/code/panama/lib/zookeeper-3.3.2.jar:/data/chenzhenjing/code/panama/lib/log4j-1.2.15.jar:/data/chenzhenjing/code/panama/lib/commons-codec-1.3.jar   DiffHbase.java    
  16.  */  
  17.   
  18. import java.io.BufferedReader;  
  19. import java.io.File;  
  20. import java.io.IOException;  
  21. import java.io.FileInputStream;  
  22. import java.io.InputStreamReader;  
  23. import java.io.FileOutputStream;  
  24. import java.io.OutputStreamWriter;  
  25. import java.io.StringReader;  
  26. import java.text.SimpleDateFormat;  
  27. import java.util.Date;  
  28.   
  29. import java.io.IOException;  
  30. import java.util.ArrayList;  
  31. import java.util.List;  
  32. import org.apache.hadoop.conf.Configuration;  
  33. import org.apache.hadoop.hbase.HBaseConfiguration;  
  34. import org.apache.hadoop.hbase.KeyValue;  
  35. import org.apache.hadoop.hbase.client.Get;  
  36. import org.apache.hadoop.hbase.client.HTable;  
  37. import org.apache.hadoop.hbase.client.Result;  
  38. import org.apache.hadoop.hbase.client.ResultScanner;  
  39. import org.apache.hadoop.hbase.client.Scan;  
  40. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;  
  41. import org.apache.hadoop.hbase.filter.FilterList;  
  42. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;  
  43. import org.apache.hadoop.hbase.util.Bytes;  
  44.   
  45. class ColumnUtils {  
  46.   
  47.     public static byte[] getFamily(String column){  
  48.         return getBytes(column, 0);  
  49.     }  
  50.   
  51.     public static byte[] getQualifier(String column){  
  52.         return getBytes(column, 1);  
  53.     }  
  54.   
  55.     private static byte[] getBytes(String column , int offset){  
  56.         String[] split = column.split(":");  
  57.         return Bytes.toBytes(offset > split.length -1 ? split[0] :split[offset]);  
  58.     }  
  59. }  
  60.   
  61. public class DiffHbase  
  62. {  
  63.     public static Configuration configuration = null;  
  64.     static  
  65.     {  
  66.         configuration = HBaseConfiguration.create();  
  67.         configuration.set("hbase.zookeeper.quorum""idc01-hd-ds-01,idc01-hd-ds-02,idc01-hd-ds-03");  
  68.     }  
  69.   
  70.     public static void selectRowKey(String tablename, String rowKey) throws IOException  
  71.     {  
  72.         HTable table = new HTable(configuration, tablename);  
  73.         Get g = new Get(rowKey.getBytes());  
  74.         Result rs = table.get(g);  
  75.   
  76.         for (KeyValue kv : rs.raw())  
  77.         {  
  78.             System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  79.             System.out.println("Column Family: " + new String(kv.getFamily()));  
  80.             System.out.println("Column       :" + new String(kv.getQualifier()) + "t");  
  81.             System.out.println("value        : " + new String(kv.getValue()));  
  82.         }  
  83.     }  
  84.   
  85.     public static void selectRowKeyFamily(String tablename, String rowKey, String family) throws IOException  
  86.     {  
  87.         HTable table = new HTable(configuration, tablename);  
  88.         Get g = new Get(rowKey.getBytes());  
  89.         g.addFamily(Bytes.toBytes(family));  
  90.         Result rs = table.get(g);  
  91.         for (KeyValue kv : rs.raw())  
  92.         {  
  93.             System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  94.             System.out.println("Column Family: " + new String(kv.getFamily()));  
  95.             System.out.println("Column       :" + new String(kv.getQualifier()) + "t");  
  96.             System.out.println("value        : " + new String(kv.getValue()));  
  97.         }  
  98.     }  
  99.   
  100.     public static void selectRowKeyFamilyColumn(String tablename, String rowKey, String family, String column)  
  101.         throws IOException  
  102.     {  
  103.         HTable table = new HTable(configuration, tablename);  
  104.         Get g = new Get(rowKey.getBytes());  
  105.         g.addColumn(family.getBytes(), column.getBytes());  
  106.   
  107.         Result rs = table.get(g);  
  108.   
  109.         for (KeyValue kv : rs.raw())  
  110.         {  
  111.             System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------");  
  112.             System.out.println("Column Family: " + new String(kv.getFamily()));  
  113.             System.out.println("Column       :" + new String(kv.getQualifier()) + "t");  
  114.             System.out.println("value        : " + new String(kv.getValue()));  
  115.         }  
  116.     }  
  117.   
  118.   
  119.   
  120.     private static final String USAGE = "Usage: DiffHbase [-o outfile] tablename infile filterColumns...";  
  121.   
  122.     /** 
  123.      * Prints the usage message and exists the program. 
  124.      *  
  125.      * @param message  The message to print first. 
  126.      */  
  127.     private static void printUsage(String message) {  
  128.         System.err.println(message);  
  129.         System.err.println(USAGE);  
  130.         throw new RuntimeException(USAGE);  
  131.     }  
  132.   
  133.     private static void PrintId(String id, Result rs){  
  134.         String value = Bytes.toString( rs.getValue(ColumnUtils.getFamily("info:url"), ColumnUtils.getQualifier("info:url")));  
  135.         if(value == null){  
  136.             System.out.println( id + "\tNULL");  
  137.         }else{  
  138.             System.out.println( id + "\t" + value);  
  139.         }  
  140.     }  
  141.   
  142.     private static void WriteId(String id, Result rs, FileOutputStream os){  
  143.         String value = Bytes.toString( rs.getValue(ColumnUtils.getFamily("info:url"), ColumnUtils.getQualifier("info:url")));  
  144.         try{  
  145.             if(value == null){  
  146.                 os.write( (id + "\tNULL\n").getBytes());  
  147.             }else{  
  148.                 os.write( (id + "\t" + value + "\n").getBytes());  
  149.             }  
  150.         }  
  151.         catch (IOException e) {  
  152.             e.printStackTrace();  
  153.         }  
  154.     }  
  155.   
  156.     private static void PrintRow(String id, Result rs){  
  157.   
  158.         System.out.println("--------------------" + id + "----------------------------");  
  159.         for (KeyValue kv : rs.raw())  
  160.         {  
  161.             System.out.println(new String(kv.getFamily()) + ":" + new String(kv.getQualifier()) + " : " + new String(kv.getValue()));  
  162.         }  
  163.     }  
  164.   
  165.     public static void main(String[] args) throws Exception  
  166.     {   
  167.         if (args.length < 3) {  
  168.             printUsage("Too few arguments");  
  169.         }  
  170.   
  171.         String outfile = null;  
  172.         String tablename = args[0];  
  173.         String dictfile  = args[1];  
  174.         int skilLen = 2;  
  175.   
  176.         if( args[0].equals("-o")){  
  177.             outfile = args[1];  
  178.             tablename = args[2];  
  179.             dictfile  = args[3];  
  180.             skilLen = 4;  
  181.         }  
  182.   
  183.         HTable table = new HTable(configuration, tablename);  
  184.   
  185.         String[] filterColumns = new String[args.length - skilLen];  
  186.         System.arraycopy(args, skilLen, filterColumns, 0, args.length - skilLen);  
  187.   
  188.         System.out.println("filterColumns: ");  
  189.         for(int i=0; i<filterColumns.length; ++i){  
  190.             System.out.println("\t" + filterColumns[i]);  
  191.         }  
  192.   
  193.         FileOutputStream os = null;  
  194.         if(outfile != null){  
  195.             os = new FileOutputStream(outfile);  
  196.         }  
  197.           
  198.         int count = 0;  
  199.         SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式  
  200.   
  201.         File srcFile = new File(dictfile);  
  202.         FileInputStream in = new FileInputStream(srcFile);  
  203.         InputStreamReader isr = new InputStreamReader(in);  
  204.         BufferedReader br = new BufferedReader(isr);  
  205.         String read = null;  
  206.         while ((read = br.readLine()) != null) {  
  207.             String[] split = read.trim().split("\\s");   // space split  
  208.             if( split.length < 1 ){  
  209.                 System.out.println("Error line: " + read);  
  210.                 continue;  
  211.             }  
  212.   
  213.             if( ++count % 1000 == 0){  
  214.                 System.out.println(df.format(new Date()) + " : " + count + " rows processed." );  // new Date()为获取当前系统时间  
  215.             }  
  216.             // System.out.println("ROWKEY:" + split[0]);  
  217.   
  218.             Get g = new Get(split[0].getBytes());  
  219.             Result rs = table.get(g);  
  220.             if( rs == null){  
  221.                 System.out.println("No Result for " + split[0]);  
  222.                 continue;  
  223.             }  
  224.   
  225.             for(int i=0; i<filterColumns.length; ++i){  
  226.                 String value = Bytes.toString(rs.getValue(ColumnUtils.getFamily(filterColumns[i]), ColumnUtils.getQualifier(filterColumns[i])));  
  227.                 if(value == null){  
  228.                     if( os == null){  
  229.                         PrintId(split[0], rs);  
  230.                     }else{  
  231.                         WriteId(split[0], rs, os);  
  232.                     }  
  233.   
  234.                     // PrintRow(split[0], rs);  
  235.                     break;  
  236.                 }  
  237.             }  
  238.         }  
  239.   
  240.         br.close();  
  241.         isr.close();  
  242.         in.close();  
  243.   
  244.     }  
  245. }  
Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询

Hbase Mapreduce示例:全库扫描(大量数据):

  1. package com.hbase.mapreduce;  
  2.   
  3. import java.io.File;  
  4. import java.io.FileInputStream;  
  5. import java.io.IOException;  
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8.   
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.conf.Configuration;  
  11. import org.apache.hadoop.fs.Path;  
  12. import org.apache.hadoop.hbase.HBaseConfiguration;  
  13. import org.apache.hadoop.hbase.HConstants;  
  14. import org.apache.hadoop.hbase.client.Scan;  
  15. import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;  
  16. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  17. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  18. import org.apache.hadoop.mapreduce.Job;  
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  20. import org.apache.hadoop.mapred.JobConf;  
  21. import org.apache.hadoop.util.GenericOptionsParser;  
  22.   
  23. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;                                                                        
  24. import org.apache.hadoop.hbase.filter.CompareFilter;                                                                                  
  25. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;                                                                        
  26. import org.apache.hadoop.hbase.filter.BinaryComparator;                                                                               
  27. import org.apache.hadoop.hbase.util.Bytes;   
  28.   
  29. import com.goodhope.utils.ColumnUtils;  
  30.   
  31. public class ExportHbase {  
  32.     private static final String INFOCATEGORY = "info:storecategory";  
  33.   
  34.     private static final String USAGE = "Usage: ExportHbase " +  
  35.         "-r <numReduceTasks> -indexConf <iconfFile>\n" +  
  36.         "-indexDir <indexDir> -webSite <amazon> [-needupdate <true> -isVisible -startTime <long>] -table <tableName> -columns <columnName1> " +  
  37.         "[<columnName2> ...]";  
  38.   
  39.     /** 
  40.      * Prints the usage message and exists the program. 
  41.      *  
  42.      * @param message  The message to print first. 
  43.      */  
  44.     private static void printUsage(String message) {  
  45.         System.err.println(message);  
  46.         System.err.println(USAGE);  
  47.         throw new RuntimeException(USAGE);  
  48.     }  
  49.   
  50.     /** 
  51.      * Creates a new job. 
  52.      * @param conf  
  53.      *  
  54.      * @param args  The command line arguments. 
  55.      * @throws IOException When reading the configuration fails. 
  56.      */  
  57.     public static Job createSubmittableJob(Configuration conf, String[] args)   
  58.         throws IOException {  
  59.         if (args.length < 7) {  
  60.             printUsage("Too few arguments");  
  61.         }  
  62.   
  63.         int numReduceTasks = 1;  
  64.         String iconfFile = null;  
  65.         String indexDir = null;  
  66.         String tableName = null;  
  67.         String website = null;  
  68.         String needupdate = "";  
  69.         String expectShopGrade = "";  
  70.         String dino = "6";  
  71.         String isdebug = "0";  
  72.         long debugThreshold = 10000;  
  73.         String debugThresholdStr = Long.toString(debugThreshold);  
  74.         String queue = "offline";  
  75.   
  76.         long endTime =  Long.MAX_VALUE;  
  77.         int maxversions = 1;  
  78.         long startTime = System.currentTimeMillis() - 28*24*60*60*1000l;  
  79.         long distartTime = System.currentTimeMillis() - 30*24*60*60*1000l;  
  80.         long diusedTime = System.currentTimeMillis() - 30*24*60*60*1000l;  
  81.         String startTimeStr = Long.toString(startTime);  
  82.         String diusedTimeStr = Long.toString(diusedTime);  
  83.         String quorum = null;  
  84.   
  85.         String isVisible = "";  
  86.         List<String> columns = new ArrayList<String>() ;    
  87.   
  88.         boolean bFilter = false;  
  89.   
  90.         // parse args  
  91.         for (int i = 0; i < args.length - 1; i++) {  
  92.             if ("-r".equals(args[i])) {  
  93.                 numReduceTasks = Integer.parseInt(args[++i]);  
  94.             } else if ("-indexConf".equals(args[i])) {  
  95.                 iconfFile = args[++i];  
  96.             } else if ("-indexDir".equals(args[i])) {  
  97.                 indexDir = args[++i];  
  98.             } else if ("-table".equals(args[i])) {  
  99.                 tableName = args[++i];  
  100.             } else if ("-webSite".equals(args[i])) {  
  101.                 website = args[++i];  
  102.             } else if ("-startTime".equals(args[i])) {  
  103.                 startTimeStr = args[++i];  
  104.                 startTime = Long.parseLong(startTimeStr);  
  105.             } else if ("-needupdate".equals(args[i])) {  
  106.                 needupdate = args[++i];  
  107.             } else if ("-isVisible".equals(args[i])) {  
  108.                 isVisible = "true";  
  109.             } else if ("-shopgrade".equals(args[i])) {  
  110.                 expectShopGrade = args[++i];   
  111.             } else if ("-queue".equals(args[i])) {  
  112.                 queue = args[++i];  
  113.             } else if ("-dino".equals(args[i])) {  
  114.                 dino = args[++i];  
  115.             } else if ("-maxversions".equals(args[i])) {  
  116.                 maxversions = Integer.parseInt(args[++i]);  
  117.             } else if ("-distartTime".equals(args[i])) {  
  118.                 distartTime = Long.parseLong(args[++i]);   
  119.             } else if ("-diendTime".equals(args[i])) {  
  120.                 endTime = Long.parseLong(args[++i]);  
  121.             } else if ("-diusedTime".equals(args[i])) {  
  122.                 diusedTimeStr = args[++i];  
  123.                 diusedTime = Long.parseLong(diusedTimeStr);  
  124.             } else if ("-quorum".equals(args[i])) {  
  125.                 quorum = args[++i];  
  126.             } else if ("-filter".equals(args[i])) {  
  127.                 bFilter = true;  
  128.             } else if ("-columns".equals(args[i])) {  
  129.                 columns.add(args[++i]);  
  130.                 while (i + 1 < args.length && !args[i + 1].startsWith("-")) {  
  131.                     String columnname = args[++i];  
  132.                     columns.add(columnname);  
  133.                     System.out.println("args column----: " + columnname);  
  134.                 }  
  135.             } else if ("-debugThreshold".equals(args[i])) {  
  136.                 isdebug = "1";  
  137.                 debugThresholdStr = args[++i];  
  138.                 debugThreshold =  Long.parseLong( debugThresholdStr );  
  139.             }  
  140.             else {  
  141.                 printUsage("Unsupported option " + args[i]);  
  142.             }  
  143.         }  
  144.   
  145.         if (distartTime > endTime) {  
  146.             printUsage("distartTime must <= diendTime");    
  147.         }  
  148.   
  149.         if (indexDir == null || tableName == null || columns.isEmpty()) {  
  150.             printUsage("Index directory, table name and at least one column must " +  
  151.                     "be specified");  
  152.         }  
  153.   
  154.         if (iconfFile != null) {  
  155.             // set index configuration content from a file  
  156.             String content = readContent(iconfFile);  
  157.             conf.set("hbase.index.conf", content);  
  158.             conf.set("hbase.website.name", website);  
  159.             conf.set("hbase.needupdate.productDB", needupdate);  
  160.             conf.set("hbase.expect.shopgrade", expectShopGrade);  
  161.             conf.set("hbase.di.no", dino);  
  162.             conf.set("hbase.expect.item.visible", isVisible);  
  163.             conf.set("hbase.index.startTime", startTimeStr);  
  164.             conf.set("hbase.index.diusedTime", diusedTimeStr);  
  165.             conf.set("hbase.index.debugThreshold", debugThresholdStr);  
  166.             conf.set("hbase.index.debug", isdebug);  
  167.             if (quorum != null) {  
  168.                 conf.set("hbase.zookeeper.quorum", quorum);  
  169.             }  
  170.             String temp = "";  
  171.             for (String column : columns) {  
  172.                 temp = temp + column + "|";  
  173.             }  
  174.             temp = temp.substring(0, temp.length() - 1);  
  175.             conf.set("hbase.index.column", temp);  
  176.             System.out.println("hbase.index.column: " + temp);  
  177.         }  
  178.   
  179.   
  180.         Job job = new Job(conf, "export data from table " + tableName);  
  181.         ((JobConf) job.getConfiguration()).setQueueName(queue);  
  182.   
  183.         // number of indexes to partition into  
  184.         job.setNumReduceTasks(numReduceTasks);  
  185.         Scan scan = new Scan();  
  186.         scan.setCacheBlocks(false);  
  187.   
  188.         // limit scan range  
  189.         scan.setTimeRange(distartTime, endTime);  
  190.         //  scan.setMaxVersions(maxversions);  
  191.         scan.setMaxVersions(1);  
  192.   
  193.         /* limit scan columns */  
  194.         for (String column : columns) {  
  195.             scan.addColumn(ColumnUtils.getFamily(column), ColumnUtils.getQualifier(column));  
  196.             scan.addFamily(ColumnUtils.getFamily(column));  
  197.         }  
  198.   
  199.         // set filter  
  200.         if( bFilter ){  
  201.             System.out.println("only export guangtaobao data. ");  
  202.             SingleColumnValueFilter filter = new SingleColumnValueFilter(  
  203.                     Bytes.toBytes("info"),  
  204.                     Bytes.toBytes("producttype"),  
  205.                     CompareFilter.CompareOp.EQUAL,  
  206.                     new BinaryComparator(Bytes.toBytes("guangtaobao")) );  
  207.             filter.setFilterIfMissing(true);  
  208.             scan.setFilter(filter);  
  209.         }  
  210.   
  211.         TableMapReduceUtil.initTableMapperJob(tableName, scan, ExportHbaseMapper.class,  
  212.                 Text.class, Text.class, job);  
  213.         // job.setReducerClass(ExportHbaseReducer.class);  
  214.         FileOutputFormat.setOutputPath(job, new Path(indexDir));  
  215.   
  216.   
  217.         return job;  
  218.     }  
  219.   
  220.     /** 
  221.      * Reads xml file of indexing configurations.  The xml format is similar to 
  222.      * hbase-default.xml and hadoop-default.xml. For an example configuration, 
  223.      * see the <code>createIndexConfContent</code> method in TestTableIndex. 
  224.      *  
  225.      * @param fileName  The file to read. 
  226.      * @return XML configuration read from file. 
  227.      * @throws IOException When the XML is broken. 
  228.      */  
  229.     private static String readContent(String fileName) throws IOException {  
  230.         File file = new File(fileName);  
  231.         int length = (int) file.length();  
  232.         if (length == 0) {  
  233.             printUsage("Index configuration file " + fileName + " does not exist");  
  234.         }  
  235.   
  236.         int bytesRead = 0;  
  237.         byte[] bytes = new byte[length];  
  238.         FileInputStream fis = new FileInputStream(file);  
  239.   
  240.         try {  
  241.             // read entire file into content  
  242.             while (bytesRead < length) {  
  243.                 int read = fis.read(bytes, bytesRead, length - bytesRead);  
  244.                 if (read > 0) {  
  245.                     bytesRead += read;  
  246.                 } else {  
  247.                     break;  
  248.                 }  
  249.             }  
  250.         } finally {  
  251.             fis.close();  
  252.         }  
  253.   
  254.         return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);  
  255.     }  
  256.   
  257.     /** 
  258.      * The main entry point. 
  259.      *  
  260.      * @param args  The command line arguments. 
  261.      * @throws Exception When running the job fails. 
  262.      */  
  263.     public static void main(String[] args) throws Exception {  
  264.         Configuration conf = HBaseConfiguration.create();  
  265.         String[] otherArgs =   
  266.             new GenericOptionsParser(conf, args).getRemainingArgs();  
  267.         Job job = createSubmittableJob(conf, otherArgs);  
  268.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  269.     }  
  270.   
  271. }  
  272.   
  273. //////////////////////////////////////////////////////////  
  274.   
  275. package com.hbase.mapreduce;  
  276.   
  277. import java.io.IOException;  
  278. import java.util.List;  
  279. import java.util.ArrayList;  
  280. import java.lang.String;  
  281. import java.lang.StringBuffer;  
  282.   
  283. import org.apache.hadoop.io.Text;  
  284. import org.apache.hadoop.conf.Configurable;  
  285. import org.apache.hadoop.conf.Configuration;  
  286. import org.apache.commons.lang.StringUtils;  
  287. import org.apache.hadoop.hbase.client.Result;  
  288. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  289. import org.apache.hadoop.hbase.mapreduce.TableMapper;  
  290. import org.apache.hadoop.hbase.util.Bytes;  
  291. import org.apache.hadoop.hbase.KeyValue;  
  292.   
  293. import com.goodhope.utils.ColumnUtils;  
  294.   
  295.   
  296. /** 
  297.  * Pass the given key and record as-is to the reduce phase. 
  298.  */  
  299. @SuppressWarnings("deprecation")  
  300. public class ExportHbaseMapper extends TableMapper<Text,Text> implements Configurable {  
  301.     private static final Text keyTEXT = new Text();  
  302.     private static final Text SENDTEXT = new Text();  
  303.   
  304.     private Configuration conf = null;  
  305.   
  306.     private long startTime = 0;  
  307.     List<String> columnMap = null;  
  308.   
  309.     private long rCount = 0;  
  310.     private long errCount = 0;  
  311.     private int  debug  = 0;  
  312.     private long thresCount  = 10000;  
  313.   
  314.     public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {  
  315.   
  316.         rCount++;  
  317.   
  318.         String itemid = Bytes.toString(key.get());  
  319.         if (itemid.contains("&")) {  
  320.             context.getCounter("Error""rowkey contains \"&\"").increment(1);  
  321.             return;  
  322.         }  
  323.   
  324.         StringBuffer outstr = new StringBuffer();  
  325.         for (String col : columnMap) {  
  326.   
  327.             String tmp = Bytes.toString(value.getValue(ColumnUtils.getFamily(col), ColumnUtils.getQualifier(col)));  
  328.             if (tmp == null){  
  329.                 context.getCounter("Error", col+" No value in hbase").increment(1);  
  330.                   
  331.                 errCount++;  
  332.                 if( debug > 0 && (errCount % thresCount == 0)){  
  333.                     System.err.println( itemid + ": doesn't has " + col + " data!");  
  334.                 }  
  335.   
  336.                 outstr.append("NULL" + "\t");  
  337.             }else{  
  338.                 if( tmp.contains("guangtaobao") ){  
  339.                     outstr.append("1" + "\t");  
  340.                 }else{  
  341.                     outstr.append(tmp.trim() + "\t");  
  342.                 }  
  343.             }  
  344.         }  
  345.   
  346.         if ( ! outstr.toString().isEmpty() ) {  
  347.   
  348.             SENDTEXT.set( outstr.toString() );  
  349.             keyTEXT.set(itemid);  
  350.             context.write(keyTEXT, SENDTEXT);  
  351.   
  352.             if( debug > 0 && (rCount % thresCount*10000 == 0)){  
  353.                 System.out.println( SENDTEXT.toString() + keyTEXT.toString() );  
  354.             }  
  355.         }  
  356.         else  
  357.         {  
  358.             context.getCounter("Error""No Colume output").increment(1);  
  359.             return;  
  360.         }  
  361.     }  
  362.   
  363.     /** 
  364.      * Returns the current configuration. 
  365.      *  
  366.      * @return The current configuration. 
  367.      * @see org.apache.hadoop.conf.Configurable#getConf() 
  368.      */  
  369.     @Override  
  370.         public Configuration getConf() {  
  371.             return conf;  
  372.         }  
  373.   
  374.     /** 
  375.      * Sets the configuration. This is used to set up the index configuration. 
  376.      *  
  377.      * @param configuration 
  378.      *            The configuration to set. 
  379.      * @see org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration) 
  380.      */  
  381.     @Override  
  382.         public void setConf(Configuration configuration) {  
  383.             this.conf = configuration;  
  384.               
  385.             startTime = Long.parseLong(conf.get("hbase.index.startTime"));  
  386.             thresCount = Long.parseLong(conf.get("hbase.index.debugThreshold"));  
  387.             debug = Integer.parseInt(conf.get("hbase.index.debug"));  
  388.   
  389.             String[] columns = conf.get("hbase.index.column").split("\\|");  
  390.   
  391.             columnMap = new ArrayList<String>();  
  392.             for (String column : columns) {  
  393.                 System.out.println("Output column: " + column);  
  394.   
  395.                 columnMap.add(column);  
  396.             }  
  397.   
  398.         }  
  399.   
  400. }  
  401.   
  402.   
  403. //////////////////////////////////////////////////////////  
  404.   
  405. package com.hbase.utils;  
  406.   
  407. import org.apache.hadoop.hbase.util.Bytes;  
  408.   
  409. public class ColumnUtils {  
  410.   
  411.         public static byte[] getFamily(String column){  
  412.                 return getBytes(column, 0);  
  413.         }  
  414.   
  415.         public static byte[] getQualifier(String column){  
  416.                 return getBytes(column, 1);  
  417.         }  
  418.   
  419.         private static byte[] getBytes(String column , int offset){  
  420.                 String[] split = column.split(":");  
  421.                 return Bytes.toBytes(offset > split.length -1 ? split[0] :split[offset]);  
  422.         }  
  423. }  
Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询