我前段时间在完成一个公司业务时,遇到了一个这样的需求:将HDFS上按每天每小时存储的数据进行数据预处理,然后对应按天存储在HDFS........由此可得,MapReduce的输入路径是:
/user/data/yyyy/MM/dd/HH/
/user/out/yyyy/MM/dd/
在设计代码的时候,发现FileInputFormat.addInputPath()难堪此大任,于是,我就通过APIs等资料,找到了FileInputFormat.setInputPaths()的解决方案。不过,我将在下面对MapReduce的输入/输出进行总结和介绍。
1.MapReduce多路径输入
1.1FileInputFormat.addInputPath(s)
FileInputFormat.addInputPath()是我们最常用的设置MapReduce输入路径的方法了。其实,FileInputFormat有两个这样的方法:
static void addInputPath(Job job, Path path)
static void addInputPaths(Job job, String commaSeperatedPaths)
addInputPath()只能指定一个路径,如果要想添加多个路径需要多次调用该方法:
FileInputFormat.addInputPath(job, new Path(args[0]));
FileInputFormat.addInputPath(job, new Path(args[1]));
FileInputFormat.addInputPath(job, new Path(args[2]));
addInputPaths()可以指定多条路径,而这多条路径是用“,”分隔的一个字符串:
String paths = strings[0] + "," + strings[1];
FileInputFormat.addInputPaths(job, paths);
这两种方法的缺陷:
- 路径必须是指向目的文件的,如:/user/yyyy/mm/dd,而文件实际存在/user/yyyy/mm/dd/00,/user/yyyy/mm/dd/01。
- 路径中的目录不能存在通配符,如:/user/yyyy/mm/dd/*/。(我在2个节点的虚拟机上是用小数据可行的,但是在公司的大集群用大数据是不可行的,所以建议不要用通配符)。
- 目的文件的文件格式和类型必须一样,如:文件类型有CSV,RCFile。
- 所有文件都通过一个Mapper进行处理。
- 文件路径过多,代码冗余增加。
1.2MultipleInputs.addInputPath
MultipleInputs的addInputPath有两种定义方式:
static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass)
static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)
前者不需要指定Mapper,所以所有文件都通过一个Mapper进行处理;
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);
后者可以对不同的路径指定不同的Mapper,故可以指定不同Mapper处理不同类型的文件。
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,
MultiPathMR.MultiMap1.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,
MultiPathMR.MultiMap2.class);
- 路径必须是指向目的文件的,如:/user/yyyy/mm/dd,而文件实际存在/user/yyyy/mm/dd/00,/user/yyyy/mm/dd/01。
- 文件路径过多,代码冗余增加。
- 可以处理不同类型或不同格式的文件,如:CSV,RCFile。
- 路径中的目录能存在通配符,如:/user/yyyy/mm/dd/*/。
- 可以指定不同的Mapper处理不同路径下的文件。
3.1.3 FileInputFormat.setInputPaths
FileInputFormat有三个设置路径的方法:
static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
static void setInputPaths(Job job, Path... inputPaths)
static void setInputPaths(Job job, String commaSeparatedPaths)
通配符 |
描述 |
* |
匹配0个或多个字符 |
? |
匹配单个字符 |
[ab] |
匹配集合{a, b}中的单个字符 |
[^ab] |
匹配不在集合{a, b}中的单个字符 |
[a-b] |
匹配闭区间[a, b]中的单个字符,其顺序按字典字母排序 |
[^a-b] |
匹配不在闭区间[a, b]中的单个字符 |
{a, b} |
匹配a表达式或b表达式 |
\c |
匹配元字符c |
/user/yyyy/mm/dd/*/
FileInputFormat.setInputPaths(job, new Path(strings[0]));
也可以像这样:
Path[] paths = {new Path(strings[0]), new Path(strings[1])};
FileInputFormat.setInputPaths(job, paths);
对于第三个方法的使用:
String paths = strings[0] + "," + strings[1];
FileInputFormat.setInputPaths(job, paths);
这三种方法的缺陷:
- 目的文件的文件格式和类型必须一样,如:文件类型有CSV,RCFile。
- 所有文件都通过一个Mapper进行处理。
- 路径必须是指向目的文件的,如:/user/yyyy/mm/dd,而文件实际存在/user/yyyy/mm/dd/00,/user/yyyy/mm/dd/01。
- 路径中的目录能存在通配符,如:/user/yyyy/mm/dd/*/。
- 由于能使用通配符,所以即使路径过多,也不至于是代码冗余太多。
2多文件输出
MapReduce可以定义多文件输出,但是不能定义多目录输出。提供这种功能的是MultipleOutputs类。MultipleOutputs有三个write()方法:void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
<K, V> void write(String namedOutput, K key, V value)
<K, V> void write(String namedOutput, K key, V value, String baseOutputPath)
在用后两个方法时,需要在调用FileOutputFormat. setOutputPath(job, new Path(args[1]))之前,先用addNamedOutput()方法定义namedOutput:
MultipleOutputs.addNamedOutput(job, namedOutput, TextOutputFormat.class,Text.class, LongWritable.class);
下面是一个WordCount的简单示例,输入文件是:
hello,world
hello,hadoop
hello,spark
MR代码:
public class MultiOutMR {
public static class MultiOutMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().trim().split(",");
for(String word : line){
outKey.set(word);
context.write(outKey, outValue);
}
}
}
public static class MultiOutReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
private LongWritable count = new LongWritable();
private MultipleOutputs outputs;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
outputs = new MultipleOutputs(context);
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value : values){
sum += value.get();
}
count.set(sum);
Configuration conf = context.getConfiguration();
String type = conf.get("type");
if(type.equalsIgnoreCase("namedOutput")) {
if(key.toString().equals("hello")) {
outputs.write("hello", key, count);
}
else {
outputs.write("IT", key, count);
}
}
else if(type.equalsIgnoreCase("baseOutputPath")){
outputs.write(key, count, key.toString());
}
else {
if(key.toString().equals("hello")) {
outputs.write("hello", key, count, key.toString());
}
else {
outputs.write("IT", key, count, key.toString());
}
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
outputs.close();
}
}
}
Driver代码:
public class Driver extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
conf.set("type", strings[2]);
Job job = new Job(conf, "Multiple Output");
job.setJarByClass(Driver.class);
job.setMapperClass(MultiOutMR.MultiOutMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(MultiOutMR.MultiOutReducer.class);
if(!strings[2].equalsIgnoreCase("baseOutputPath")){
MultipleOutputs.addNamedOutput(job, "hello", TextOutputFormat.class,
Text.class, LongWritable.class);
MultipleOutputs.addNamedOutput(job, "IT", TextOutputFormat.class,
Text.class, LongWritable.class);
}
FileInputFormat.addInputPath(job, new Path(strings[0]));
FileOutputFormat.setOutputPath(job, new Path(strings[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args)throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 3){
System.err.println("Usage: <input> <input> <output type>");
System.out.println("Type:\n" +
"namedOutput - the named output name.\n" +
"baseOutputPath - base-output path to write the record to. Note: Framework will generate unique filename for the baseOutputPath.\n" +
"all - contains namedOutput and baseOutputPath.");
System.exit(1);
}
System.exit(ToolRunner.run(conf, new Driver(), otherArgs));
}
}
参考文献:
http://blog.zaloni.com/using-globs-and-wildcards-with-mapreduce