一个输入分片(split)就是一个由单个map操作来处理的输入块。每一个map操作只处理一个输入分片。每个分片被划分为若干个记录,每条记录就是一个键值对,map一个接一个地处理记录。输入分片和记录都是逻辑概念,不必将它们对应到文件,尽管其常见形式都是文件。在数据库的场景中,一个输入分片可以对应于一个表上的若干行,而一条记录对应到一行(DBInputFormat正式这么做的,这种输入格式用于从关系数据库读取数据)
输入分片在Java中被表示为InputSplit接口
InputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名)。注意,分片并不包含数据本身,而是指向数据的引用(reference)。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而分片大小用来排序分片,以便优先处理最大的分片,从而最小化作业运行时间(这是贪婪近似算法的一个实例)
MapReduce应用开发人员不必直接处理InputSplit,因为它是由InputFormat创建的。InputFormat负责产生输入分片并将它们分割成记录。
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
运行作业的客户端通过调用getSplits计算分片,然后将它们发送到jobtracker,jobtracker使用其存储位置信息来调度map任务从而在tasktracker上处理这些分片数据。在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader方法来获得这个分片的RecordReader。RecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后再传递给map函数。
public abstract class InputSplit { public abstract long getLength() throws IOException, InterruptedException; public abstract String[] getLocations() throws IOException, InterruptedException; }
运行setup之后,再重复调用Context上的nextKeyValue委托给RecordReader的同名函数实现来为mapper产生key和value对象。通过Context,key和value从RecordReader中重新取出传递给map().当reader读到stream的结尾时,nextKeyValue方法返回false,map任务运行其cleanup方法,然后结束。
由于效率原因,RecordReader程序每次调用getCurrentKey和getCurrentValue时返回相同的键值对象。只是这些对象的内容被reader的nextKeyValue方法改变。在map函数之外有键值的引用时,这可能引起问题,因为它的值会在没有警告的情况下呗改变。如果确实需要这样的引用,name需要保存想保留的对象的一个副本,例如,对于Text对象,可以使用它的复制构造函数:new Text(value)。这样的情况在reduce中也会发生。Reducer迭代器中的值对象被反复使用,所以,在调用迭代器之间,一定要复制任何需要保留的任何对象。
Mapper的run方法是公共的,可以由用户定制。MultithreadedMapRunner是另一个MapRunnable接口的实现,它可以使用可配置个数的线程来并发运行多个mapper(用mapreduce.mapper.multithreadedmapper.threads设置)。对于大多数数据处理任务来说,默认的执行机制没有优势。但是,对于因为需要连接外部服务器而造成单个记录处理时间比较长的mapper来说,它允许多个mapper在同一个JVM下以尽量避免竞争的方式执行。
1. FileInputFormat类
FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类。它提供两个功能:一个用于指出作业的输入文件位置;一个是输入文件生成分片的实现代码段。把分片切割成记录的作业由其子类来完成。
2. FileInputFormat类的输入路径
作业的输入被设定为一组路径,这对指定作业输入提供了很强的灵活性。FileInputFormat提供了四种静态方法来设定job的输入路径。
public static void addInputPath(Job job, Path path) throws IOException public static void addInputPaths(Job job, String commaSeparatedPaths) throws IOException public static void setInputPaths(Job job, Path... inputPaths) throws IOException public static void setInputPaths(Job job, String commaSeparatedPaths ) throws IOException
其中addInputPath和addInputPaths方法可以将一个或多个路径加入路径列表。可以分别调用这两种方法来建立路径列表。setInputPaths方法一次设定完整的路径(替换前面调用中在job上所设置的所有路径)
一条路径可以表示一个文件、一个目录、或是一个glob(通配),即一个文件和目录的集合。路径是目录的话,表示要包含这个目录下所有的文件,这些文件都作为作业的输入。
一个被指定为输入路径的目录,其内容不会被递归处理。事实上,这些目录只包含文件:如果包含子目录,也会被解释为文件(从而产生错误)。处理这个问题的方法是:使用一个文件glob或一个过滤器根据命名模式(name pattern)限定选择目录中的文件。另一种方法是,将mapred.input.dir.recursive设置为true从而强制对输入目录进行递归地读取。
Add方法和set方法允许指定包含的文件。如果需要排除特定文件,可以使用FileInputFormat的setInputPathFilter方法设置一个过滤器。
public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)
即使不设置过滤器,FileInputFormat也会使用一个默认的过滤器来排除隐藏文件(名称以”.”和”_”开头的文件)。如果通过setInputPathFilter设置了过滤器,它会在默认过滤器的基础上进行过滤。自定义的过滤器只能看到非隐藏文件。
路径和过滤器也可以通过配置属性来设置,这对Streaming和Pipes应用很方便。路径(mapred.input.dir) 过滤器(mapred.input.path.Filter.class)
3. FileInputFormat类的输入分片
FileInputFormat只分割大文件。这里的大指的是文件超过HDFS块的大小。分片通常与HDFS块大小一样,这在大多应用中是合理的;然而,这个值也可以通过设置不同的Hadoop属性来改变
mapred.min.split.size 默认:1 一个文件分片最小的有效字节数
mapred.max.split.size Long.MAX_VALUE 一个文件分片中最大的有效字节数(以字节算)
dfs.block.size 默认:64MB(hadoop1中),128MB(2中) HDFS中块的大小(按字节)
最小的分片大小通常是1个字节,不过某些格式可以使分片大小有一个更低的下界。
应用程序可以强制设置一个最小的输入分片大小:通过设置一个比HDFS块更大一些的值,强制分片比文件块大,如果数据存储在HDFS上,name这样做是没有好处的,因为这样会对map任务来说不是本地文件的文件块数。
最大的分片大小默认是由Java long类型表示的最大值。只有把它的值设置成小于块大小才有效果,这将强制分片比块小。
分片的大小由以下公式计算
max(minimumSize, min(maximumSize, blockSize))
默认情况下
minimumSize < blockSize < maximumSize
所以分片的大小就是blockSize
4. 小文件与CombineFileInputFormat
相当于大批量的小文件,Hadoop更合适处理少量的大文件。一个原因是FileInputFormat生成的分块是一个文件或该文件的一部分。如果文件很小,并且文件数量很多,那么每次map任务只处理很少的输入数据,(一个文件)就会有很多map任务,每次map操作都会造成额外的开销。
CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。FileInputFormat为每个文件产生一个分片,而CombineFileInputFormat把多个文件打包到一个分片中以便每个mapper可以处理更多的数据。关键是哪些块放入同一个分片时,CombineFileInputFormat会考虑到节点和机架的因素,所以在典型MapReduce作业中处理输入的速度并不会下降。
当然,如果可能的话应该尽量避免许多小文件的情况,因为MapReduce处理数据的最佳速度最好与数据在集群中的传输速度相同,而处理小文件将增加作业而必须的寻址次数。还有,在HDFS集群中存储大量的小文件会浪费namenode的内存。一个可以减少大量小文件的方法是使用SequenceFile将这些小文件合并成一个或多个大文件:可以将文件名作为键,文件的内容作为值。
由于CombineFileInputFormat是一个抽象类,没有提供实体类,所以使用的时候需要一些额外的工作。如果要使用CombineFileInputFormat与TextInputFormat相同,需要创建一个CombineFileInputFormat的实体子类,并且实现getRecordreader()方法。
5. 避免切分
有些应用程序可能不希望文件被切分,而是用一个mapper完整处理每一个输入文件。例如,检查一个文件中记录是否有序,一个简单的方法是顺序扫描每一条记录并且比较后一条记录是否比前一条要小。如果将它时限为一个map任务,name只有一个map操作整个文件时,这个算法才可行。
有两种方法可以保证输入文件不被切分。第一种(最简单但不推荐)方法就是增加最小分片大小,将它设置成大于要处理的最大文件大小。把它设置为最大值logn.MAX_VALUE即可。第二种方法就是使用FileInputFormat具体子类,并且重载isSplitable()方法把返回值设置为false。
public class NonSplittableTextInputFormat extends TextInputFormat{ @Override protected boolean isSplitable(JobContext context, Path file) { return false; } }
6. mapper中的文件信息
处理输入分片的mapper可以从作业配置对象的某些特定属性中读取输入分片的有关信息,这可以通过调用在Mapper的context对象上的getInputSplit方法来实现。当输入的格式源来自于FileInputFormat时,该方法返回的InputSplit可以被强制转换为一个FileSplit,以此访问以下信息。
/** The file containing this split's data. */ public Path getPath() { return file; } /** The position of the first byte in the file to process. */ public long getStart() { return start; } /** The number of bytes in the file to process. */ @Override public long getLength() { return length; }
7. 把整个文件作为一条记录处理
有时,mapper需要访问一个文件中的全部内容。即使不分割文件,仍然需要一个RecordReader来读取文件内容作为record值。
代码如下
package com.zhen.mapreduce.wholeFile; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; /** * @author FengZhen * @date 2018年8月16日 * mapper访问一个文件中的全部内容 */ public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{ @Override protected boolean isSplitable(JobContext context, Path filename) { //不分割文件 return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { System.out.println("----------WholeFileInputFormat---------------"); WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; } } package com.zhen.mapreduce.wholeFile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * @author FengZhen * @date 2018年8月16日 * */ public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{ private FileSplit fileSplit; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { System.out.println("=============initialize============="); this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { System.out.println("=============nextKeyValue============="); if (!processed) { byte[] contents = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(conf); FSDataInputStream in = null; try { in = fs.open(file); IOUtils.readFully(in, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(in); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { //do nothing } } package com.zhen.mapreduce.wholeFile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author FengZhen * @date 2018年8月16日 * */ public class SmallFilesToSequenceFileConverter extends Configured implements Tool{ public static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{ private Text filenameKey; @Override protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { InputSplit inputSplit = context.getInputSplit(); //当输入的格式源自于FileInputFormat时,该方法返回的InputSplit可以被强制转换为一个FileSplit Path path = ((FileSplit) inputSplit).getPath(); filenameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { System.out.println("=============map============="); context.write(filenameKey, value); } } public static class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{ @Override protected void reduce(Text key, Iterable<BytesWritable> value, Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { System.out.println("=============reduce============="); while (value.iterator().hasNext()) { BytesWritable byteWritable = value.iterator().next(); context.write(key, byteWritable); } } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJobName("SmallFilesToSequenceFileConverter"); job.setJarByClass(SmallFilesToSequenceFileConverter.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class); WholeFileInputFormat.setInputPaths(job, new Path(args[0])); SequenceFileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { String[] params = new String[]{"hdfs://fz/user/hdfs/MapReduce/data/wholeFile","hdfs://fz/user/hdfs/MapReduce/data/wholeFile/output"}; int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), params); System.exit(exitCode); } }