hbase_在代码中使用(集成spring)

时间:2021-11-01 22:09:24

1. 配置文件

添加一个配置文件

 hbase_在代码中使用(集成spring)

hbase.zookeeper.property.clientPort=2181

hbase.zookeeper.quorum=192.168.10.120

hbase.master=192.168.10.120:60020

fs.defaultFS=hdfs://192.168.10.120:19000

delete-connection=true

其中前两行为zk的地址和端口

第三行为hbase的master节点地址和端口,即hbase-site.xml文件中的配置

第四行为hadoop中core-site.xml的配置

第五行暂时不知道什么意思

 

Spring的配置文件:

<bean id="appProperty" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

    <property name="locations">

        <array>

            <value>classpath:/config/filter-dev-harve.properties</value>

        </array>

    </property>

</bean>

<!-- 扫描包 -->

   <context:component-scan base-package="cn.harvetech.normal.dao"></context:component-scan>

   <context:component-scan base-package="cn.harvetech.normal.util"></context:component-scan>

   <context:component-scan base-package="cn.harvetech.normal.service"></context:component-scan>

   <context:component-scan base-package="cn.harvetech.normal.hbase"> </context:component-scan>

    

<hdp:configuration>fs.defaultFS=${fs.defaultFS}</hdp:configuration>

<hdp:hbase-configuration stop-proxy="false" delete-connection="${delete-connection}" zk-quorum="${hbase.zookeeper.quorum}" zk-port="${hbase.zookeeper.property.clientPort}"/>

 

<!-- 加载hbase  配置文件 -->

<!-- <hadoop:configuration id="hadoopConfig" resources="classpath:/config/hbase-site.xml"></hadoop:configuration>

 -->

<!--加载  模板  -->

<!-- <hadoop:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfig"></hadoop:hbase-configuration>    

    -->

<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">

   <property name="configuration" ref="hbaseConfiguration"></property>

   <property name="encoding" value="UTF-8"></property>

</bean>    

<!-- 自定义模板 -->

<bean id="hbaseCustomTemplate" class="cn.harvetech.normal.hbase.HbaseCustomTemplate">

    <property name="configuration" ref="hbaseConfiguration"/>

    <property name="encoding" value="utf-8"/>

</bean>

    

上面第一段是用来解析属性文件的;

第二段只是扫描一些注解的类

第三段是将属性文件中配置的四个参数值初始化给hbaseConfiguration

第四段是两个模板,其中configration都是上面刚初始化的hbaseConfiguration

hbaseTemplate模板用于hbase的增删改查;

hbaseCustomTemplate模板是个自定义的类,用于获取hbasetable,其实没什么必要,本文暂略掉。

注:也可使用屏蔽掉的代码替换第三段中的两行配置,即自己定义一个xml的配置文件,然后将那四个参数写到配置中,这里再起个名称为hbaseConfiguration上面的hbaseConfiguration是标准的,这里的hbaseConfiguration只是一个名称,可以自定义为其他名字)id,将配置信息初始化到此bean中,再去初始化下面的两个模板。如果这样,对应的xml配置文件内容如下:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>

<name>hbase.zookeeper.quorum</name>

<value>192.168.10.120</value>

</property>

 

<property>

<name>hbase.zookeeper.property.clientPort</name>

<value>2181</value>

</property>

 

<property>

<name>hbase.master</name>

<value>192.168.10.120:60020</value>

</property>

<property>

<name>fs.defaultFS</name>

<value>hdfs://192.168.10.120:19000</value>

</property>

<property>

<name>delete-connection</name>

<value>true</value>

</property>

<property>

<name>hive.url</name>

<value>jdbc:hive://192.168.10.120:10002/default</value>

</property>

</configuration>


2. 注解

dao层直接使用注解:

    @Resource

    private HbaseTemplate hbaseTemplate;

拿到上面spring配置文件中配置的hbaseTemplate模板bean

然后就可以通过get方法读取hbase了,如:

@SuppressWarnings("deprecation")

@Repository

public class HbaseTemplateDaoImpl<T> implements HbaseTemplateDao {

    @Resource

    private HbaseTemplate hbaseTemplate;

    private FilterList filterlist;

    static final byte[] POSTFIX = new byte[] { 0x00 };

    @Override

    public Map<String, Object> get(String tableName, String rowKey) throws Exception {

        Assert.notNull(hbaseTemplate);

        return this.hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {

。。。。。。

        });

    @Override

    public void put(String tableName, final String rowKey, final String[] families, final String[] qualifiers, final byte[][] values) throws Exception {

        Assert.notNull(hbaseTemplate);

        this.hbaseTemplate.execute(tableName, new TableCallback<T>() {

。。。。。。

            }

        });

    }

。。。。。。

    }

}

@Repository

public class HbaseDAO {

 //表名

 public static final String TABLENAME = "harve_commodity";

 @Resource

 private HbaseTemplateDao hbaseTemplateDao;

 public Commodity getList(String id){

 Commodity obj = null;

 Map<String, Object> map = null;

 try {

 map = hbaseTemplateDao.get(TABLENAME, id);

 } catch (Exception e) {

 System.out.println("失败");

 e.printStackTrace();

 }

 if (map != null) {

 try {

 obj = HbaseUtil.parseMap(map, Commodity.class, HbaseUtil.FAMILYNAME);

 } catch (Exception e) {

 System.out.println("失败");

 }

 }

 

 return obj;

 }

}

 

service层非常简单,如下:

@Service

public class HbaseService {

@Resource

private HbaseDAO hbaseDAO;

public Commodity getList(String id){

Commodity list = hbaseDAO.getList(id);

return list;

}

}

controller层如下:

@Controller

public class HbaseController {

@Resource

private HbaseService hbaseService;

 

@RequestMapping("list")

public String getList(){

String id = "1";

Commodity list = hbaseService.getList(id);

 

System.out.println(list.getCategoryName());

System.out.println(list.getCommodityCode());

return "list";

}

}

访问页面,可以查询出id1的记录。

 

HbaseUtil.parseMap是一个工具类,将hbase查出来的结果,通过反射,返回一个具体的类对象。

内容如下:

@SuppressWarnings({ "unchecked", "rawtypes" })

public static <E> E parseMap(Map<String, Object> map, Class clazz, String family) throws Exception {

    Assert.notNull(map);

 

    E bean = null;

    Object data = null;

    if ((data = map.get(FAMILYNAME + "_id")) != null) {

        Class<?>[] paramTypes = { String.class };

        Object[] params = { Bytes.toString((byte[]) data) };

        Constructor<E> con = clazz.getConstructor(paramTypes);

        bean = con.newInstance(params);

 

        for (; clazz != Object.class; clazz = clazz.getSuperclass()) {

            for (Field field : clazz.getDeclaredFields()) {

                boolean access = field.isAccessible();

                field.setAccessible(true);

                String key = family + "_" + field.getName();

                if (map.containsKey(key)) {

                    Object value = map.get(key);

                    if (value != null) {

                        if (field.getType().isAssignableFrom(String.class)) {

                            field.set(bean, Bytes.toString((byte[]) value));

                        } else if (field.getType().isAssignableFrom(DateTime.class)) {

                            field.set(bean, DateTime.parse(Bytes.toString((byte[]) value), DateTimeFormat.forPattern(RegexUtil.DATETIME_FORMATTER)));

                        } else if ((field.getType() == Integer.TYPE) || field.getType().isAssignableFrom(Integer.class)) {

                            field.set(bean, Bytes.toInt((byte[]) value));

                        } else if ((field.getType() == Long.TYPE) || field.getType().isAssignableFrom(Long.class)) {

                            field.set(bean, Bytes.toLong((byte[]) value));

                        } else if ((field.getType() == Double.TYPE) || field.getType().isAssignableFrom(Double.class)) {

                            field.set(bean, Bytes.toDouble((byte[]) value));

                        } else if ((field.getType() == Float.TYPE) || field.getType().isAssignableFrom(Float.class)) {

                            field.set(bean, Bytes.toFloat((byte[]) value));

                        } else if ((field.getType() == Boolean.TYPE) || field.getType().isAssignableFrom(Boolean.class)) {

                            field.set(bean, Bytes.toBoolean((byte[]) value));

                        } else if (field.getType().isAssignableFrom(Set.class)) {

                            field.set(bean, HbaseUtil.json2Set(Bytes.toString((byte[]) value)));

                        } else if (field.getType().isAssignableFrom(List.class)) {

                            field.set(bean, HbaseUtil.json2List(Bytes.toString((byte[]) value)));

                        } else if (field.getType().isAssignableFrom(Map.class)) {

                            field.set(bean, HbaseUtil.json2Map(Bytes.toString((byte[]) value)));

                        } else if (field.getType().isArray()) {

                            field.set(bean, HbaseUtil.json2Array(Bytes.toString((byte[]) value)));

                        }

                    }

                }

                field.setAccessible(access);

            }

        }

    }

 

    return bean;

}

 

3. 添加记录

4. 修改记录

添加和修改记录使用同一方法:

 @Resource

 private HbaseTemplateDao hbaseTemplateDao;

 // 添加或修改记录

 public <E> int addAndUpdateRecord(E obj, String rowKey, String tableName) throws Exception {

 int result = HbaseUtil.HBASE_FAILED;

 if (obj == null) {

 return result;

 }

 

 HbaseUtil.putFields(obj, hbaseTemplateDao, tableName, rowKey, HbaseUtil.FAMILYNAME);

 result = HbaseUtil.HBASE_SUCCESS;

 return result;

 }

 

/**

 * 将objHBase数据库

 * @param obj 数据对象值

 * @param hbaseTemplateDao

 * @param tableName

 * @param rowKey

 * @param family

 * @throws Exception

 */

public static <E> void putFields(E obj, HbaseTemplateDao hbaseTemplateDao, String tableName, String rowKey, String family) throws Exception {

    if (obj == null) {

        return;

    }

 

    List<String> families = Lists.newArrayList();

    List<String> qualifiers = Lists.newArrayList();

    List<byte[]> values = Lists.newArrayList();

    Class<?> clazz = obj.getClass();

    for (; clazz != Object.class; clazz = clazz.getSuperclass()) {

        for (Field field : clazz.getDeclaredFields()) {

            boolean access = field.isAccessible();

            field.setAccessible(true);

            Object value = field.get(obj);

            if ((value != null) && (field.getAnnotation(DbIgnore.class) == null)) { // 需判断DbIgnore注解

                byte[] valueBytes = null;

                if (field.getType().isAssignableFrom(String.class)) {

                    valueBytes = Bytes.toBytes((String) value);

                } else if (field.getType().isAssignableFrom(DateTime.class)) {

                    valueBytes = Bytes.toBytes(((DateTime) value).toString(RegexUtil.DATETIME_FORMATTER));

                } else if ((field.getType() == Integer.TYPE) || field.getType().isAssignableFrom(Integer.class)) {

                    valueBytes = Bytes.toBytes((Integer) value);

                } else if ((field.getType() == Long.TYPE) || field.getType().isAssignableFrom(Long.class)) {

                    if ("serialVersionUID".equals(field.getName())) {

                        continue;

                    }

                    valueBytes = Bytes.toBytes((Long) value);

                } else if ((field.getType() == Double.TYPE) || field.getType().isAssignableFrom(Double.class)) {

                    valueBytes = Bytes.toBytes((Double) value);

                } else if ((field.getType() == Float.TYPE) || field.getType().isAssignableFrom(Float.class)) {

                    valueBytes = Bytes.toBytes((Float) value);

                } else if ((field.getType() == Boolean.TYPE) || field.getType().isAssignableFrom(Boolean.class)) {

                    valueBytes = Bytes.toBytes((Boolean) value);

                } else if (field.getType().isAssignableFrom(Set.class)) {

                    Set<?> set = (Set<?>) value;

                    valueBytes = Bytes.toBytes(HbaseUtil.set2Json(set));

                } else if (field.getType().isAssignableFrom(List.class)) {

                    List<?> list = (List<?>) value;

                    valueBytes = Bytes.toBytes(HbaseUtil.list2Json(list));

                } else if (field.getType().isAssignableFrom(Map.class)) {

                    Map<?, ?> map = (Map<?, ?>) value;

                    valueBytes = Bytes.toBytes(HbaseUtil.map2Json(map));

                } else if (field.getType().isAssignableFrom(Array.class)) {

                    Object[] array = (Object[]) value;

                    valueBytes = Bytes.toBytes(HbaseUtil.array2Json(array));

                }

                if (valueBytes != null) {

                    families.add(family);

                    qualifiers.add(field.getName());

                    values.add(valueBytes);

                }

            }

 

            field.setAccessible(access);

        }

    }

 

    if (values.size() > 0) {

        hbaseTemplateDao.put(tableName, rowKey, FluentIterable.from(families).toArray(String.class), FluentIterable.from(qualifiers).toArray(String.class), FluentIterable.from(values).toArray(byte[].class));

    }

 

    return;

}

@SuppressWarnings("deprecation")

@Repository

public class HbaseTemplateDaoImpl<T> implements HbaseTemplateDao {

    @Resource

    private HbaseTemplate hbaseTemplate;

    private FilterList filterlist;

    static final byte[] POSTFIX = new byte[] { 0x00 };

    @Override

    public void put(String tableName, final String rowKey, final String[] families, final String[] qualifiers, final byte[][] values) throws Exception {

        Assert.notNull(hbaseTemplate);

        this.hbaseTemplate.execute(tableName, new TableCallback<T>() {

            @Override

            public T doInTable(HTableInterface table) throws Throwable {

                Put p = new Put(Bytes.toBytes(rowKey));

                for (int i = 0; i < families.length; i++) {

                    p.addColumn(Bytes.toBytes(families[i]), Bytes.toBytes(qualifiers[i]), (values[i] == null) ? Bytes.toBytes("") : values[i]);

                }

                table.put(p);

                return null;

            }

        });

    }

......

}

 

5. 扫描整张表

public List<Map<String, Object>> find(String tableName, String startRow, String stopRow) throws Exception {

    Assert.notNull(hbaseTemplate);

    Scan scan = new Scan();

    if (startRow != null) {

        startRow += String.valueOf(POSTFIX);

        scan.setStartRow(Bytes.toBytes(startRow));

    }

 

    if (stopRow != null) {

        stopRow += String.valueOf(POSTFIX);

        scan.setStopRow(Bytes.toBytes(stopRow));

    }

 

    if (filterlist != null) {

        scan.setFilter(filterlist);

    }

 

    return this.hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() {

        @Override

        public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {

            List<Cell> ceList = result.listCells();

            Map<String, Object> map = new HashMap<>();

            if ((ceList != null) && (ceList.size() > 0)) {

                String rowKey = "";

                for (Cell cell : ceList) {

                    rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());

                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());

                    String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());

                    byte[] value = Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

                    map.put(family + "_" + qualifier, value);

                }

                map.put("rowKey", rowKey);

            }

 

            return map;

        }

    });

}

 

6. 通过rowkey查询记录

 // 通过rowkey获取记录

 public Object getObjById(String tablename,String id, Class className){

 Object obj = null;

 Map<String, Object> map = null;

 try {

 map = hbaseTemplateDao.get(tablename, id);

 } catch (Exception e) {

 System.out.println("失败");

 e.printStackTrace();

 }

 if (map != null) {

 try {

 obj = HbaseUtil.parseMap(map, className, HbaseUtil.FAMILYNAME);

 } catch (Exception e) {

 System.out.println("失败");

 }

 }

 return obj;

 }

 

@SuppressWarnings("deprecation")

@Repository

public class HbaseTemplateDaoImpl<T> implements HbaseTemplateDao {

    @Resource

    private HbaseTemplate hbaseTemplate;

    private FilterList filterlist;

    static final byte[] POSTFIX = new byte[] { 0x00 };

    @Override

    public Map<String, Object> get(String tableName, String rowKey) throws Exception {

        Assert.notNull(hbaseTemplate);

        return this.hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {

            @Override

            public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {

                return result2Map(result);

            }

        });

    }

    private Map<String, Object> result2Map(Result result) {

        Map<String, Object> map = Maps.newHashMap();

        if (result != null) {

            List<Cell> cellList = result.listCells();

            if ((cellList != null) && (cellList.size() > 0)) {

                String rowKey = "";

                for (Cell cell : cellList) {

                    rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());

                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());

                    String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());

                    byte[] data = Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

                    map.put(family + "_" + qualifier, data);

                }

                map.put("rowKey", rowKey);

            }

        }

 

        return map;

    }

}

 

7. 通过条件查询记录

① And查询

public List<Map<String, Object>> getListByOr(String tableName, List<String> fieldNames, List<String> fieldValues) throws Exception{

FilterList list = new FilterList(Operator.MUST_PASS_ALL);

for (int i = 0; i < fieldNames.size(); ++i){

Filter filter =  new SingleColumnValueFilter(

Bytes.toBytes(HbaseUtil.FAMILYNAME),  //列族

Bytes.toBytes(fieldNames.get(i)),  //列名       

CompareOp.EQUAL,

Bytes.toBytes(fieldValues.get(i)));    

list.addFilter(filter);

}

Scan scan = new Scan();

scan.setFilter(list);

 

return this.hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() {

@Override

public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {

List<Cell> ceList = result.listCells();

Map<String, Object> map = new HashMap<>();

if ((ceList != null) && (ceList.size() > 0)) {

String rowKey = "";

for (Cell cell : ceList) {

rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());

String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());

String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());

byte[] value = Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

map.put(family + "_" + qualifier, value);

}

map.put("rowKey", rowKey);

}

 

return map;

}

});

}

 

② Or查询

//数据只要满足一组过滤器中的一个就可以

FilterList list = new FilterList(Operator.MUST_PASS_ONE);

其他的一样

 

8. 获取所有记录

// 获取所有记录

public List<Map<String, Object>> scanTable(String tableName) throws Exception{

return hbaseTemplateDao.find(tableName, null, null);

}

 

9. 分页查询

通过rowkey来进行分页:

static final byte[] POSTFIX = new byte[] { 0x00 };

Scan scan = new Scan();

if (startRow != null) {

    startRow += String.valueOf(POSTFIX);

    scan.setStartRow(Bytes.toBytes(startRow));

}

if (stopRow != null) {

    stopRow += String.valueOf(POSTFIX);

    scan.setStopRow(Bytes.toBytes(stopRow));

}

hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() {}

 

通过条件进行分页,其实是查询满足条件的前多少条。相当于limit,详见过滤器中的pageFilter

 

10. 删除记录

// 删除记录

public int delRecord(String tableName, String rowKey){

int result = HbaseUtil.HBASE_FAILED;

try {

hbaseTemplateDao.del(tableName, rowKey, null, null);

result = HbaseUtil.HBASE_SUCCESS;

} catch (Exception e) {

e.printStackTrace();

}

return result;

}

 

public void HbaseTemplateDaoImpl::del(final String tableName, final String rowKey, final String family, final String qualifier) throws Exception {

    Assert.notNull(hbaseTemplate);

    this.hbaseTemplate.execute(tableName, new TableCallback<T>() {

        @Override

        public T doInTable(HTableInterface table) throws Throwable {

            Delete d = new Delete(rowKey.getBytes(Charsets.UTF_8));

            if ((qualifier != null) && !"".equals(qualifier)) {

                d.addColumn(family.getBytes(Charsets.UTF_8), qualifier.getBytes(Charsets.UTF_8));

            }

            table.delete(d);

            return null;

        }

    });

}