一个combineInputformat

时间:2023-03-08 21:59:58
一个combineInputformat

mark

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader; public class MyInputFormat extends CombineFileInputFormat<InputSplitFile, Text> { @Override
public RecordReader<InputSplitFile, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException {
return new CombineFileRecordReader<InputSplitFile, Text>((CombineFileSplit)split, context, MyCombineFileRecordReader.class);
} } class MyCombineFileRecordReader extends RecordReader<InputSplitFile, Text> {
private static final Log LOG = LogFactory.getLog(MyCombineFileRecordReader.class); private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private Path path;
private LineReader in;
private int maxLineLength;
private InputSplitFile key = null;
private Text value = null; public MyCombineFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException{
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
this.path = split.getPath(index);
this.start = split.getOffset(index);
this.end = start + split.getLength(index);
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(this.path);
boolean skipFirstLine = false; FileSystem fs = path.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath(index));
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start; } @Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub } @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(key == null){
key = new InputSplitFile();
key.setFileName(path.getName());
}
key.setFileName(path.getName());
if(value == null){
value = new Text();
}
int newSize = 0;
while(pos < end){ newSize = in.readLine(value, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength)); if(newSize == 0){
break;
} pos += newSize;
if(newSize < maxLineLength){
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
} } @Override
public InputSplitFile getCurrentKey() throws IOException, InterruptedException { return key;
} @Override
public Text getCurrentValue() throws IOException, InterruptedException { return value;
} @Override
public float getProgress() throws IOException, InterruptedException {
if(start == end){
return 0.0f;
}else {
return Math.min(1.0f, (pos - start)/(float)(end-start));
}
} @Override
public void close() throws IOException {
if(in != null){
in.close();
} } } class InputSplitFile implements WritableComparable<InputSplitFile> {
private long offset;
private String fileName; public long getOffset(){
return offset;
} public void setOffset(long offset){
this.offset = offset;
} public String getFileName(){
return fileName;
} public void setFileName(String fileName){
this.fileName = fileName;
} public void readFields(DataInput in) throws IOException {
this.offset = in.readLong();
this.fileName = Text.readString(in);
} public void write(DataOutput out) throws IOException{
out.writeLong(offset);
Text.writeString(out, fileName);
} public int compareTo(InputSplitFile o){
InputSplitFile that = (InputSplitFile) o; int f = this.fileName.compareTo(that.fileName);
if(f == 0){
return (int)Math.signum((double)(this.offset - that.offset));
}
return f;
} public boolean equals(InputSplitFile obj){
if(obj instanceof InputSplitFile){
return this.compareTo(obj) == 0;
}
return false;
} public int hashCode(){
assert false : "hashCode not designed";
return 42;//an arbitrary constant
} }