1. 配置文件
添加一个配置文件 :
hbase.zookeeper.property.clientPort=2181
hbase.zookeeper.quorum=192.168.10.120
hbase.master=192.168.10.120:60020
fs.defaultFS=hdfs://192.168.10.120:19000
delete-connection=true
其中前两行为zk的地址和端口
第三行为hbase的master节点地址和端口,即hbase-site.xml文件中的配置
第四行为hadoop中core-site.xml的配置
第五行delete-connection对应下面Spring配置中<hdp:hbase-configuration>的同名属性,用于控制Spring容器关闭时是否同时删除(关闭)由该配置创建的HBase连接
Spring的配置文件:
<!-- Resolves ${...} placeholders from the properties file listed below. -->
<bean id="appProperty" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<array>
<value>classpath:/config/filter-dev-harve.properties</value>
</array>
</property>
</bean>
<!-- Packages scanned for annotated components -->
<context:component-scan base-package="cn.harvetech.normal.dao"></context:component-scan>
<context:component-scan base-package="cn.harvetech.normal.util"></context:component-scan>
<context:component-scan base-package="cn.harvetech.normal.service"></context:component-scan>
<context:component-scan base-package="cn.harvetech.normal.hbase"> </context:component-scan>
<!-- Hadoop and HBase configuration built from the placeholder values (HDFS URI, ZooKeeper quorum and port, delete-connection flag). -->
<hdp:configuration>fs.defaultFS=${fs.defaultFS}</hdp:configuration>
<hdp:hbase-configuration stop-proxy="false" delete-connection="${delete-connection}" zk-quorum="${hbase.zookeeper.quorum}" zk-port="${hbase.zookeeper.property.clientPort}"/>
<!-- Disabled alternative: load the HBase settings from an XML resource instead of the two elements above -->
<!-- <hadoop:configuration id="hadoopConfig" resources="classpath:/config/hbase-site.xml"></hadoop:configuration>
-->
<!-- Disabled alternative: build the hbaseConfiguration bean from hadoopConfig (original note: "load template") -->
<!-- <hadoop:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfig"></hadoop:hbase-configuration>
-->
<!-- Standard template used for HBase reads and writes. -->
<!-- NOTE(review): "hbaseConfiguration" is presumably the bean registered by hdp:hbase-configuration above, which declares no explicit id; confirm. -->
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
<property name="configuration" ref="hbaseConfiguration"></property>
<property name="encoding" value="UTF-8"></property>
</bean>
<!-- Custom template (project class) wired with the same configuration and encoding -->
<bean id="hbaseCustomTemplate" class="cn.harvetech.normal.hbase.HbaseCustomTemplate">
<property name="configuration" ref="hbaseConfiguration"/>
<property name="encoding" value="utf-8"/>
</bean>
上面第一段是用来解析属性文件的;
第二段只是扫描一些注解的类
第三段是将属性文件中配置的四个参数值初始化给hbaseConfiguration
第四段是两个模板,其中configuration都是上面刚初始化的hbaseConfiguration
hbaseTemplate模板用于hbase的增删改查;
hbaseCustomTemplate模板是个自定义的类,用于获取hbase的table,其实没什么必要,本文暂略掉。
注:也可使用屏蔽掉的代码替换第三段中的两行配置,即自己定义一个xml的配置文件,然后将那四个参数写到配置中,这里再起个名称为hbaseConfiguration(上面的hbaseConfiguration是标准的,这里的hbaseConfiguration只是一个名称,可以自定义为其他名字)的id,将配置信息初始化到此bean中,再去初始化下面的两个模板。如果这样,对应的xml配置文件内容如下:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Sample HBase client configuration; mirrors the values of the properties file shown earlier. -->
<configuration>
<!-- ZooKeeper host -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.10.120</value>
</property>
<!-- ZooKeeper client port -->
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<!-- HBase master address and port -->
<property>
<name>hbase.master</name>
<value>192.168.10.120:60020</value>
</property>
<!-- HDFS URI, same value as in Hadoop's core-site.xml -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://192.168.10.120:19000</value>
</property>
<!-- NOTE(review): in the Spring snippet delete-connection is an attribute of hdp:hbase-configuration; whether it has any effect as a property here is unconfirmed. -->
<property>
<name>delete-connection</name>
<value>true</value>
</property>
<!-- NOTE(review): hive.url is not referenced by any other snippet in this document; confirm it is needed. -->
<property>
<name>hive.url</name>
<value>jdbc:hive://192.168.10.120:10002/default</value>
</property>
</configuration>
2. 注解
在dao层直接使用注解:
@Resource
private HbaseTemplate hbaseTemplate;
拿到上面spring配置文件中配置的hbaseTemplate模板bean。
然后就可以通过get方法读取hbase了,如:
@SuppressWarnings("deprecation")
@Repository
public class HbaseTemplateDaoImpl<T> implements HbaseTemplateDao {
@Resource
private HbaseTemplate hbaseTemplate;
private FilterList filterlist;
static final byte[] POSTFIX = new byte[] { 0x00 };
@Override
public Map<String, Object> get(String tableName, String rowKey) throws Exception {
Assert.notNull(hbaseTemplate);
return this.hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
。。。。。。
});
@Override
public void put(String tableName, final String rowKey, final String[] families, final String[] qualifiers, final byte[][] values) throws Exception {
Assert.notNull(hbaseTemplate);
this.hbaseTemplate.execute(tableName, new TableCallback<T>() {
。。。。。。
}
});
}
。。。。。。
}
}
/**
 * DAO that reads {@code Commodity} rows from the HBase table named by {@link #TABLENAME}.
 */
@Repository
public class HbaseDAO {
    /** Name of the HBase table this DAO reads from. */
    public static final String TABLENAME = "harve_commodity";

    @Resource
    private HbaseTemplateDao hbaseTemplateDao;

    /**
     * Loads the row with the given row key and converts it to a {@code Commodity}.
     *
     * @param id row key of the record to load
     * @return the converted object, or {@code null} when the lookup or the conversion fails
     */
    public Commodity getList(String id) {
        Map<String, Object> map = null;
        try {
            map = hbaseTemplateDao.get(TABLENAME, id);
        } catch (Exception e) {
            System.out.println("失败");
            e.printStackTrace();
        }
        if (map == null) {
            return null;
        }
        Commodity obj = null;
        try {
            obj = HbaseUtil.parseMap(map, Commodity.class, HbaseUtil.FAMILYNAME);
        } catch (Exception e) {
            System.out.println("失败");
            // Report the cause here as well; the conversion failure was previously
            // dropped without any diagnostics, unlike the lookup failure above.
            e.printStackTrace();
        }
        return obj;
    }
}
service层非常简单,如下:
/**
 * Service layer that forwards commodity lookups to {@code HbaseDAO}.
 */
@Service
public class HbaseService {
    @Resource
    private HbaseDAO hbaseDAO;

    /**
     * Returns whatever {@code HbaseDAO.getList} yields for the given row key.
     *
     * @param id row key of the record to load
     * @return the DAO's result, passed through unchanged
     */
    public Commodity getList(String id) {
        return hbaseDAO.getList(id);
    }
}
controller层如下:
/**
 * Demo controller that looks up the record with row key "1" and prints two of its fields.
 */
@Controller
public class HbaseController {
    @Resource
    private HbaseService hbaseService;

    /**
     * Handles the "list" request mapping.
     *
     * @return the view name "list"
     */
    @RequestMapping("list")
    public String getList() {
        String id = "1";
        Commodity list = hbaseService.getList(id);
        // The lookup yields null when the row cannot be read or converted;
        // guard the accessors so the request does not fail with a NullPointerException.
        if (list != null) {
            System.out.println(list.getCategoryName());
            System.out.println(list.getCommodityCode());
        }
        return "list";
    }
}
访问页面,可以查询出id为1的记录。
HbaseUtil.parseMap是一个工具方法,将hbase查出来的结果,通过反射,返回一个具体的类对象。
内容如下:
/**
 * Builds a bean of type {@code clazz} from a flattened HBase row.
 *
 * <p>The map is expected to use keys of the form {@code family + "_" + qualifier} with
 * {@code byte[]} values. The bean is created through the constructor of {@code clazz}
 * that takes a single {@code String}, which receives the decoded {@code family + "_id"} cell.
 * Every declared field of {@code clazz} and its superclasses (up to, not including,
 * {@code Object}) that has a non-null cell under {@code family} is then populated by reflection.
 *
 * @param map    flattened row; must not be null
 * @param clazz  bean class to instantiate
 * @param family column family whose cells are mapped onto the bean
 * @return the populated bean, or {@code null} when the row has no {@code family + "_id"} cell
 * @throws Exception if the constructor lookup, instantiation or a field assignment fails
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <E> E parseMap(Map<String, Object> map, Class clazz, String family) throws Exception {
    Assert.notNull(map);
    // Look the id cell up under the requested family. The FAMILYNAME constant was used
    // here before, so a caller passing any other family could never get a bean back.
    Object data = map.get(family + "_id");
    if (data == null) {
        return null;
    }
    Class<?>[] paramTypes = { String.class };
    Object[] params = { Bytes.toString((byte[]) data) };
    Constructor<E> con = clazz.getConstructor(paramTypes);
    E bean = con.newInstance(params);
    // Walk the class hierarchy so inherited fields are populated too.
    for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
        for (Field field : clazz.getDeclaredFields()) {
            boolean access = field.isAccessible();
            field.setAccessible(true);
            Object value = map.get(family + "_" + field.getName());
            if (value != null) {
                Class<?> type = field.getType();
                // The branch order matters: the first matching type decides the decoding.
                if (type.isAssignableFrom(String.class)) {
                    field.set(bean, Bytes.toString((byte[]) value));
                } else if (type.isAssignableFrom(DateTime.class)) {
                    field.set(bean, DateTime.parse(Bytes.toString((byte[]) value), DateTimeFormat.forPattern(RegexUtil.DATETIME_FORMATTER)));
                } else if ((type == Integer.TYPE) || type.isAssignableFrom(Integer.class)) {
                    field.set(bean, Bytes.toInt((byte[]) value));
                } else if ((type == Long.TYPE) || type.isAssignableFrom(Long.class)) {
                    field.set(bean, Bytes.toLong((byte[]) value));
                } else if ((type == Double.TYPE) || type.isAssignableFrom(Double.class)) {
                    field.set(bean, Bytes.toDouble((byte[]) value));
                } else if ((type == Float.TYPE) || type.isAssignableFrom(Float.class)) {
                    field.set(bean, Bytes.toFloat((byte[]) value));
                } else if ((type == Boolean.TYPE) || type.isAssignableFrom(Boolean.class)) {
                    field.set(bean, Bytes.toBoolean((byte[]) value));
                } else if (type.isAssignableFrom(Set.class)) {
                    field.set(bean, HbaseUtil.json2Set(Bytes.toString((byte[]) value)));
                } else if (type.isAssignableFrom(List.class)) {
                    field.set(bean, HbaseUtil.json2List(Bytes.toString((byte[]) value)));
                } else if (type.isAssignableFrom(Map.class)) {
                    field.set(bean, HbaseUtil.json2Map(Bytes.toString((byte[]) value)));
                } else if (type.isArray()) {
                    field.set(bean, HbaseUtil.json2Array(Bytes.toString((byte[]) value)));
                }
            }
            // Put the field's accessibility flag back to what it was.
            field.setAccessible(access);
        }
    }
    return bean;
}
3. 添加记录
4. 修改记录
添加和修改记录使用同一方法:
@Resource
private HbaseTemplateDao hbaseTemplateDao;
/**
 * Inserts or updates a record; both operations go through the same HBase put.
 *
 * @param obj       object whose fields are written; nothing is written when null
 * @param rowKey    row key of the record
 * @param tableName target table
 * @return {@code HbaseUtil.HBASE_FAILED} when {@code obj} is null, otherwise
 *         {@code HbaseUtil.HBASE_SUCCESS}
 * @throws Exception if writing the fields fails
 */
public <E> int addAndUpdateRecord(E obj, String rowKey, String tableName) throws Exception {
    if (obj != null) {
        HbaseUtil.putFields(obj, hbaseTemplateDao, tableName, rowKey, HbaseUtil.FAMILYNAME);
        return HbaseUtil.HBASE_SUCCESS;
    }
    return HbaseUtil.HBASE_FAILED;
}
/**
 * Writes the non-null fields of {@code obj} to one HBase row.
 *
 * <p>Every declared field of the object's class and its superclasses (up to, not including,
 * {@code Object}) is read by reflection. Fields that are null, annotated with
 * {@code DbIgnore}, named {@code serialVersionUID} (long-typed) or of an unsupported type
 * are skipped. Each remaining field becomes one cell whose qualifier is the field name.
 * Nothing is written when no field qualifies.
 *
 * @param obj              object whose fields are written; a null object is ignored
 * @param hbaseTemplateDao DAO used to perform the put
 * @param tableName        target table
 * @param rowKey           row key of the record
 * @param family           column family that receives every cell
 * @throws Exception if a field cannot be read or the put fails
 */
public static <E> void putFields(E obj, HbaseTemplateDao hbaseTemplateDao, String tableName, String rowKey, String family) throws Exception {
    if (obj == null) {
        return;
    }
    List<String> families = Lists.newArrayList();
    List<String> qualifiers = Lists.newArrayList();
    List<byte[]> values = Lists.newArrayList();
    for (Class<?> clazz = obj.getClass(); clazz != Object.class; clazz = clazz.getSuperclass()) {
        for (Field field : clazz.getDeclaredFields()) {
            boolean access = field.isAccessible();
            field.setAccessible(true);
            Object value = field.get(obj);
            if ((value != null) && (field.getAnnotation(DbIgnore.class) == null)) { // honour the DbIgnore annotation
                byte[] valueBytes = null;
                Class<?> type = field.getType();
                // The branch order matters: the first matching type decides the encoding.
                if (type.isAssignableFrom(String.class)) {
                    valueBytes = Bytes.toBytes((String) value);
                } else if (type.isAssignableFrom(DateTime.class)) {
                    valueBytes = Bytes.toBytes(((DateTime) value).toString(RegexUtil.DATETIME_FORMATTER));
                } else if ((type == Integer.TYPE) || type.isAssignableFrom(Integer.class)) {
                    valueBytes = Bytes.toBytes((Integer) value);
                } else if ((type == Long.TYPE) || type.isAssignableFrom(Long.class)) {
                    // Leave valueBytes null for serialVersionUID instead of using 'continue',
                    // which skipped restoring the accessibility flag below.
                    if (!"serialVersionUID".equals(field.getName())) {
                        valueBytes = Bytes.toBytes((Long) value);
                    }
                } else if ((type == Double.TYPE) || type.isAssignableFrom(Double.class)) {
                    valueBytes = Bytes.toBytes((Double) value);
                } else if ((type == Float.TYPE) || type.isAssignableFrom(Float.class)) {
                    valueBytes = Bytes.toBytes((Float) value);
                } else if ((type == Boolean.TYPE) || type.isAssignableFrom(Boolean.class)) {
                    valueBytes = Bytes.toBytes((Boolean) value);
                } else if (type.isAssignableFrom(Set.class)) {
                    valueBytes = Bytes.toBytes(HbaseUtil.set2Json((Set<?>) value));
                } else if (type.isAssignableFrom(List.class)) {
                    valueBytes = Bytes.toBytes(HbaseUtil.list2Json((List<?>) value));
                } else if (type.isAssignableFrom(Map.class)) {
                    valueBytes = Bytes.toBytes(HbaseUtil.map2Json((Map<?, ?>) value));
                } else if (type.isArray() && (value instanceof Object[])) {
                    // Detect array-typed fields with isArray(); the former
                    // isAssignableFrom(Array.class) test never matched them, so arrays
                    // were silently dropped. Primitive arrays cannot be cast to Object[]
                    // and therefore remain unsupported.
                    valueBytes = Bytes.toBytes(HbaseUtil.array2Json((Object[]) value));
                }
                if (valueBytes != null) {
                    families.add(family);
                    qualifiers.add(field.getName());
                    values.add(valueBytes);
                }
            }
            // Put the field's accessibility flag back to what it was.
            field.setAccessible(access);
        }
    }
    if (values.size() > 0) {
        hbaseTemplateDao.put(tableName, rowKey, FluentIterable.from(families).toArray(String.class), FluentIterable.from(qualifiers).toArray(String.class), FluentIterable.from(values).toArray(byte[].class));
    }
}
@SuppressWarnings("deprecation")
@Repository
public class HbaseTemplateDaoImpl<T> implements HbaseTemplateDao {
@Resource
private HbaseTemplate hbaseTemplate;
private FilterList filterlist;
static final byte[] POSTFIX = new byte[] { 0x00 };
/**
 * Writes one row: cell {@code i} goes to {@code families[i]:qualifiers[i]} with
 * {@code values[i]}; a null value is stored as the bytes of the empty string.
 * The loop is driven by the length of {@code families}.
 *
 * @param tableName  target table
 * @param rowKey     row key of the record
 * @param families   column family of each cell
 * @param qualifiers column qualifier of each cell
 * @param values     value of each cell
 * @throws Exception if the template is missing or the write fails
 */
@Override
public void put(String tableName, final String rowKey, final String[] families, final String[] qualifiers, final byte[][] values) throws Exception {
    Assert.notNull(hbaseTemplate);
    TableCallback<T> writer = new TableCallback<T>() {
        @Override
        public T doInTable(HTableInterface table) throws Throwable {
            Put mutation = new Put(Bytes.toBytes(rowKey));
            int index = 0;
            while (index < families.length) {
                byte[] familyBytes = Bytes.toBytes(families[index]);
                byte[] qualifierBytes = Bytes.toBytes(qualifiers[index]);
                byte[] cellValue = values[index];
                if (cellValue == null) {
                    cellValue = Bytes.toBytes("");
                }
                mutation.addColumn(familyBytes, qualifierBytes, cellValue);
                index++;
            }
            table.put(mutation);
            return null;
        }
    };
    this.hbaseTemplate.execute(tableName, writer);
}
......
}
5. 扫描整张表
/**
 * Scans a table, optionally bounded by row keys, and returns each row as a flattened map.
 *
 * <p>A single 0x00 byte ({@code POSTFIX}) is appended to each supplied bound before it is
 * handed to the scan. Each returned map holds one entry per cell, keyed
 * {@code family + "_" + qualifier} with a {@code byte[]} value, plus a {@code "rowKey"}
 * entry; a row without cells yields an empty map. The {@code filterlist} field is applied
 * when it is set.
 *
 * @param tableName table to scan
 * @param startRow  lower row-key bound, or null for no lower bound
 * @param stopRow   upper row-key bound, or null for no upper bound
 * @return one map per scanned row
 * @throws Exception if the template is missing or the scan fails
 */
public List<Map<String, Object>> find(String tableName, String startRow, String stopRow) throws Exception {
    Assert.notNull(hbaseTemplate);
    Scan scan = new Scan();
    if (startRow != null) {
        // Append the raw POSTFIX byte. String.valueOf(byte[]) resolves to
        // String.valueOf(Object) and yields the array's identity string ("[B@..."),
        // which corrupted the bound.
        scan.setStartRow(Bytes.add(Bytes.toBytes(startRow), POSTFIX));
    }
    if (stopRow != null) {
        scan.setStopRow(Bytes.add(Bytes.toBytes(stopRow), POSTFIX));
    }
    if (filterlist != null) {
        scan.setFilter(filterlist);
    }
    return this.hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() {
        @Override
        public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
            List<Cell> ceList = result.listCells();
            Map<String, Object> map = new HashMap<>();
            if ((ceList != null) && (ceList.size() > 0)) {
                String rowKey = "";
                for (Cell cell : ceList) {
                    rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                    String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                    // Copy the value out of the cell's backing array.
                    byte[] value = Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    map.put(family + "_" + qualifier, value);
                }
                map.put("rowKey", rowKey);
            }
            return map;
        }
    });
}
6. 通过rowkey查询记录
/**
 * Loads a record by row key and converts it to an instance of {@code className}.
 *
 * @param tablename table to read from
 * @param id        row key of the record
 * @param className class of the object to build
 * @return the converted object, or {@code null} when the lookup or the conversion fails
 */
public Object getObjById(String tablename,String id, Class className){
    Map<String, Object> map = null;
    try {
        map = hbaseTemplateDao.get(tablename, id);
    } catch (Exception e) {
        System.out.println("失败");
        e.printStackTrace();
    }
    if (map == null) {
        return null;
    }
    Object obj = null;
    try {
        obj = HbaseUtil.parseMap(map, className, HbaseUtil.FAMILYNAME);
    } catch (Exception e) {
        System.out.println("失败");
        // Report the cause here as well; the conversion failure was previously
        // dropped without any diagnostics, unlike the lookup failure above.
        e.printStackTrace();
    }
    return obj;
}
/**
 * {@code HbaseTemplateDao} implementation backed by Spring's {@code HbaseTemplate}
 * (excerpt showing the single-row lookup).
 */
@SuppressWarnings("deprecation")
@Repository
public class HbaseTemplateDaoImpl<T> implements HbaseTemplateDao {
    @Resource
    private HbaseTemplate hbaseTemplate;
    private FilterList filterlist;
    static final byte[] POSTFIX = new byte[] { 0x00 };

    /**
     * Reads one row and returns it as a flattened map.
     *
     * @param tableName table to read from
     * @param rowKey    row key of the record
     * @return the row's cells keyed {@code family + "_" + qualifier}, plus a {@code "rowKey"} entry
     * @throws Exception if the template is missing or the read fails
     */
    @Override
    public Map<String, Object> get(String tableName, String rowKey) throws Exception {
        Assert.notNull(hbaseTemplate);
        RowMapper<Map<String, Object>> mapper = new RowMapper<Map<String, Object>>() {
            @Override
            public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
                return result2Map(result);
            }
        };
        return this.hbaseTemplate.get(tableName, rowKey, mapper);
    }

    /**
     * Flattens a result into a map; a null result or one without cells gives an empty map.
     */
    private Map<String, Object> result2Map(Result result) {
        Map<String, Object> row = Maps.newHashMap();
        if (result == null) {
            return row;
        }
        List<Cell> cells = result.listCells();
        if ((cells == null) || cells.isEmpty()) {
            return row;
        }
        String rowKey = "";
        for (Cell cell : cells) {
            rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
            String familyName = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            String qualifierName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            // Copy the value out of the cell's backing array.
            byte[] cellValue = Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            row.put(familyName + "_" + qualifierName, cellValue);
        }
        row.put("rowKey", rowKey);
        return row;
    }
}
7. 通过条件查询记录
① And查询(注意:下面示例的方法名虽为getListByOr,但使用的是Operator.MUST_PASS_ALL,即And语义)
/**
 * Scans a table for rows matching ALL of the given column equality conditions.
 *
 * <p>Despite the method name, the filters are combined with {@code Operator.MUST_PASS_ALL}.
 * Condition {@code i} compares column {@code fieldNames.get(i)} in family
 * {@code HbaseUtil.FAMILYNAME} for equality with the bytes of {@code fieldValues.get(i)}.
 * Each returned map holds one entry per cell, keyed {@code family + "_" + qualifier} with a
 * {@code byte[]} value, plus a {@code "rowKey"} entry.
 *
 * @param tableName   table to scan
 * @param fieldNames  column names, one per condition
 * @param fieldValues expected values, read at the same indexes as {@code fieldNames}
 * @return one map per matching row
 * @throws Exception if the scan fails
 */
public List<Map<String, Object>> getListByOr(String tableName, List<String> fieldNames, List<String> fieldValues) throws Exception{
    FilterList conditions = new FilterList(Operator.MUST_PASS_ALL);
    int index = 0;
    while (index < fieldNames.size()) {
        byte[] familyBytes = Bytes.toBytes(HbaseUtil.FAMILYNAME);
        byte[] columnBytes = Bytes.toBytes(fieldNames.get(index));
        byte[] expectedBytes = Bytes.toBytes(fieldValues.get(index));
        Filter condition = new SingleColumnValueFilter(familyBytes, columnBytes, CompareOp.EQUAL, expectedBytes);
        conditions.addFilter(condition);
        ++index;
    }
    Scan scan = new Scan();
    scan.setFilter(conditions);
    RowMapper<Map<String, Object>> mapper = new RowMapper<Map<String, Object>>() {
        @Override
        public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
            Map<String, Object> row = new HashMap<>();
            List<Cell> cells = result.listCells();
            if ((cells == null) || cells.isEmpty()) {
                return row;
            }
            String rowKey = "";
            for (Cell cell : cells) {
                rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                String familyName = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                String qualifierName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                byte[] cellValue = Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                row.put(familyName + "_" + qualifierName, cellValue);
            }
            row.put("rowKey", rowKey);
            return row;
        }
    };
    return this.hbaseTemplate.find(tableName, scan, mapper);
}
② Or查询
//数据只要满足一组过滤器中的一个就可以
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
其他的一样
8. 获取所有记录
/**
 * Returns all records of a table by delegating to {@code hbaseTemplateDao.find}
 * with neither a start row nor a stop row.
 *
 * @param tableName table to scan
 * @return whatever {@code find} returns for an unbounded scan
 * @throws Exception if the underlying scan fails
 */
public List<Map<String, Object>> scanTable(String tableName) throws Exception{
return hbaseTemplateDao.find(tableName, null, null);
}
9. 分页查询
通过rowkey来进行分页:
static final byte[] POSTFIX = new byte[] { 0x00 };
// Build a scan bounded by row keys; each supplied bound gets the POSTFIX byte (0x00) appended.
Scan scan = new Scan();
if (startRow != null) {
    // Append the raw byte. String.valueOf(byte[]) resolves to String.valueOf(Object)
    // and yields the array's identity string ("[B@..."), which corrupted the bound.
    scan.setStartRow(Bytes.add(Bytes.toBytes(startRow), POSTFIX));
}
if (stopRow != null) {
    scan.setStopRow(Bytes.add(Bytes.toBytes(stopRow), POSTFIX));
}
hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() {}
通过条件进行分页,其实是查询满足条件的前多少条,相当于limit,详见过滤器中的PageFilter。
10. 删除记录
/**
 * Deletes a record by row key, passing no family and no qualifier to the DAO.
 *
 * @param tableName table to delete from
 * @param rowKey    row key of the record
 * @return {@code HbaseUtil.HBASE_SUCCESS} when the DAO call completes,
 *         {@code HbaseUtil.HBASE_FAILED} when it throws
 */
public int delRecord(String tableName, String rowKey){
    try {
        hbaseTemplateDao.del(tableName, rowKey, null, null);
        return HbaseUtil.HBASE_SUCCESS;
    } catch (Exception e) {
        e.printStackTrace();
        return HbaseUtil.HBASE_FAILED;
    }
}
/**
 * Issues a delete for the given row.
 *
 * <p>When {@code qualifier} is non-empty, the delete is narrowed to the column
 * {@code family:qualifier}. Otherwise no column is added to the delete and
 * {@code family} is not consulted.
 *
 * <p>NOTE(review): the {@code HbaseTemplateDaoImpl::} qualifier in the signature is not
 * Java syntax; it presumably marks the owning class in this excerpt.
 *
 * @param tableName table to delete from
 * @param rowKey    row key of the record
 * @param family    column family, read only when {@code qualifier} is non-empty
 * @param qualifier column qualifier, or null/empty to add no column
 * @throws Exception if the template is missing or the delete fails
 */
public void HbaseTemplateDaoImpl::del(final String tableName, final String rowKey, final String family, final String qualifier) throws Exception {
    Assert.notNull(hbaseTemplate);
    TableCallback<T> remover = new TableCallback<T>() {
        @Override
        public T doInTable(HTableInterface table) throws Throwable {
            Delete deletion = new Delete(rowKey.getBytes(Charsets.UTF_8));
            boolean hasQualifier = (qualifier != null) && !qualifier.isEmpty();
            if (hasQualifier) {
                byte[] familyBytes = family.getBytes(Charsets.UTF_8);
                byte[] qualifierBytes = qualifier.getBytes(Charsets.UTF_8);
                deletion.addColumn(familyBytes, qualifierBytes);
            }
            table.delete(deletion);
            return null;
        }
    };
    this.hbaseTemplate.execute(tableName, remover);
}