Hadoop MapReduce InputFormat/OutputFormat

Time: 2022-02-07 21:54:35

InputFormat

import java.io.IOException;
import java.util.List;

/**
 * InputFormat describes the input-specification for a Map-Reduce job.
 *
 * The Map-Reduce framework relies on the InputFormat of the job to:
 *
 * Validate the input-specification of the job.
 *
 * Split up the input file(s) into logical InputSplits, each of which is then
 * assigned to an individual Mapper.
 *
 * Provide the RecordReader implementation to be used to glean input records
 * from the logical InputSplit for processing by the Mapper.
 *
 * The default behavior of file-based InputFormats, typically sub-classes of
 * FileInputFormat, is to split the input into logical InputSplits based on
 * the total size, in bytes, of the input files. However, the FileSystem
 * blocksize of the input files is treated as an upper bound for input splits.
 * A lower bound on the split size can be set via mapred.min.split.size.
 *
 * Clearly, logical splits based on input size are insufficient for many
 * applications, since record boundaries are to be respected. In such cases,
 * the application also has to implement a RecordReader, on whom lies the
 * responsibility to respect record boundaries and present a record-oriented
 * view of the logical InputSplit to the individual task.
 */
public abstract class InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.
   *
   * <p>
   * Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.
   * </p>
   *
   * <p>
   * <i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   *
   * @param context
   *          job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   *
   * @param split
   *          the split to be read
   * @param context
   *          the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException;
}
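
To make the contract above concrete, here is a minimal sketch of a custom InputFormat. It extends FileInputFormat, which already provides getSplits(), so the subclass only disables splitting (one split per file, useful for formats that cannot be cut mid-stream) and supplies the stock LineRecordReader. The class name NonSplittableTextInputFormat is hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/**
 * Hypothetical example: reuses the stock LineRecordReader but forces one
 * split per input file, so files are never cut in the middle.
 */
public class NonSplittableTextInputFormat
    extends FileInputFormat<LongWritable, Text> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Never split: each input file becomes exactly one InputSplit.
    return false;
  }

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    // The framework calls initialize(split, context) on the reader
    // before the split is consumed.
    return new LineRecordReader();
  }
}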

InputSplit

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;

/**
 * <code>InputSplit</code> represents the data to be processed by an
 * individual {@link Mapper}.
 *
 * <p>
 * Typically, it presents a byte-oriented view of the input, and it is the
 * responsibility of the {@link RecordReader} of the job to process this and
 * present a record-oriented view.
 *
 * @see InputFormat
 * @see RecordReader
 */
public abstract class InputSplit {

  /**
   * Get the size of the split, so that the input splits can be sorted by
   * size.
   *
   * @return the number of bytes in the split
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract long getLength() throws IOException, InterruptedException;

  /**
   * Get the list of nodes by name where the data for the split would be
   * local. The locations do not need to be serialized.
   *
   * @return a new array of node names.
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract String[] getLocations() throws IOException,
      InterruptedException;
}
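
For file-based jobs, the concrete split type is usually FileSplit from org.apache.hadoop.mapreduce.lib.input, which records the file path, starting byte offset, length, and the hosts holding that block. A small standalone sketch; the path and host names below are made up for illustration.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitDemo {
  public static void main(String[] args) throws Exception {
    Path file = new Path("hdfs:///data/input/part-00000"); // hypothetical path
    long start = 0L;
    long length = 128L * 1024 * 1024;                       // one 128 MB block
    String[] hosts = { "datanode1", "datanode2" };           // hypothetical hosts

    InputSplit split = new FileSplit(file, start, length, hosts);

    // getLength() lets the framework sort splits by size; getLocations()
    // is the locality hint used to place the map task near the data.
    System.out.println(split.getLength());                   // 134217728
    System.out.println(String.join(",", split.getLocations()));
  }
}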

RecordReader

import java.io.Closeable;
import java.io.IOException;

/**
 * The record reader breaks the data into key/value pairs for input to the
 * {@link Mapper}.
 *
 * @param <KEYIN>
 * @param <VALUEIN>
 */
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

  /**
   * Called once at initialization.
   *
   * @param split
   *          the split that defines the range of records to read
   * @param context
   *          the information about the task
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;

  /**
   * Read the next key, value pair.
   *
   * @return true if a key/value pair was read
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract boolean nextKeyValue() throws IOException,
      InterruptedException;

  /**
   * Get the current key.
   *
   * @return the current key or null if there is no current key
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract KEYIN getCurrentKey() throws IOException,
      InterruptedException;

  /**
   * Get the current value.
   *
   * @return the object that was read
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract VALUEIN getCurrentValue() throws IOException,
      InterruptedException;

  /**
   * The current progress of the record reader through its data.
   *
   * @return a number between 0.0 and 1.0 that is the fraction of the data
   *         read
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract float getProgress() throws IOException,
      InterruptedException;

  /**
   * Close the record reader.
   */
  public abstract void close() throws IOException;
}
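
The life-cycle (initialize, nextKeyValue, getCurrentKey/Value, getProgress, close) is easiest to see in a small concrete reader. The sketch below assumes the corresponding InputFormat marks its files as non-splittable (as in the earlier sketch) and returns an entire file as a single <NullWritable, BytesWritable> record; WholeFileRecordReader is a hypothetical name.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader
    extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private final BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (processed) {
      return false;                       // only one record per split
    }
    byte[] contents = new byte[(int) fileSplit.getLength()];
    Path file = fileSplit.getPath();
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream in = null;
    try {
      in = fs.open(file);
      IOUtils.readFully(in, contents, 0, contents.length);
      value.set(contents, 0, contents.length);
    } finally {
      IOUtils.closeStream(in);
    }
    processed = true;
    return true;
  }

  @Override
  public NullWritable getCurrentKey() {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() {
    return value;
  }

  @Override
  public float getProgress() {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() {
    // nothing to close; the stream is closed in nextKeyValue()
  }
}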

OutputFormat

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;

/**
 * <code>OutputFormat</code> describes the output-specification for a
 * Map-Reduce job.
 *
 * <p>
 * The Map-Reduce framework relies on the <code>OutputFormat</code> of the job
 * to:
 * <p>
 * <ol>
 * <li>
 * Validate the output-specification of the job. For e.g. check that the
 * output directory doesn't already exist.</li>
 * <li>
 * Provide the {@link RecordWriter} implementation to be used to write out the
 * output files of the job. Output files are stored in a {@link FileSystem}.</li>
 * </ol>
 *
 * @see RecordWriter
 */
public abstract class OutputFormat<K, V> {

  /**
   * Get the {@link RecordWriter} for the given task.
   *
   * @param context
   *          the information about the current task.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
   */
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;

  /**
   * Check for validity of the output-specification for the job.
   *
   * <p>
   * This is to validate the output specification for the job when it is
   * submitted. Typically it checks that the output does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.
   * </p>
   *
   * @param context
   *          information about the job
   * @throws IOException
   *           when output should not be attempted
   */
  public abstract void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException;

  /**
   * Get the output committer for this output format. This is responsible for
   * ensuring the output is committed correctly.
   *
   * @param context
   *          the task context
   * @return an output committer
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}
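
Wiring these classes into a job is one call each on the Job object: setInputFormatClass() and setOutputFormatClass(), plus the input and output paths. A minimal driver sketch, assuming the default (identity) Mapper and Reducer; the class name and argument handling are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FormatWiringDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "format wiring demo");
    job.setJarByClass(FormatWiringDriver.class);

    // Identity mapper/reducer are used by default; the point here is the
    // InputFormat/OutputFormat wiring.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    // checkOutputSpecs() will fail the submission if this directory exists.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}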

RecordWriter

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;

/**
 * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs to an
 * output file.
 *
 * <p>
 * <code>RecordWriter</code> implementations write the job outputs to the
 * {@link FileSystem}.
 *
 * @see OutputFormat
 */
public abstract class RecordWriter<K, V> {

  /**
   * Writes a key/value pair.
   *
   * @param key
   *          the key to write.
   * @param value
   *          the value to write.
   * @throws IOException
   */
  public abstract void write(K key, V value) throws IOException,
      InterruptedException;

  /**
   * Close this <code>RecordWriter</code> to future operations.
   *
   * @param context
   *          the context of the task
   * @throws IOException
   */
  public abstract void close(TaskAttemptContext context) throws IOException,
      InterruptedException;
}
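
A custom file-based OutputFormat usually only has to supply this RecordWriter, because FileOutputFormat already implements checkOutputSpecs() and returns a FileOutputCommitter. The sketch below, under the hypothetical name TabSeparatedOutputFormat, writes each pair as a tab-separated line into the task's temporary work file, which the committer later promotes to the job output directory.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TabSeparatedOutputFormat extends FileOutputFormat<Text, Text> {

  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    // Write into the task's temporary work file; the OutputCommitter
    // promotes it to the job output directory on task commit.
    Path file = getDefaultWorkFile(context, ".tsv");
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    final FSDataOutputStream out = fs.create(file, false);

    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text key, Text value) throws IOException {
        out.write(key.getBytes(), 0, key.getLength());
        out.writeByte('\t');
        out.write(value.getBytes(), 0, value.getLength());
        out.writeByte('\n');
      }

      @Override
      public void close(TaskAttemptContext ctx) throws IOException {
        out.close();   // flush and release the task's output stream
      }
    };
  }
}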

OutputCommitter

import java.io.IOException;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>
 * The Map-Reduce framework relies on the <code>OutputCommitter</code> of the
 * job to:
 * <p>
 * <ol>
 * <li>
 * Setup the job during initialization. For example, create the temporary
 * output directory for the job during the initialization of the job.</li>
 * <li>
 * Cleanup the job after the job completion. For example, remove the temporary
 * output directory after the job completion.</li>
 * <li>
 * Setup the task temporary output.</li>
 * <li>
 * Check whether a task needs a commit. This is to avoid the commit procedure
 * if a task does not need commit.</li>
 * <li>
 * Commit of the task output.</li>
 * <li>
 * Discard the task commit.</li>
 * </ol>
 *
 * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
public abstract class OutputCommitter {

  /**
   * For the framework to setup the job output during initialization.
   *
   * @param jobContext
   *          Context of the job whose output is being written.
   * @throws IOException
   *           if temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion.
   *
   * @param jobContext
   *          Context of the job whose output is being written.
   * @throws IOException
   */
  public abstract void cleanupJob(JobContext jobContext) throws IOException;

  /**
   * Sets up output for the task.
   *
   * @param taskContext
   *          Context of the task whose output is being written.
   * @throws IOException
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Check whether the task needs a commit.
   *
   * @param taskContext
   * @return true/false
   * @throws IOException
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * To promote the task's temporary output to the final output location.
   *
   * The task's output is moved to the job's output directory.
   *
   * @param taskContext
   *          Context of the task whose output is being written.
   * @throws IOException
   *           if the commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Discard the task output.
   *
   * @param taskContext
   * @throws IOException
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
      throws IOException;
}
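
The standard file-based implementation of this contract is FileOutputCommitter, which creates the temporary output directory, decides whether a task has anything to commit, and moves committed task output into the job output directory. When the RecordWriter instead sends its output straight to an external system and there is no temporary output to promote, every step can be a no-op. A sketch, under the hypothetical name NoOpOutputCommitter:

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class NoOpOutputCommitter extends OutputCommitter {

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // nothing to set up: no temporary output directory is used
  }

  @Override
  public void cleanupJob(JobContext jobContext) throws IOException {
    // nothing to clean up after the job
  }

  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // no per-task temporary output
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException {
    // skip the commit procedure entirely
    return false;
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // never invoked while needsTaskCommit() returns false
  }

  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // nothing to discard
  }
}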