Bulk loading (bulkload) data into HBase

Date: 2021-09-04 08:24:43

HBase provides a Java API for table operations, but that API inserts data one row at a time, which is far too slow for loading large volumes of data.
For bulk imports, the most efficient approach is to generate HBase's internal storage format, HFile, directly and then load the resulting files into HBase.
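For context, this is roughly what the row-at-a-time client API looks like. A minimal sketch only; the table name, row key, and cell values are placeholders, not names from this article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the single-row Put API that this article contrasts against.
// "demo_table" and the cell values below are placeholders.
public class SinglePutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "demo_table");
        try {
            Put put = new Put(Bytes.toBytes("row-0001"));  // one RPC round trip per row (or small batch)
            put.add(Bytes.toBytes("data"), Bytes.toBytes("type"), Bytes.toBytes("type1"));
            table.put(put);
        } finally {
            table.close();
        }
    }
}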
The steps are as follows:

  1. Export the source data to HFiles with a MapReduce job
    HBaseBulkLoadDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * HBase bulk load driver: runs the MapReduce job that writes the HFiles,
 * then triggers the bulk load.
 * @author lijiang
 * @date 2015/7/28 16:25
 */
public class HBaseBulkLoadDriver extends Configured implements Tool {

    private static final String DATA_SEPARATOR = "\t";
    public static final String COLUMN_FAMILY = "data";

    @Override
    public int run(String[] args) throws Exception {
        // Usage: <inputPath> <outputPath> <tableName>
        if (args.length != 3) {
            return -1;
        }
        int result = 0;
        String inputPath = args[0];
        String outputPath = args[1];
        // Target table name, passed on the command line
        String tableName = args[2];

        Configuration configuration = getConf();
        configuration.set("DATA_SEPARATOR", DATA_SEPARATOR);
        configuration.set("TABLE_NAME", tableName);
        configuration.set("COLUMN_FAMILY", COLUMN_FAMILY);

        Job job = Job.getInstance(configuration);
        job.setJarByClass(HBaseBulkLoadDriver.class);
        job.setJobName("Bulk Loading HBase Table::" + tableName);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Set the mapper and the input/output paths
        job.setMapperClass(CanMapper.class);
        FileInputFormat.addInputPaths(job, inputPath);
        // Remove a stale output directory from a previous run (on the job's default file system)
        FileSystem.get(configuration).delete(new Path(outputPath), true);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Configure HFileOutputFormat2 against the target table's region boundaries
        HFileOutputFormat2.configureIncrementalLoad(job, new HTable(configuration, tableName));
        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            // After the MapReduce job finishes, bulk-load the generated HFiles into HBase
            HBaseBulkLoad.doBulkLoad(outputPath, tableName);
        } else {
            result = -1;
        }

        return result;
    }

    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);
            if (response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
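Note that HFileOutputFormat2.configureIncrementalLoad reads the region boundaries of the target table, so the table (with the data column family) must already exist before the job runs. A minimal sketch of creating it with the 0.99-era admin API; the class name CreateTargetTable is just a placeholder and pre-split keys are omitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Creates the target table with the "data" column family if it does not exist yet.
// Placeholder utility; the table name is taken from the command line.
public class CreateTargetTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(args[0]));
            desc.addFamily(new HColumnDescriptor("data")); // must match COLUMN_FAMILY in the driver
            if (!admin.tableExists(desc.getTableName())) {
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}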

CanMapper.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Maps one tab-separated input line to a Put on the target table.
 * @author lijiang
 * @date 2015/8/14 10:50
 */
public class CanMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /**
     * Field separator of the input data
     */
    protected String dataSeparator;

    /**
     * Column family
     */
    protected String columnFamily;

    @Override
    protected void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        dataSeparator = configuration.get("DATA_SEPARATOR");
        columnFamily = configuration.get("COLUMN_FAMILY");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            String[] values = value.toString().split(dataSeparator);
            // The "tick" field (index 6) is used as the row key
            String rowKey = values[6];
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("type"), Bytes.toBytes(values[0]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("value"), Bytes.toBytes(values[1]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("typeTag"), Bytes.toBytes(values[2]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("result"), Bytes.toBytes(values[3]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("dm1Info"), Bytes.toBytes(values[4]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("dm1Code"), Bytes.toBytes(values[5]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("time_t"), Bytes.toBytes(values[7]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("stamp"), Bytes.toBytes(values[8]));
            // HFileOutputFormat2 partitions and sorts by the map output key,
            // so it must be the row key, not the table name
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
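For reference, the mapper assumes each input line carries at least nine tab-separated fields in the order type, value, typeTag, result, dm1Info, dm1Code, tick, time_t, stamp, and uses tick (index 6) as the row key. A tiny illustration with invented sample values (purely hypothetical data):

// Hypothetical sample line; the values are invented for illustration only.
String line = "type1\t42\ttagA\tOK\tinfo\tcode\trow-20150814-0001\t1439521800\t1439521800123";
String[] fields = line.split("\t");
String rowKey = fields[6];   // "row-20150814-0001" becomes the HBase row key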

  2. Load the generated HFile files into HBase
    HBaseBulkLoad.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/**
 * HBase bulk load: moves the generated HFiles into the target table's regions.
 * @author lijiang
 * @date 2015/7/29 10:31
 */
public class HBaseBulkLoad {

    public static void doBulkLoad(String outputPath, String tableName) {
        try {
            Configuration configuration = new Configuration();
            configuration.set("mapreduce.child.java.opts", "-Xmx1g");
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName);
            loader.doBulkLoad(new Path(outputPath), hTable);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
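After a successful load, the rows can be read back with the normal client API to verify the import. A minimal check, assuming the table name and a row key known to exist in the input are passed on the command line (VerifyLoad is a placeholder class name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Reads one bulk-loaded row back and prints the "type" column for a quick sanity check.
public class VerifyLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, args[0]);                         // table name
        try {
            Result result = table.get(new Get(Bytes.toBytes(args[1])));   // a known row key
            System.out.println("type = " + Bytes.toString(
                    result.getValue(Bytes.toBytes("data"), Bytes.toBytes("type"))));
        } finally {
            table.close();
        }
    }
}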

Maven dependencies:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.99.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.99.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.99.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-core-asl</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-jaxrs</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-xc</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-protocol</artifactId>
    <version>0.99.0</version>
</dependency>
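Depending on how the job is built and submitted, a Hadoop client dependency matching the cluster's Hadoop version may also be required; the version below is only an example and should be aligned with the actual cluster:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.1</version>
    <scope>provided</scope>
</dependency>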