前言
有这样一个需求:在reduce结束后,输出的文件名称为用户自定义,且要求没有空文件输出。
方案
方案1:单个输出,获取上下文配置,修改文件名称。
方案2:参考spark使用MultipleOutputs输出。
这里主要说说方案2的实现方式。
1、需要保证输出文件没有空
2、自定义输出文件名称
3、具体实现
(1)在org.apache.hadoop.mapreduce.Job中设置输出格式
org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
如下
MultipleOutputs.addNamedOutput(job, yesterday, LazyOutputFormat.class, Text.class, Text.class);(2)在Reduce过程中自定义修改文件名称的方法
/* generate final basePath */(3)在Reducer过程中,使用MultipleOutputs类
private String generateBasePath(Text k, String yesterday) {
// key_yesterday_
return k.toString() + "_" + yesterday+ "_";
}
public class LogReducer extends Reducer<Text, Text, NullWritable, Text> {
private final static Logger logger = LoggerFactory.getLogger(LogReducer.class);
private Text result;
private MultipleOutputs mos;
private String yesterday;
/* generate final basePath */
private String generateBasePath(Text k, String yesterday) {
// key_yesterday_
return k.toString() + "_" + yesterday+ "_";
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
result = new Text();
yesterday = context.getConfiguration().get("yesterday");
mos = new MultipleOutputs(context);
logger.debug("Reduce init..... , yesterday:{}", yesterday);
}
@Override
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
for (Text val : values) {
result.set(val);
// You do not need to write the key to the file.
mos.write(NullWritable.get(), result, generateBasePath(key, yesterday));
}
}
@Override
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
mos.close();
super.cleanup(context);
}
}
参考: