利用 CombineFileInputFormat 把 Netflix data set 导入到 HBase 里

时间:2024-12-07 13:03:02

版权声明:本文为博主原创文章。未经博主同意不得转载。 https://blog.****.net/xiewenbo/article/details/25637931

package com.mr.test;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * InputFormat that packs many small files into combined splits so one mapper
 * can process several small files, avoiding the one-map-per-file overhead.
 * Each file block inside a CombineFileSplit is read line-by-line by a
 * {@link CombineSmallfileRecordReader}.
 */
public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

    /**
     * Creates a CombineFileRecordReader that delegates every file block of the
     * given CombineFileSplit to a CombineSmallfileRecordReader instance.
     *
     * @param split   the (combined) input split, must be a CombineFileSplit
     * @param context task attempt context
     * @return record reader yielding (line offset, line bytes) pairs
     * @throws IOException if the underlying reader fails to initialize
     */
    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader =
                new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            // Bug fix: the original instantiated a RuntimeException but never threw
            // it, silently swallowing the interrupt. Restore the interrupt flag and
            // propagate with the original exception as cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }
}
package com.mr.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> { private CombineFileSplit combineFileSplit;
private LineRecordReader lineRecordReader = new LineRecordReader();
private Path[] paths;
private int totalLength;
private int currentIndex;
private float currentProgress = 0;
private LongWritable currentKey;
private BytesWritable currentValue = new BytesWritable(); public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {
super();
this.combineFileSplit = combineFileSplit;
this.currentIndex = index; // 当前要处理的小文件Block在CombineFileSplit中的索引
} @Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.combineFileSplit = (CombineFileSplit) split;
// 处理CombineFileSplit中的一个小文件Block,由于使用LineRecordReader,须要构造一个FileSplit对象,然后才可以读取数据
FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
lineRecordReader.initialize(fileSplit, context); this.paths = combineFileSplit.getPaths();
totalLength = paths.length;
context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
} @Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
currentKey = lineRecordReader.getCurrentKey();
return currentKey;
} <strong><span style="color:#ff0000;"> @Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
System.out.println("lineRecordReader:"+lineRecordReader.getCurrentValue().toString());
byte[] content = lineRecordReader.getCurrentValue().toString().getBytes();
System.out.println("content:"+new String(content));
currentValue = new BytesWritable();
currentValue.set(content, 0, content.length);
System.out.println("currentValue:"+new String(currentValue.getBytes()));
return currentValue;
}</span></strong>
public static void main(String args[]){
BytesWritable cv = new BytesWritable();
String str1 = "1234567";
String str2 = "123450";
cv.set(str1.getBytes(), 0, str1.getBytes().length);
System.out.println(new String(cv.getBytes())); cv.setCapacity(0); cv.set(str2.getBytes(), 0, str2.getBytes().length);
System.out.println(new String(cv.getBytes()));
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (currentIndex >= 0 && currentIndex < totalLength) {
return lineRecordReader.nextKeyValue();
} else {
return false;
}
} @Override
public float getProgress() throws IOException {
if (currentIndex >= 0 && currentIndex < totalLength) {
currentProgress = (float) currentIndex / totalLength;
return currentProgress;
}
return currentProgress;
} @Override
public void close() throws IOException {
lineRecordReader.close();
}
}

package com.mr.test;

import java.io.IOException;
import java.util.Iterator; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce driver that bulk-loads the Netflix ratings dataset into the HBase
 * table "ntf_data". Input files (e.g. mv_0000001.txt) are read via
 * CombineSmallfileInputFormat; the mapper keys each rating line by
 * "&lt;userId&gt;_&lt;movieId&gt;" and the reducer writes one Put per line into
 * the "content" column family (columns "score" and "date").
 */
public class BulkImportData {

    /** Mapper: turns a rating line "user,score,date" into (userId_movieId, line). */
    public static class TokenizerMapper extends
            Mapper<Object, BytesWritable, Text, Text> {
        public Text _key = new Text();
        public Text _value = new Text();

        public void map(Object key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Bug fix: getBytes() returns the padded backing array; restrict to the
            // logical length so no stale trailing bytes leak into the value. The
            // old replace("\\x00", "") only stripped the literal text "\x00" and
            // never removed real NUL bytes.
            _value.set(value.getBytes(), 0, value.getLength());
            String tmp = _value.toString().trim();
            _value.set(tmp);
            // File name set by CombineSmallfileRecordReader.initialize(); encodes
            // the movie id as "mv_<id>.txt".
            String filename = context.getConfiguration().get("map.input.file.name");
            String[] splits = _value.toString().split(",");
            if (splits.length == 3) {
                filename = filename.replace("mv_", "");
                filename = filename.replace(".txt", "");
                _key.set(splits[0] + "_" + filename);
                context.write(_key, _value);
            }
        }
    }

    /** Reducer: writes each "user,score,date" line as an HBase Put. */
    public static class IntSumReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {
        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            // Bug fix: Text.getBytes() returns the padded backing array; derive the
            // row key from the logical string so keys contain no junk suffix.
            byte[] row = Bytes.toBytes(key.toString());
            Iterator<Text> itr = values.iterator();
            while (itr.hasNext()) {
                Text t = itr.next();
                String[] strs = t.toString().split(",");
                if (strs.length != 3) continue; // skip malformed lines
                Put put = new Put(row);
                put.add(Bytes.toBytes("content"), Bytes.toBytes("score"), Bytes.toBytes(strs[1].trim()));
                put.add(Bytes.toBytes("content"), Bytes.toBytes("date"), Bytes.toBytes(strs[2].trim()));
                context.write(new ImmutableBytesWritable(row), put);
            }
        }
    }

    /**
     * Drops and recreates the target table, then runs the import job.
     * Usage: BulkImportData &lt;input-path&gt;
     */
    public static void main(String[] args) throws Exception {
        String tablename = "ntf_data";
        Configuration conf = HBaseConfiguration.create();
        // Bug fix: close the admin handle once table setup is done.
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (admin.tableExists(tablename)) {
                admin.disableTable(tablename);
                admin.deleteTable(tablename);
            }
            HTableDescriptor htd = new HTableDescriptor(tablename);
            HColumnDescriptor hcd = new HColumnDescriptor("content");
            htd.addFamily(hcd);
            admin.createTable(htd);
        } finally {
            admin.close();
        }
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 1) {
            // Bug fix: message said "wordcount <in> <out>" although the job takes
            // exactly one argument.
            System.err
                    .println("Usage: BulkImportData <in> (got " + otherArgs.length + " args)");
            System.exit(2);
        }
        Job job = new Job(conf, "h");
        job.setMapperClass(TokenizerMapper.class);
        job.setJarByClass(BulkImportData.class);
        job.setInputFormatClass(CombineSmallfileInputFormat.class);
        job.setNumReduceTasks(5);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class,
                job);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}