MapReduce太高深,性能也值得考虑,大家感兴趣的还是看看spark比较好。
FileInputFormat类
FileInputFormat是所有使用文件为数据源的InputFormat实现的基类,它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现,把分片割成记录的作业由其子类来完成。
下图为InputFormat类的层次结构
:
FileInputFormat 类输入路径
FileInputFormat 提供四种静态方法来设定 Job 的输入路径,其中下面的 addInputPath() 方法 addInputPaths() 方法可以将一个或多个路径加入路径列表,setInputPaths() 方法一次设定完整的路径列表(可以替换前面所设路径)
1
2
3
4
|
public
static
void
addInputPath(Job job, Path path);
public
static
void
addInputPaths(Job job, String commaSeparatedPaths);
public
static
void
setInputPaths(Job job, Path... inputPaths);
public
static
void
setInputPaths(Job job, String commaSeparatedPaths);
|
如果需要排除特定文件,可以使用 FileInputFormat 的 setInputPathFilter() 设置一个过滤器: public
它默认过滤隐藏文件中以”_“和”.“开头的文件
static void setInputPathFilter(Job job, Class<? extends PathFilter> filter);
即使不手动设置过滤器, FilelnputFormat 也会自动设置一个默认的过滤器来排 除隐藏文件(即以.. ..和"_开头的文件)。如巢调用setlnputPathFilter (), 那么它会在原有默认过滤器的基础上增加一个过滤器。也就是说,自定义的过滤器只能看到非隐藏文件。
1
2
3
4
5
6
|
private
static
final
PathFilter hiddenFileFilter =
new
PathFilter(){
public
boolean
accept(Path p){
String name = p.getName();
return
!name.startsWith(
"_"
) && !name.startsWith(
"."
);
}
};
|
FileInputFormat 类的输入分片
FilelnputFormat 是如何将文件转换为输入分片的呢? FilelnputF ormat 只会分 制大文件。这里的"大"指的是大于HDFS 块的大小,这在大多应用程序中是合理的,然而,这个值也可以通过设置不同的Hadoop 变量而改变。FileInputFormat 类一般分割超过 HDFS 块大小的文件。通常分片与 HDFS 块大小一样,然后分片大小也可以改变的,下面展示了控制分片大小的属性:
1
2
3
|
FileInputFormat computeSplitSize(
long
goalSize,
long
minSize,
long
blockSize) {
return
Math.max(minSize, Math.min(goalSize, blockSize));
}
|
minimumSize < blockSize < maximumSize 分片的大小即为块大小。
重载 FileInputFormat 的 isSplitable() =false 可以避免 mapreduce 输入文件被分割。
小文件与CombineFileInputFormat
CombineFileInputFormat 是针对小文件设计的,CombineFileInputFormat 会把多个文件打包到一个分片中,以便每个 mapper 可以处理更多的数据;减少大量小文件的另一种方法可以使用 SequenceFile 将这些小文件合并成一个或者多个大文件。
CombineFileInputFormat 不仅对于处理小文件实际上对于处理大文件也有好处,本质上,CombineFileInputFormat 使 map 操作中处理的数据量与 HDFS 中文件的块大小之间的耦合度降低了
CombineFileInputFormat 是一个抽象类,没有提供实体类,所以需要实现一个CombineFileInputFormat 具体 类和 getRecordReader() 方法(旧的接口是这个方法,新的接口InputFormat中则是createRecordReader())
避免分割
把整个文件作为一条记录处理
有时,mapper 需要访问问一个文件中的全部内容。即使不分割文件,仍然需要一个 RecordReader 来读取文件内容为 record 的值,下面给出实现这个功能的完整程序,详细解释见《Hadoop权威指南》。
文本处理
TextInputFileFormat 是默认的 InputFormat,每一行就是一个纪录
TextInputFileFormat 的 key 是 LongWritable 类型,存储该行在整个文件的偏移量,value 是每行的数据内容,不包括任何终止符(换行符和回车符),它是Text类型. 如下例 On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
每条记录表示以下key/value对
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.输入分片与 HDFS 块之间的关系:TextInputFormat 每一条纪录就是一行,很可能某一行跨数据库存放。
KeyValueTextInputFormat。对下面的文本,KeyValueTextInputFormat 比较适合处理,其中可以通过 mapreduce.input.keyvaluelinerecordreader.key.value.separator 属性设置指定分隔符,默认 值为制表符,以下指定”→“为分隔符
line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.NLineInputFormat。如果希望 mapper 收到固定行数的输入,需要使用 NLineInputFormat 作为 InputFormat 。与 TextInputFormat 一样,key是文件中行的字节偏移量,值是行本身。
N 是每个 mapper 收到的输入行数,默认时 N=1,每个 mapper 会正好收到一行输入,mapreduce.input.lineinputformat.linespermap 属性控制 N 的值。以刚才的文本为例。 如果N=2,则每个输入分片包括两行。第一个 mapper 会收到前两行 key/value 对:
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
另一个mapper则收到:
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
二进制输入
SequenceFileInputFormat 如果要用顺序文件数据作为 MapReduce 的输入,应用 SequenceFileInputFormat。key 和 value 顺序文件,所以要保证map输入的类型匹配
SequenceFileInputFormat 可以读 MapFile 和 SequenceFile,如果在处理顺序文件时遇到目录,SequenceFileInputFormat 类会认为值正在读 MapFile 数据文件。
SequenceFileAsTextInputFormat 是 SequenceFileInputFormat 的变体。将顺序文件(其实就是SequenceFile)的 key 和 value 转成 Text 对象
SequenceFileAsBinaryInputFormat是 SequenceFileInputFormat 的变体。将顺序文件的key和value作为二进制对象
多种输入
对于不同格式,不同表示的文本文件输出的处理,可以用 MultipleInputs 类里处理,它允许为每条输入路径指定 InputFormat 和 Mapper。
MultipleInputs 类有一个重载版本的 addInputPath()方法:
- 旧api列举
1
public
static
void
addInputPath(JobConf conf, Path path, Class<?
extends
InputForma
t> inputFormatClass)
- 新api列举
1
public
static
void
a ddInputPath(Job job, Path path, Class<?
extends
InputFormat>
inputFormatClass)
DBInputFormat
JDBC从关系数据库中读取数据的输入格式(参见权威指南)
总结:
输出格式
OutputFormat类的层次结构
文本输出
默认输出格式是 TextOutputFormat,它本每条记录写成文本行,key/value 任意,这里 key和value 可以用制表符分割,用 mapreduce.output.textoutputformat.separator 书信可以改变制表符,与TextOutputFormat 对应的输入格式是 KeyValueTextInputFormat。
可以使用 NullWritable 来省略输出的 key 和 value。
二进制输出
- SequenceFileOutputFormat 将它的输出写为一个顺序文件,因为它的格式紧凑,很容易被压缩,所以易于作为 MapReduce 的输入
- 把key/value对作为二进制格式写到一个 SequenceFile 容器中
- MapFileOutputFormat 把 MapFile 作为输出,MapFile 中的 key 必需顺序添加,所以必须确保 reducer 输出的 key 已经排好序。
多个输出
MultipleOutputFormat 类可以将数据写到多个文件中,这些文件名称源于输出的键和值。MultipleOutputFormat是个抽象类,它有两个子类:MultipleTextOutputFormat 和 MultipleSequenceFileOutputFormat 。它们是 TextOutputFormat 的和 SequenceOutputFormat 的多版本。
MultipleOutputs 类 用于生成多个输出的库,可以为不同的输出产生不同的类型,无法控制输出的命名。它用于在原有输出基础上附加输出。输出是制定名称的。
MultipleOutputFormat和MultipleOutputs的区别
这两个类库的功能几乎相同。MultipleOutputs 功能更齐全,但 MultipleOutputFormat 对 目录结构和文件命令更多de控制。
特征 | MultipleOutputFormat | MultipleOutputs |
完全控制文件名和目录名 | 是 | 否 |
不同输出有不同的键和值类型 | 否 | 是 |
从同一作业的map和reduce使用 | 否 | 是 |
每个纪录多个输出 | 否 | 是 |
与任意OutputFormat一起使用 | 否,需要子类 | 是 |
延时输出
有些文件应用倾向于不创建空文件,此时就可以利用 LazyOutputFormat (Hadoop 0.21.0版本之后开始提供),它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正的创建文件,要使用它,用JobConf和相关输出格式作为参数来调用 setOutputFormatClass() 方法.
Streaming 和 Pigs 支持 -LazyOutput 选项来启用 LazyOutputFormat功能。
数据库输出
总结:
下表给出了已提供的输出格式:
输出格式 |
描述 |
TextOutputFormat |
默认的输出格式, 以 "key \t value" 的方式输出行 |
SequenceFileOutputFormat |
输出二进制文件,适合于读取为子MapReduce作业的输入 |
NullOutputFormat |
忽略收到的数据,即不做输出 |
SequenceFileAsBinaryOutputFormat | 与SequenceFileAsBinaryInputFormat相对应,它将键/值对当作二进制数据写入一个顺序文件 |
MapFileOutputFormat | MapFileOutputFormat将结果写入一个MapFile中。MapFile中的键必须是排序的,所以在reducer中必须保证输出的键有序。 |