Mapreduce的文件和hbase共同输入

时间:2022-09-02 18:21:15
Mapreduce的文件和hbase共同输入
package duogemap;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
public class MixMR {
 
public static class Map extends Mapper<Object, Text, Text, Text> {
 
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String s = value.toString();
String[] sa = s.split(",");
if (sa.length == 2) {
context.write(new Text(sa[0]), new Text(sa[1]));
}
 
}
 
}
 
public static class TableMap extends TableMapper<Text, Text> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR1 = "c1".getBytes();
 
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
 
String key = Bytes.toString(row.get());
String val = new String(value.getValue(CF, ATTR1));
 
context.write(new Text(key), new Text(val));
}
}
 
 
public static class Reduce extends Reducer <Object, Text, Object, Text> {
public void reduce(Object key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String ks = key.toString();
for (Text val : values){
context.write(new Text(ks), val);
}
 
}
}
 
public static void main(String[] args) throws Exception {
Path inputPath1 = new Path(args[0]);
Path inputPath2 = new Path(args[1]);
Path outputPath = new Path(args[2]);
 
String tableName = "test";
 
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MixMR.class); // class that contains mapper
 
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.addFamily(Bytes.toBytes("cf"));
 
TableMapReduceUtil.initTableMapperJob(
tableName, // input HBase table name
scan, // Scan instance to control CF and attribute selection
TableMap.class, // mapper
Text.class, // mapper output key
Text.class, // mapper output value
job);
 
 
job.setReducerClass(Reduce.class); // reducer class
job.setOutputFormatClass(TextOutputFormat.class);
 
 
// inputPath1 here has no effect for HBase table
MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);
 
FileOutputFormat.setOutputPath(job, outputPath);
 
job.waitForCompletion(true);
}
}