[Hadoop--基础]--用户自定义mapreduce输出的文件名称

时间:2023-01-27 09:34:21

前言

      有这样一个需求:在reduce结束后,输出的文件名称为用户自定义,且要求没有空文件输出。

方案

    方案1:单个输出,获取上下文配置,修改文件名称。

    方案2:参考spark使用MultipleOutputs输出。

这里主要说说方案2的实现方式。

1、需要保证输出文件没有空

2、自定义输出文件名称

3、具体实现

(1)在org.apache.hadoop.mapreduce.Job中设置输出格式

org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat
如下

MultipleOutputs.addNamedOutput(job, yesterday, LazyOutputFormat.class, Text.class, Text.class);
(2)在Reduce过程中自定义修改文件名称的方法

/* generate final basePath  */
private String generateBasePath(Text k, String yesterday) {
// key_yesterday_
return k.toString() + "_" + yesterday+ "_";
}
(3)在Reducer过程中,使用MultipleOutputs类

public  class LogReducer extends Reducer<Text, Text, NullWritable, Text> {
private final static Logger logger = LoggerFactory.getLogger(LogReducer.class);
private Text result;
private MultipleOutputs mos;
private String yesterday;
/* generate final basePath */
private String generateBasePath(Text k, String yesterday) {
// key_yesterday_
return k.toString() + "_" + yesterday+ "_";
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
result = new Text();
yesterday = context.getConfiguration().get("yesterday");
mos = new MultipleOutputs(context);
logger.debug("Reduce init..... , yesterday:{}", yesterday);
}
@Override
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
for (Text val : values) {
result.set(val);
// You do not need to write the key to the file.
mos.write(NullWritable.get(), result, generateBasePath(key, yesterday));
}
}
@Override
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
mos.close();
super.cleanup(context);
}
}
   

参考:

http://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html#addNamedOutput%28org.apache.hadoop.mapreduce.Job,%20java.lang.String,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class%29