HDFS小文件处理——Mapper处理

时间:2024-09-16 19:36:08

处理小文件的时候,可以通过org.apache.hadoop.io.SequenceFile.Writer类将所有文件写出到一个seq文件中。

大致流程如下:

HDFS小文件处理——Mapper处理

实现代码:

package study.smallfile.sequence_one;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Writer.Option;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapperDemo { private static final String INPUT_PATH = "hdfs://cluster1/smallfile/blankfile";
private static final String OUT_PATH = "hdfs://cluster1/smallfile/combined/map";
static FileSystem fileSystem; public void CombinedFile() throws Exception {
Job job = Job.getInstance(); job.setJarByClass(MapperDemo.class);
job.setJobName(MapperDemo.class.getSimpleName()); // 设置map类
job.setMapperClass(MapperDemo.CombinedMapper.class);
// 设置输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 设置reduce任务数量
job.setNumReduceTasks(0);
// 设置输入路径
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
// 检查输出路径
Path outdir = new Path(OUT_PATH);
fileSystem = FileSystem.get(job.getConfiguration());
if (fileSystem.exists(outdir)) {// 如果已经存在删除
fileSystem.delete(outdir, true);
} // 设置输出路径
FileOutputFormat.setOutputPath(job, outdir); job.waitForCompletion(true); } static class CombinedMapper extends
Mapper<LongWritable, Text, Text, BytesWritable> {
Writer writer = null;
FileStatus[] files; Text outKey = new Text();
BytesWritable outValue = new BytesWritable(); FSDataInputStream in;
byte[] buffer = null; @Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, BytesWritable>.Context context)
throws IOException, InterruptedException {
// for (FileStatus file : files) {
// outKey.set(file.getPath().toString());
//
// in = fileSystem.open(file.getPath());
// buffer = new byte[(int) file.getLen()];
// IOUtils.read(in, buffer, 0, buffer.length);
// outValue.set(new BytesWritable(buffer));
// writer.append(outKey, outValue);
// } } @Override
protected void cleanup(
Mapper<LongWritable, Text, Text, BytesWritable>.Context context)
throws IOException, InterruptedException {
for (FileStatus file : files) {
outKey.set(file.getPath().toString()); in = fileSystem.open(file.getPath());
buffer = new byte[(int) file.getLen()];
IOUtils.readFully(in, buffer, 0, buffer.length);
outValue.set(new BytesWritable(buffer));
writer.append(outKey, outValue);
}
IOUtils.closeStream(writer);
} @Override
protected void setup(
Mapper<LongWritable, Text, Text, BytesWritable>.Context context)
throws IOException, InterruptedException {
// 输出文件项
Option fileOption = SequenceFile.Writer.file(new Path(OUT_PATH
+ "/mapper.seq"));
// 压缩选项
Option compressionOption = SequenceFile.Writer
.compression(CompressionType.BLOCK);
// SequeneFile key类型设置
Option keyClassOption = SequenceFile.Writer.keyClass(Text.class);
// SequeneFile value类型设置
Option valueClassOption = SequenceFile.Writer
.valueClass(BytesWritable.class);
// 构建输出流文件
Configuration conf = new Configuration();
writer = SequenceFile.createWriter(conf, fileOption,
compressionOption, keyClassOption, valueClassOption);
if (fileSystem == null) {
fileSystem = FileSystem.get(conf);
}
files = fileSystem.listStatus(new Path("hdfs://cluster1/smallfile/logs")); }
}
}

注意事项:

  我原本的逻辑是放在map函数中,将所有文件通过Writer写到HDFS中。但是map函数在整个MR执行过程中被调用的次数取决于输入文件的情况(每条输入记录调用一次),所以这里改为在cleanup中统一写出;如果能控制输入文件的内容,也可以通过map函数实现。

发现问题:

原本在实现之前,定义了一个FileSystem类型的静态字段,在提交job前已经赋值了,但是,在mapper类中访问到的fileSystem字段,是空值,有知道的大虾,多多指导小弟

SequenceFile介绍:

http://wiki.apache.org/hadoop/SequenceFile

http://www.cnblogs.com/zhenjing/archive/2012/11/02/File-Format.html