Hadoop实践(三)---Hadoop数据类型

时间:2022-01-26 14:15:19

适合Hadoop的数据类型

Hadoop使用派生于Writable接口的类作为MapReduce计算的数据类型,这些数据类型用于整个MapReduce计算流的数据吞吐过程,这个过程从读取输入数据开始,到传输map和reduce任务之间的中间数据,一直到最后写入输出数据为止;为输入数据、中间数据和输出数据选择合适的Writable数据类型面对MapReduce程序的可编程性和性能有很大的影响

  1. 为了用作MapReduce计算的value数据类型,数据类型必须实现org.apache.hadoop.io.Writable接口;Writable接口定义了当需要数据传输和数据存储时,Hadoop应该如何序列化和反序列化

  2. 为了用作MapReduce计算的key数据类型,数据类型必须实现org.apache.hadoop.io.WritableComparable<T>接口,除了Writable接口的功能之外,有一种WritableComparable接口更进一步定义了如何将这种类型的键相互比较,以达到排序的目的

Hadoop带有一些预定于的类用于实现WritableComparable,包括基本数据类型的封装类:

描述
BooleanWritable 标准布尔变量的封装
ByteWritable 单字节数的封装
DoubleWritable 双字节数的封装
FloatWritable 浮点数的封装
IntWritable 整数的封装
LongWritable 长整型的封装
Text 使用UTF-8格式的文本封装
NullWritable 无键值时的占位符

1.使用泛型类型变量为mapper的键值对指定输入数据类型(键:LongWritable,值:Text)和输出数据类型(键:Text,值:IntWritable)

public class SampleMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key, Text value,Context context). . .{
. . .
}
}

2.使用通用型变量为reducer的键值对指定输入数据类型(键:Text,值:IntWritable)和输出数据类型(键:Text,值:IntWritable)【reducer的输入键值对数据类型应该和mapper的输出键值对数据类型相匹配】

public class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key, IntWritable value,Context context). . .{
. . .
}
}

3.使用Job对象指定MapReduce计算输出数据类型,当mapper和reducer的输出类型相同时:

Job job = new Job(. . .);
(Job job = Job.getInstance();)
. . .
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

4.当mapper和reducer的输出类型相同时:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

Hadoop提供一些基本数据类型,IntWritable,LongWritable,BooleanWritable,FloatWritable,ByteWritable,这是它们各自的Java基本数据类型的Writable版本,可以使用这些类型作为key类型和value类型

下面是几种Hadoop内置的数据类型,可以作为作为key类型和value类型:

Text:存储UTF8文本
BytesWritable:存储一个字节序列
VIntWritable和VLongWritable:存储变长整型和长整型值
NullWritable:这是零长度的Writable类型(可以在不希望使用key或value类型的时候使用)

下面是Hadoop内置的集合数据类型【只能作为value类型】:

ArrayWritable:存储属于Writable类型的值数组(要使用ArrayWritable类型作为reduce输入的value类型,需要创建ArrayWritable的子类来指定存储在其中的Writable值的类型):

 public class LongArrayWritable extends ArrayWritable{ 
public LongArrayWritable(){
super(LongWritable.class);
}
}

TwoDArrayWritable:存储属于同一个Writable类型的值矩阵(要使用TwoDArrayWritable类型作为reduce输入的value类型,需要创建与ArrayWritable类型相似的TwoDArrayWritable类型的子类来指定存储的值的类型)
MapWritable:存储键值对的映射(键和值应该是Writable数据类型)
SortedMapWritable:存储键值对的有序映射(键应该事先WritableComparable接口)

实现自定义的Hadoop Writable数据类型

通过org.apache.hadoop.io.Writable接口编写一个定制Writable数据类型用于定义数据类型的序列化格式(基于Writable的接口类型可以用来作为Hadooop MapReduce计算的value类型)

假设日志包含5个部分:请求的主机,时间戳,请求的URL,相应大小,HTTP状态码,如下:

192.168.0.2 -- [01/Jul.1995:00:00:01-0400] "GET/history/appollo/HTTP/1.0" 200 6245

实现日志条目的自定义Hadoop Writable数据类型的步骤:
1.血一个新的LogWritable类实现org.apache.hadoop.io.Writable接口

public class LogWritable implements Writable{
private Text userIP;
private Text timestamp;
private Text request;
private IntWritable responseSize;
private IntWritable status;

public LogWritable(){
this.userIP = new Text();
this.timestamp = new Text();
this.request = new Text();
this.responseSize = new IntWritable();
this.status = new IntWritable();
}
public void readFields(DataInput in)throws IOException{
userIP.readFields(in);
timestamp.readFields(in);
request.readFields(in);
responseSize.readFields(in);
status.readFields(in);
}
public void write(DataOuput out)throws IOException{
userIP.write(out);
timestamp.write(out);
request.write(out);
responseSize.write(out);
status.write(out);
}
. . .//getters and setters for the fields
}

2.使用新的LogWritable类型作为MapReduce计算的value类型
例如,使用LogWritable类型作为Map输出值的类型

public class LogProcessMap extends Mapper<LongWritable,Text,Text,LogWritable>{
...
}
public class LogProcessReduce extends Reducer<Text,LogWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<LogWritable> values,Context context){
...
}
}

3.配置相应的作用的输出类型

Job job = new Job(...);
(Job job = Job.getInstance();)
...
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LogWritable.class);

工作原理
Writable接口包含2个方法:readFields()he write(),在readFields()方法中我们反序列化输入数据,并填充Writable对象的字段:

public void readFields(DataInput in)throws IOException{
userIP.readFields(in);
timestamp.readFields(in);
request.readFields(in);
responseSize.readFields(in);
status.readFields(in);
}

在上面的示例中,使用Writable类型作为自定义的Writable类型的字段,并且使用readFields()方法从DataInput对象的数据字段中反序列化数据,【当然也可以使用Java的基本数据类型作为Writabel类型的字段,并使用DataInput对象的想要读取方法从基础流中读取值】,如下面的代码所示:

int responseSize = in.readInt();
String userIP = in.readUTF();

在Write()方法中,在底层流中写入Writable对象的字段

public void write(DataOuput out)throws IOException{
userIP.write(out);
timestamp.write(out);
request.write(out);
responseSize.write(out);
status.write(out);
}

如果使用的是Java基本数据类型作为Writable对象的字段,可以使用DataOutPut对象中的对应写入方法写入底层流的值:

out.writeInt(responseSize);
out.writeUTF(userIP);

在实现自定义的Writable数据类型时,需要注意一下问题:

1.如果要添加一个自定义的构造函数用于自定义的Writable类,一定要保持默认的空构造函数
2.TextOutputFormat使用toString()方法来序列化key和value类型,如果使用的是TextOutputFormat序列化自定义的Writable类型的实例,那么要确保用于自定义Writable数据类型的是一个有意义的toString()实现
3.在读取输入数据时,Hadoop可多次重复使用Writable类的一个实例,在ReadFields()方法里面填充字段时,不应该依赖于该对象的现有状态

实现自定义Hadoop key类型

Hadoop MapReduce的key类型的实例应该可以进行相互比较来满足排序的目的,为了在一个MapReduce计算中用作键类型,Hadoop的Writable数据类型应该实现org.apache.hadoop.io.WritableComparable<T>接口,WritableComparable接口继承于org.apache.hadoop.io.Writable接口,并增加了compareTo()方法来执行比较
使用日志数据来实现Hadoop WritableComparable数据类型的步骤:
1.修改LogWritable类来实现org.apache.hadoop.io.WritableComparable接口

public class LogWritable implements WritableComparable<LogWritable>{
private Text userIP;
private Text timestamp;
private Text request;
private IntWritable responseSize;
private IntWritable status;

public LogWritable(){
this.userIP = new Text();
this.timestamp = new Text();
this.request = new Text();
this.responseSize = new IntWritable();
this.status = new IntWritable();
}
public void readFields(DataInput in)throws IOException{
userIP.readFields(in);
timestamp.readFields(in);
request.readFields(in);
responseSize.readFields(in);
status.readFields(in);
}
public void write(DataOuput out)throws IOException{
userIP.write(out);
timestamp.write(out);
request.write(out);
responseSize.write(out);
status.write(out);
}
public int compareTo(LogWritable o){
if(userIP.compareTo(o.userIP) == 0){
return (timestamp.compareTo(o.timestamp));
}else return (userIP.compareTo(o.userIP));
}
public boolean equals(Object o){
if(o instanceof LogWritable){
LogWritable other = (LogWritable) o;
return userIP.equals(other.userIP)&&timestamp.equals(other.timestamp);
}
return false;
}
public int hashCode(){
return userIP.hashCode();
}
. . .//getters and setters for the fields
}

2.使用LogWritable类型作为MapReduce计算中key类型或value类型
例如,使用LogWritable类型作为Map输出key类型

public class LogProcessMap extends Mapper<LongWritable,Text,LogWritableIntWritable>{
...
}
public class LogProcessReduce extends Reducer<LogWritable,IntWritableText,IntWritable>{
public void reduce(LogWritable key,Iterable<IntWritable> values,Context context){
...
}
}

3.配置相应的作业输出类型

Job job = new Job(...);
(Job job = Job.getInstance();)
...
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(LogWritable.class);
job.setMapOutputValueClass(IntWritable.class);

工作原理:
除了Writable接口的readFields()he write()方法之外,WritableComparable接口还引入了compareTo()方法,compareTo()方法的返回值有3种类型:负整数,0或正整数,分别表示当前对象小于、等于或大于被比较对象,在LogWritable实现中,如果2个用户的IP地址和时间戳是相同的,那么就认为这2个对象是相等的,如果对象不相等,就决定排序,首先根据IP地址排序,在根据时间戳排序:

public int compareTo(LogWritable o){
if(userIP.compareTo(o.userIP) == 0){
return (timestamp.compareTo(o.timestamp));
}else return (userIP.compareTo(o.userIP));
}

Hadoop使用HashPartitioner作为默认Partitioner实现,来计算中间数据在reducer中的分布,HashPartitioner需要键对象的hashcode()方法来满足一下2个属性:

1.在不同JVM实例提供相同哈希值
2.提供哈希值的均匀分布

因此,必须实现一个稳定的hashcode()方法,以使自定义的Hadoop key类型满足上述2个要求。在logWritable实现中,使用请求的主机名/IP地址的哈希吗作为LogWritable实例的哈希代码,保证了中间LogWritable数据能基于该请求的主机名/IP地址被正确地区分:

public int hashCode(){
return userIP.hashCode();
}

从mapper中输出不同值类型的数据

在执行reducer端join操作时,或者在多个MapReduce计算中将不同属性类型的数据聚合成一个数据集合时需要避免复制性时,从mapper中输出属于多个值类型的数据集合,是非常有用的,然而,Hadoop reducer 不允许有徐对各输入值类型,在这种情况下,可以使用GenericWritable类来包装属于不同数据类型的多个value实例

还是基于日志数据,将汇总从web服务器到特定直接的总字节数,同时输出一个由特定主机请求、由制表符分发的URL列表,使用IntWritable从mapper输出总字节数,同时,使用Text输出请求URL

下面步骤显示如何实现Hadoop GenericWritable数据类型,该数据类型可以封装数据类型为IntWritable或Text实例:
1.写一个类扩展org.apache.hadoop.io.GenericWritable类,实现getType()方法,返回将要使用的Witable类的数组,如果要添加一个自定义的构造函数,那么要确保添加的是一个无参数的默认构造函数

public class MultiValueWritable extends GenericWritable{
private static Class[] CLASSES = new class[]{IntWritable.class,Text.class};
public MultiValueWritable(){
}
public MultiValueWritable(Writable value){
set(value);
}
protected Class[] getType(){
return CLASSES;
}
}

2.设置MultiValueWritable作为mapper的输值类型,使用MultiValueWritable类的实例,封装mapper的输出Writable值

public class LogProcessMap extends Mapper<Object,Text,Text,MultiValueWritable>{
private Text userHostText = new Text();
private Text requestText = new Text();
private IntWritable responseSize = new IntWritable();
}
public void map(Object key, Text value, Context context)...{
...//parse the value (log entry)using a regex.
userHostText.set(userHost);
requestText.set(request);
byteWritable.set(responseSize);
context.write(userHostText,new MultiValueWritable(requestText));
context.write(userHostText,new MultiValueWritable(responseSize));
}
}

3.将reducer的输入值类型设置为MultiValueWritable,实现reduce()方法来处理多个值类型

public class LogProcessReduce extends Reducer<TextMultiValueWritableTextText>{
private Text result = new Text();
public void reduce(Text key, Iterable<MultiValueWritable> values, Context context)...{
int sum = 0;
StringBuilder requests = new StringBuilder();
for(MultiValueWritable multiValueWritable : values){
Writable writable = multiValueWritable.get();
if(writable instanceof IntWritable){
sum += ((IntWritable)writable).get();
}else{
requests.append((Text)writable).toString();
requests.append("\t");
}
}
result.set(sum + "\t" + requests);
context.write(key,result);
}
}

4.设置MultiValueWritable作为本次计算的Map输出值

Configuration conf = new Configuration();
Job job = new Job(conf, "log-analysis");
...
job.setMapOutputValueClass(MultiValueWritable.class);

工作原理
GenericWritable实现应该继承org.apache.hadoop.io.GenericWritable,并应该指定一组Writable值类型来进行封装,实现从getType()方法返回CLASSES数组,GenericWritable实现使用类数组之歌的索引类序列化和反序列化数据:

private static Class[] CLASSES = new class[]{IntWritable.class,Text.class};
protected Class[] getType(){
return CLASSES;
}

在这个mapper中,使用GenericWritable的实现来封装每个值的实例:

private Text requestText = new Text();
context.write(userHostText,new MultiValueWritable(requestText));

reducer实现必须手动处理不同值类型

if(writable instanceof IntWritable){
sum += ((IntWritable)writable).get();
}else{
requests.append((Text)writable).toString();
requests.append("\t");
}
}

org.apache.hadoop.io.ObjectWritable是另一个类,可以用来实现和GenericWritable一样的目标,ObjectWritable类可以处理Java基本类型,字符串和数组等除了Writable封装需要的类型,Hadoop通过将实例类名写入每个序列化条目的方式来序列化ObjectWritable实例,这使得它与GenericWritable这种基于类的方式比起来不够高效。

为输入数据个数选择合适的Hadoop InputFormat

Hadoop通过InputFormat来支持许多不同的格式和类型的数据处理,Hadoop MapReduce计算的InputFormat通过解析输入数据类生成用于mapper的键值对输入InputFormat还执行将输入数据分割成逻辑分区,基本上决定了MapReduce计算的Map任务数,并间接的决定了Map任务的执行位置,Hadoop为每个逻辑分区生成map任务,并使用键值对作为逻辑切分,调用相应的mapper

基于FileInputFormat的KeyValueTextInputFormat作为Hadoop Mapreduce计算的InputFormat
1.指定KeyValueTextInputFormat作为InputFormat,Hadoop MapReduce计算使用Job对象如下:

Configuration conf = new Configuration();
Job job = new Job(conf,"log-analysis");
...
SetInputFormat(KeyValueTextInputFormat.class);

2.设置作业的输入路径

FileInputFormat。setInputPath(job,new Path(inputpath));

工作原理:
KeyValueTextInputFormat是一种纯文本的输入格式,它为输入文本的每一行生成一个键值记录,输入数据的每一行使用分隔符生成了键(Text)、值(Text)对;默认的分隔符是制表符;如果某行不包含分隔符,整行文本将被视为键和值为空,可以通过设置作业的配置对象的属性指定自定义分隔符,如下所示:

conf.set("key.value.separator.in.input.line",",");

KeyValueTextInputFormat基于FileInputFormat,FileInputFormat则是一种基于文件的InputFormat的基类;使用FileInputFormat类的SetInputPaths()方法指定MapReduce计算的输入路径;使用任何基于FileInputFormat类的InputFormat时,必须执行如下步骤:

FileInputFormat.setInputPaths(Job,new Path(inputpath));

可以通过提供一个逗号分隔的路径列表来为MapReduce计算提供多个HDFS中的输入路径,也可以使用FileInputFormat类的addInputPath静态方法添加额外的计算输入路径:

public static void setInputPaths(JobConf conf, Path. . . inputpaths);
public static void addInputPath(JobConf conf, Path path);

确保mapper输入的数据类型与MapReduce计算所使用的InputFormat产生的数据类型相匹配
Hadoop提供的InputFormat实现,以支持多个公共数据格式:

TextInputFormat:用于纯文本文件;TextInputFormat为输入文本文件的每一行生成一个键值对记录;对于每一行,键(LongWritable)是行在文件中的字节偏移量,值(Text)是行的文本内容;TextInputFormat是Hadoop默认的InputFormat
NLineInputFormat:用于纯文本文件; NLineInputFormat将输入文件转换为固定数目行的逻辑切分;当map任务需要输入固定数目的行的时候,可以使用 NLineInputFormat;键(LongWritable)和值(Text)以类似于TextInputFormat风分割产生用于每个行(Text)的记录;默认情况下,NLineInputFormat为每行创建一个逻辑切分(对应一个Map任务),可以按如下方式制定每个切分行(或每个Map任务的键值对)的数目,NLineInputFormat为输入文本文件的每行生成一个键值对记录NLineInputFormat.setNumLinesPerSplit(job,50);
SequenceFileInputFormat:用于Hadoop顺序文件输入数据,Hadoop顺序文件将数据存储为二进制键值对,并支持数据的压缩;当一个MapReduce计算的结果是顺序文件格式时,使用顺序文件格式作为MapReduce计算的输入,SequenceFileInputFormat显得非常有用

SequenceFileAsBinaryInputFormat:这是一种以原始二进制格式程序键(ByteWritable)值(ByteWritable)对的格式,是SequenceInputFormat的子类
SequenceFileAsTextInputFormat:这是一个以字符串形式呈现键(Text)和值(Text)的输入格式,是SequenceInputFormat的子类

DBInputFormat:这是支持从SQL表中读取输入数据,用于MapReduce计算的输入格式,DBInputFormat使用记录号作为键(LongWritable)。使用查询结果的记录作为值(DBWrItable)

在一个MapReduce应用程序中,使用多个输入数据类型和多个mapper实现

可以使用Hadoop的MultipleInputs功能类运行具有多个输入路径的MapReduce作业,同时制定用于每个路径的不同InputFormat和(可选的)mapper;Hadoop将输出路由到不同的mapper实例上,使用单一类型reducer实例执行MapReduce计算输出;当需要处理多个具有相同含义的数据集时,如果这些数据集具有不同的输入格式(逗号分隔的数据集合制表符分隔的数据集),那么对于多个输入具有不同的InputFormat实现是非常有用的

可以使用MultipleInputs类的addInputPath静态方法,来将输入路径和各自路径相应的InputFormat添加到MapReduce计算:

public static void addInputPath(Job,Path path,class<?extendsInputFormat>inputFormatClass)

具体实例如下:

MultipleInputs.addInputPath(job,path1,CSVInputFormat.class);
MultipleInputs.addInputPath(job,path1,TabInputFormat.class);

当为2个或者更多个数据集执行一个reduce端join时,不同mapper和InputFormat拥有多个同输入路径的特性,显得非常有用:

public static void addInputPath(JobConf conf,Path path,class<?extendsInputFormat>inputFormatClassclass<?extends Mapper>mapperClass)

具体实例如下:

MultipleInputs.addInputPath(job,accessLogPath,TextInputFormat.class,AccessLogMapper.class);
MultipleInputs.addInputPath(job,userDarPath,TextInputFormat.class,UserDataMapper.class);

实现自定义的InputFormat

InputFormat实现应该扩展org.apache.hadoop.mapreduce.InputFormat<K,V>抽象类,并重写createRecordReader()和getSplits()方法

实现InputFormat和RecordReader,用于处理日志文件,该InputFormat将产生LongWritable实例的键和LogWritable实例的值
基于FileInputFormat的自定义InputFormat,用于处理日志文件
1.LogFileInputFormat要直接操作存储在HDFS文件中的数据,因此实现一个扩展自FileInputFormat的LogFileInputFormat

public class LogFileInputFormat extends FileInputFormat<LongWritable,LogWritable>{
public RecordReader<LongWritable,LogWritable> createRecorder(InputSplit arg0,TaskAttemptContext arg1)throws . . .{
return new LogFileRecorderReader();
}
}

2.实现LogFileRecordReader类

Public class LogFileRecordReader extends RecordReader<LongWritable,LogWritable>{
LineRecordReader lineReader;
LogWritable value;

public void initialize(InputSplit inputSplit,TaskAttempContext attempt)...{
lineReader = new LineRecordReader();
lineReader.initialize(inputSplit,attempt);
}

public boolean nextKeyValue()throws IOException, ... {
if(!lineReader.nextKeyValue())
return false;

String line = lineReader.getCurrentValue().toString();
...//Extract the fields from 'line'using a regex

value = new LogWritable(userIP,timestamp,request,status,bytes);
return true;
}

public LongWritable getCurrentKey() throws ...{
return lineReader.getCurrentKey();
}

public LogWritable getCurrentValue() throws ...{
return value;
}

public float getProgress() throws IOException, ... {
return lineReader.getProgress();
}

public void close() throws IOException{
lineReader.close();
}
}

3.指定LogFileInputFormat作为InputFormat,用于使用Job对象的MapReduce计算,代码如下,指定使用底层FileInputFormat用于计算的输入路径:

Configuration conf = new Configuration();
Job job = new Job(conf, "log-analysis")l
...
job.setInputFormatClass(LogFileInputFormat.class);
FileInputFormat.setInputPath(job,new Path(inputpath));

4.确保计算的mapper使用LongWritable作为输入key类型,LogWritable作为输入的value类型:

public class LogProcessMap extends Mapper<LongWritable, LogWritbale,Text,IntWritable>{
public void map(LongWritable key, LogWritable value, Context context) throws ...{
...
}
}

工作原理
LogFileInputFormat继承FileInputFormat,提供了一个通用的基于InputFormat分割HDFS文件的机制,在
LogFileInputFormat中覆盖createRecordReader()方法,提供自定义的RecordReader实现,一个LogFileRecordReader实例,或者可以重写FileInputFormat的isSplitable()方法来控制输入文件是否被分割为几个逻辑分区,或作为整个文件:

public RecordReader<LongWritable,LogWritable> 
createRecorder(InputSplit arg0,TaskAttemptContext arg1)throws . . .{
return new LogFileRecorderReader();
}

LogFileRecordReader类继承了org.apache.hadoop.mapreduce.InputFormat<K,V>抽象类,并在内部使用LineRecordReader来执行输入数据的基本分析,LineRecordReader以行的形式读取输入数据:

lineReader = new LineRecordReader();
lineReader.initialize(inputSplit,attempt);

通过执行nextKeyValue()方法来进行输入数据的日志条目定义解析,使用正则表达式提取日志条目的各个字段,并产生LogWritable类一个实例类封装这个字段:

public boolean nextKeyValue()throws IOException, ... {
if(!lineReader.nextKeyValue())
return false;

String line = lineReader.getCurrentValue().toString();
...//Extract the fields from 'line'using a regex

value = new LogWritable(userIP,timestamp,request,status,bytes);
return true;
}

可以重写InputFormat类的getSplit()方法执行输入数据的自定义分割,getSplit()方法返回InputSplit对象的列表;一个InputSplit对象表示输入数据的一个逻辑分区并且将被分配到一个单独的Map任务上

InputSplit类扩展自InputSplit抽象类,应该覆盖getLocations()和getLength()方法;getLength()方法应该提供分割的长度,而getLocations()方法应该提供列表节点,其中有这种分割所表示的数据的物理存储

格式化MapReduce计算的结果–使用Hadoop的OutputFormat

使用Hadoop OutputFormat接口来为MapReduce计算的输出定义数据存储格式、数据存储位置和数据组织形式,OutputFormat准备输出位置,并提供一个RecordWriter的实现来执行实际的数据序列化和存储

Hadoop使用org.apache.hadoop.mapreduce.lib.output.TextOutputFormat<K,V>作为MapReduce计算的默认OutputFormat;TextOutputFormat将数据记录输出到HDFS的文本文件中,每个单独的行保存一条记录,TextOutputFormat会使用制表符来分割键和值;TextOutputFormat扩展FileOutputFormat,这是可用于所有的基于文件的输出格式的基类

使用基于FileOutputFormat的SequenceFileOutputFormat作为OutputFormat,用于Hadoop MapReduce计算
1.将指定org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat<K,V>作为OutputFormat用于一个Hadoop MapReduce计算使用Job对象:

Configuration conf = new Configuration();
Job job = new Job(conf,"log-analysis");
...
job.setOutputFormat(SequenceFileOutputFormat.class);

2.设置作业的输出路径

FileOutputFormat.setOutputPath(job,new Path(outputpath));

工作原理

SequenceFileOutputFormat将数据序列化到Hadoop序列文件,Hadoop顺序文件将数据存储为二进制键值对,支持数据压缩,序列文件是高效存储的非文本文件格式;如果MapReduce计算的输出要作为另一个MapReduce计算的输入,可以使用序列化文件来存储前一个MpaReduce的计算结果

FileOutputFormat是SequenceFileOutputFormat的基类,它是所有基于文件的OutputFormat的基类,使用FileOutputFormat的setOutputPath()方法指定输出路径,任何基于FileOutputFormat的OutputFormat必须执行如下操作:

FileOutputFormat.setOutputPath(job,new Path(outputpath));

通过继承org.apache.hadoop.mapreduce.OutputFormat<K,V>抽象类,可以实现自定有点OutputFormat类,把MapReduce计算的输出写出专用的或自定义的数据格式,并且/或者存储在HDFS之外的存储系统上

Hadoop的中间数据分区

Hadoop在整个reduce任务的计算过程中,对map任务生成的中间数据进行分区(一个适当的分区函数能确保每个reduce任务负载平衡)【分区也可以将相关的记录集分组,发送给特定的人reduce任务(如果希望某些输出被加工或组合在一起的话)】

Hadoop基于中间数据键空间划分中间数据,来决定哪个reduce任务将接收哪个中间结果,有序集合分区的键和它们的值将作为一个reduce任务的输入(在Hadoop中,分区的总数等于Reduce任务的数目)

Hadoop Partitioner应扩展 org.apache.hadoop.mapreduce.Partitioner<KEY,VALUE>抽象类【Hadoop使用org.apache.hadoop.mapreduce.lib.partitioner.HashPartitioner作为默认的Partitioner】

HashPartitioner分区基于其hashcode()划分键,使用公司key.hashcode() mod r,其中r是reduce的任务数

实现自定义的用于分析日志的Partitioner,基于地理区域划分键(IP地址)
1.实现IPBasedPartitioner扩展Partitioner抽象类

public class IPBasedPartitioner extends Partitioner<Text,IntWritable>{
public int getPartition(Text ipAdress, IntWritable value, int numPartitions)
{
String region = getGeoLocation(ipAdress);
if(region != null){
return ((region.hashcode() & Integer.MAX_VALUE)%numPatririons);
}
return 0;
}
}

2.在Job对象总设置Partitioner类的参数

Job job = new Job(conf,"log-analysis");
...
job.setPartitionerClass(IPBasedPartitioner.class);

工作原理
在中间数据上执行分区逻辑,使得来自同一个IP的请求被发送到相同的reduce实例,getGeoLocation()方法返回给定IP地址的地理位置,例子中省略了getGeoLocation()方法的具体实现,然后得到地理位置的hashcode(),执行模运算来䋀请求的reducer

public int getPartition(Text ipAdress, IntWritable value, int numPartitions)
{
String region = getGeoLocation(ipAdress);
if(region != null){
return ((region.hashcode() & Integer.MAX_VALUE) % numPartitions);
}
return 0;
}

Hadoop的默认分区(HashPartitioner)在对中间数据进行分区的时候不强制进行排序
TotalOrderPartitioner和KeyFieldPartitioner是有Hadoop提供的2个内置的Partitioner实现:

TotalOrderPartitioner扩展org.apache.hadoop.mapreduce.lib.Partitioner.TotalOrderPartitioner<K,V>,reducer的输入记录集是有序的,以确保输入分区中有正确排序(如果要确保全局有序,可以使用TotalOrderPartitioner强制获得一个全局顺序以减少每个reducer任务的输入记录数)【TotalOrderPartitioner需要一个分区文件作为输入,来定义分区的范围,org.apache.hadoop.mapreduce.lib.partitionerInputSampleer工具运行我们通过采样输入数据,来生成用于TotalOrderPartitioner的分区文件】TotalOrderPartitioner主要用于Hadoop的TeraSort基准测试

Job job = new Job(conf,"Sort");
...
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(jonconf,partitionFile);

KeyFieldPartitioner扩展org.apache.hadoop.mapreduce.lib.Partitioner.KeyFieldBasedPartitioner<K,V>可以用来划分基于部分键的中间数据,可以使用分隔字符串将每个键分割成一组字段,在进行分区的时候,可以指定需要考虑字段的索引,还可以在字段中指定字符的索引

将共享资源传播和分发到MapReduce作业的任务中–Hadoop DistributedCache

利用Hadoop DistributedCache来基于资源(这些资源可以是数据文件,归档文件,Jar文件)给Map和Reuduce任务分发只读文件,它们都是通过mapper或reducer执行计算时需要的

如何将文件添加到Hadoop DistributedCache以及如何从Map和Reuduce任务中获取:

1.将资源复制到HDFS,也可以使用已经存在于HDFS中的文件
bin/hadoop fs -copyFromLocal ip2loc.dat ip2loc.dat
2.从主程序添加资源到DistributedCache
Job job = new Job(conf,"log-analysis");
. . .
DistributedCache.addCacheFile(new URI("ip2loc.dat # ip2location"),job.getConfiguration());

3.从mapper或reducer的setup()方法中获取资源,并在map()或reduce()函数哄使用这些数据

public class LogProcessMap extends Mapper<Object ,LogWritable, Text, IntWritable>{
private IPLookup lookupTable;
public void setup(Context context) throws IOException{
File lookipDb = new File("ip2location");//load the ip lookup table to memory
lookupTable = IPLookup.LoadData(lookupDb);
}
public void map(...){
String country = lookupTable.getCountry(value.ipAddress);
. . .
}
}

工作原理
在任意的作业任务执行前,Hadoop会将已经添加到DistributedCache的文件复制到颙的工作节点,每个作业,DistributedCache复制这些文件一次;Hadoop还支持通过增加一个具有需要的URI链接名的片段,在当前计算工作目录创建链接到DistributedCache中文件的符号链接;在下面的例子中,使用ip2Location作为DistributedCache中ip2loc.dat文件的符号链接:

DistributedCache.addCacheFile(new URI("/data/ip2loc.dat # ip2location"),job.getConfiguration());

在mapper或reducer的setup()方法中,从DistributedCache中解析和加载数据(带符号链接的文件可以使用提供的链接名从工作目录访问到):

private IPLookup lookupTable;
public void setup(Context context) throws IOException{
File lookupDb = new File("ip2location");//load the ip lookup table to memory
lookupTable = IPLookup.LoadData(lookupDb);
}
public void map(...){
String country = lookupTable.getCountry(value.ipAddress);
. . .
}

也可以使用getLocalCacheFiles()方法,在不适用符号链接的情况下,直接访问DistributedCache中的数据:

Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

DistributedCache不能再Hadoop本地模式下工作

使用DistributedCache分发压缩档案
Hadoop在工作节点自动提取压缩档案中的文件,也可以提供符号链接来使用URI片段的档案:

Job job = new Job(conf,"log-analysis");
. . .
DistributedCache.addCacheArchive(new URI("/data/ip2locationdb.tar.gz#ip2locationdb"),job.getConfiguration());`

归档文件的解压缩目录可以使用上面提供的符号链接从mapper或reducer所在工作目录访问:

public void setup(Context context) throws IOException{
File lookupDir = new File("ip2locationdb");
String[] children = lookupDir.list();
. . .
}

也可以直接在mapper或reducer中实现,用下面的方法直接访问未提前的DistributedCache存档文件:

Path[] cachePath;
public void setup(Context context)throws IOException{
Configuration conf = context.getConfiguration();
cachePath = DistributedCache.getLocationCacheArchives(conf);
. . .
}

在命令行中将资源添加到DistributedCache
Hadoop支持使用命令行将文件或压缩归档添加到DistributedCache,前提是MR主程序实现了org.apache.hadoop.util.Tool接口或利用了org.apache.hadoop.GenericOptionsParser类;文件可以使用-files命令行选项添加到DistributedCache,归档文件可以使用-archives命令行选项添加,JAR文件使用-libjars选项添加(文件或存档可以在Hadoop可以访问的任意文件系统中,包括本地文件系统)【这些选项支持逗号分隔的路径列表以及使用URI片段符号链接的创建】

bin/hadoop jar C4LogProcessor.jar LogProcessor - files ip2location.dat#ip2location indir outdir
bin/hadoop jar C4LogProcessor.jar LogProcessor -archives ip2locationdb.tar.gz#ip2locationdb indir outdir
bin/hadoop jar C4LogProcessor.jar LogProcessor -libjars ip2locationResolvers.jar indir outdir

使用DistributedCache将资源添加到类路径
可以使用DistributedCache来分发Jar文件和其他mapper或reducer的依赖库,可以在客户端主程序中使用下列方法,将JAR文件添加到运行mapper或reducer的JVM类路径中:

public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs)
public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs)

添加MapReduce作业之间的依赖关系

Hadoop的ControlledJob和JobControl来提供了一种机制,通过指定2个MR作业之间的依赖关系来执行MR作业的简单工作流图

例:log-anaylsis计算依赖于log-grep计算:
1.为第一个MR作业创建Configuration和Job对象,并填充其他必要配置项:

Job job1 = new Job(getConf(),"log-grep");
job1.setJarByClass(RegexMapper.class);
job1.setMapperClass(RegexMapper.class);
FileInputFormat.setInputPaths(job1,new Path(inputpath));
FileOutputFormat.setOutputPaths(job1,new Path(intermedpath));
. . .

2.为第二个MR作业创建Configuration和Job对象,并填充其他必要配置项:

Job job2 = new Job(getConf(),"log-analysis");
job2.setJarByClass(LogProcessorMap.class);
job2.setMapperClass(LogProcessorMap.class);
job2.setReducerClass(LogProcessorReduce.class);
FileOutputFormat.setOutputPaths(job2,new Path(outputpath));
. . .

3.设置第一个作业的输出目录,并将该目录作为第二个作业的输入目录

FileInputFormat.setInputPaths(job2,new Path(intermedpath+"part*"));

4.使用上述创建的Job对象来创建ControlledJob对象

ControlledJob controlledJob1 = new ControlJob(job1.getConfiguration());
ControlledJob controlledJob2 = new ControlJob(job2.getConfiguration());

5.将第一个作业添加成第二个作业的依赖项

controlledJob2.addDependingJob(controlledJob1);

6.为作业创建JobControl对象,并将步骤4创建的ControlledJob对象添加到新创建的JobControl对象中

JobControl jobControl = new JobControl("JobControlDemoGroup");
jobControl.addJob(controlledJob1);
jobControl.addJob(controlledJob2);

7.创建一个新的线程来运行添加到JobControl对象的作业组,启动线程,并等待期完成

Thread jobControlThread = new Thread(jobcontrol);
jobControlThread.start();
while(!jobControlThread.allFinished()){
Thread.sleep(500);
}
jobControl.stop();

工作原理
ControlledJob类封装了MR作业,并提供跟踪作业依赖性的功能,当且仅当所有的依赖的作业成功完成时,ControlledJob类配置依赖的作业才会变为提交状态,如果有任何一个作业失败,ControlJob就会失败
JobControl类封装了一组ControlledJob及其依赖,JobControl跟踪分装的ControlledJob的状态,并包含一个线程用于提交READY状态的作业

Apache Oozie是Hadoop MR的计算工作流系统。可以使用Oozie以有向无环图方式执行MR计算

用于报告自定义指标的Hadoop计数器

Hadoop使用一组计数器来聚合MR计算的指标,Hadoop计算器有助于理解MR程序的行为,并跟踪MR计算的进度,可以定义自定义计数器来跟踪MR计算中的应用程序特定指标

自定义计数器,在日志处理的应用程序中统计不良或损坏的记录数
1.通过定义一个枚举自定义计数器列表:

public static num LOG_PROCESSOR_COUNTER{BAD_RECORDS};

2.在mapper或reducer中增加计数器值:

context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS).increment(1);

3.将以下内容添加到主程序来访问计数器:

Job job = new Job(conf,"log-analysis");
. . .
Counters counters = job.getCounters();
Counter badRecordsCounter = counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS);
System.out.println("# of Bad Records : "+badRecordsCounters.getValue());

4.执行Hadoop MR计算,可以在管理控制台或命令行查看计数器的值:

bin/hadop jar C4LogProcessor.jar \ demo.LogProcessor in out 1

工作原理
必须使用枚举来定义程序中的自定义计数器,同一组中的枚举计数器将形成计数器组,JobTracker(AM)会聚合mapper和reducer报告的计数器值