干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

时间:2021-02-09 20:57:42
正文开始前 ,先介绍几个概念

序列化
所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储。


反序列化
是指将字节流转回到结构化对象的逆过程


序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储


在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息


Hadoop使用了自己写的序列化格式 Writable ,它格式紧凑,速度快,但是它很难用Java以外的语言进行拓展或使用,因为Writable是Hadoop的核心,大多数MapReduce程序都会为键和值使用它




简单来说,RPC协议是让程序员可以调用远程计算机进程上的代码的一套工具


打个比方  就是A通过网络调用B的某个进程方法


通信中的协议是由程序员自己规定的,比如你可以规定说当A向B发送数字1, B就打印hello hadoop, 并返回数字1给A, 如果发送数字2,B就打印hello world并发送数字2给A.  


序列化------------>写  write(DataOutput out)


反序列化-------->读   readFields(DataInput in)


首先声明:我是基于Hadoop2.6.4版本

 一.  Hadoop内置的数据类型


BooleanWritable:标准布尔型数值


ByteWritable:单字节数值


DoubleWritable:双字节数值


FloatWritable:浮点数


IntWritable:整型数


LongWritable:长整型数


Text:使用UTF8格式存储的文本


NullWritable:当<key, value>中的key或value为空时使用

 Hadoop中的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被网络传输和文件存储。


 二. 用户自定义数据类型的实现


     1.继承接口Writable,实现其方法write()和readFields(), 以便该数据能被序列化后完成网络传输或文件输入/输出;


     2.如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritalbeComparable接口,实现其方法write(),readFields(),CompareTo() 。


     3.数据类型,必须要有一个无参的构造方法,为了方便反射,进行创建对象。    


     4.在自定义数据类型中,建议使用java的原生数据类型,最好不要使用Hadoop对原生类型进行封装的数据类型。比如 int x ;//IntWritable 和String s; //Text  等等


下面是一个自定义的数据类型  3D坐标轴 

package com.tg.type;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point3D implements WritableComparable<Point3D> {
public float x, y, z;
public Point3D(float fx, float fy, float fz) {
this.x = fx;
this.y = fy;
this.z = fz;
}
public Point3D() {
this(0.0f, 0.0f, 0.0f);
}
public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}
public String toString() {
return "X:"+Float.toString(x) + ", "
+ "Y:"+Float.toString(y) + ", "
+ "Z:"+Float.toString(z);
}
public float distanceFromOrigin() {
return (float) Math.sqrt( x*x + y*y +z*z);
}
public int compareTo(Point3D other) {
return Float.compare(
distanceFromOrigin(),
other.distanceFromOrigin());
}
public boolean equals(Object o) {
if( !(o instanceof Point3D)) {
return false;
}
Point3D other = (Point3D) o;
return this.x == other.x && this.y == other.y && this.z == other.z;
}
/* 实现 hashCode() 方法很重要
* Hadoop的Partitioners会用到这个方法,后面再说
*/
public int hashCode() {
return Float.floatToIntBits(x)
^ Float.floatToIntBits(y)
^ Float.floatToIntBits(z);
}

}

下面讲数据输入输出格式和自定义数据输入输出格式 ,然后把上面讲过的自定义数据类型整合进去


首先看看输入文件a.txt

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

数据输入格式(InputFormat) 用于描述MapReduce作业的数据输入规范。MapReduce框架依靠数据输入格式完成输入规范检查(比如输入文件目录的检查)、对数据文件进行输入分块(也叫分片,InputSplit),以及提供从输入分块(分片)中将数据记录逐一读出,并转化为Map过程的输入键值对等功能

Hadoop提供了丰富的内置数据输入格式。最常用的数据输入格式包括:TextInputFormat和KeyValueInputFormat
TextInputFormat是系统默认的数据输入格式,可以将文本文件分块并逐行读入以便Map节点进行处理。读入一行时,所产生的主键Key就是当前行在整个文本文件中的字节偏移位置,而value就是该行的内容,它是系统默认的输入格式,当用户程序不设置任何数据输入格式时,系统自动使用这个数据输入格式。
比如如下文件内容
hello tanggao
hello hadoop
第一行的偏移量为0
第二行偏移量为13

KeyValueTextInputFormat是另一个常用的数据输入格式,可将一个按照<key,value>格式逐行存放的文本文件逐行读出,并自动解析生成相应的key和value

比如
姓名    汤高
年龄    20
则解析出来的
第一行键Key为姓名  值value为汤高
第二行键key为年龄 值value为20
注意和TextInputFormat不同,TextInputFormat是偏移量做键,整行内容做值

对于一个数据输入格式,都需要一个对应的RecordReader。RecordReader。主要用于将一个文件中的数据记录分拆成具体的键值对,传送给Map过程作为键值对输入参数。每一个数据输入格式都有一个默认的RecordReader。TextInputFormat的默认RecordReader是LineRecordReader,而KeyValueTextInputFormat的默认RecordReader是KeyValueLineRecordReader
当然肯定还有很多数据输入格式和对应的默认RecordReader 这里就不接受了,有需要的可以去官网看看



数据输出格式(OutputFormat)用于描述MapReduce作业的数据输出规范。MapReduce框架依靠数据输出格式完成输出规范检查(蔽日检查输出目录是否存在),以及提供作业结果数据输出等功能

Hadoop提供了丰富的内置数据输出格式。最常用的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,可以将计算结果以 key+\t+value的形式逐行输出到文本文件中。

与数据输入格式中的RecordReader类似,数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写入到文件中的具体格式。
TextOutputFormat的默认RecordWriter是LineRecordWriter,其实际操作是将结果数据以key+\t+value的形式输出到文本文件中。

当然同样肯定还有很多数据输出格式和对应的默认RecordWriter


对于自定义数据输入格式 可以参考已有的数据输入格式,继承自它即可,只要重写GetRecordReader方法得到一个自己写的RecordReader即可


我的是仿造KeyValueTextInputFormat和它的KeyValueLineRecordReader来自定义自己的输入格式的,所以我都是自己复制了上面两个类的源码然后进行自己的改写



package com.my.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class myInputFormat extends FileInputFormat<Text,Text> {
//用来压缩的
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}

@Override
public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
context.setStatus(genericSplit.toString());
return new MyRecordReader(context.getConfiguration());
}

}


他的RecordReader

package com.my.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyRecordReader extends RecordReader<Text, Text> {

public static final String KEY_VALUE_SEPERATOR =
"mapreduce.input.mylinerecordreader.key.value.separator";

private final LineRecordReader lineRecordReader;
//源码是根据\t分割 我改为了我自己的需求为=号分割
private byte separator = (byte) '=';

private Text innerValue;

private Text key;

private Text value;

public Class<Text> getKeyClass() { return Text.class; }

public MyRecordReader(Configuration conf)
throws IOException {

lineRecordReader = new LineRecordReader();
String sepStr = conf.get(KEY_VALUE_SEPERATOR, "=");
this.separator = (byte) sepStr.charAt(0);
}

public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
lineRecordReader.initialize(genericSplit, context);


}

public static int findSeparator(byte[] utf, int start, int length,
byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf[i] == sep) {
return i;
}
}
return -1;
}

public static void setKeyValue(Text key, Text value, byte[] line,
int lineLen, int pos) {
if (pos == -1) {
key.set(line, 0, lineLen);
value.set("");
} else {
key.set(line, 0, pos);
value.set(line, pos + 1, lineLen - pos - 1);
}
}
/** Read key/value pair in a line. */
public synchronized boolean nextKeyValue()
throws IOException {
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.nextKeyValue()) {
innerValue = lineRecordReader.getCurrentValue();
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null)
return false;
if (key == null) {
key = new Text();
}
if (value == null) {
value = new Text();
}
int pos = findSeparator(line, 0, lineLen, this.separator);
setKeyValue(key, value, line, lineLen, pos);
return true;
}

public Text getCurrentKey() {
return key;
}

public Text getCurrentValue() {
return value;
}

public float getProgress() throws IOException {
return lineRecordReader.getProgress();
}

public synchronized void close() throws IOException {
lineRecordReader.close();
}
}


对于自定义数据输出格式 可以参考已有的数据输出格式,继承自它即可,只要重写GetRecordWriter方法得到一个自己写的RecordWriter即可
一种实现:我们一般继承自FileOutputFormat来改写

因为FileOutputFormat已经帮我们实现了许多通用的功能

package com.my.input;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

protected static class MyLineRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private static final byte[] newline;

static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}

protected DataOutputStream out;
private final byte[] keyValueSeparator;

public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
//改写了源码 把\t改为了=========>
public MyLineRecordWriter(DataOutputStream out) {
this(out, "=========>");
}

/**
* Write the object to the byte stream, handling Text as a special case.
*
* @param o
* the object to print
* @throws IOException
* if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}

public synchronized void write(K key, V value) throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}

public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}
}

public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = conf.get(SEPERATOR, "=========>");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new MyLineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new MyLineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}

第二种实现:FileOutputFormat的一个重要子类就是TextOutputFormat,我们也可以继承它,然后重写getRecordWriter方法即可


package com.my.input;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat2<K, V> extends TextOutputFormat<K, V> {

@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
//改写了源码 把\t改为了=========>
String keyValueSeparator = conf.get(SEPERATOR, "=========>");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}

最后就可以用来测试了

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例


测试代码

package com.my.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.tg.type.Point3D;




public class Point3DDriver {

/**
*
* @author 汤高
* Point3D为自定义数据类型 把它作为map的输出类型
*
*/
// Map过程
static int count=0;
public static class MyMapper extends Mapper<Text, Text, Text, Point3D> {
/***
*
*/
@Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, Point3D>.Context context)
throws IOException, InterruptedException {
count++;
//这里得到的键是自定义输入格式输出的内容 本例是 One 、two、three
//这里得到的值是X:1.0, Y:2.0, Z:3.0 等
//根据都好截取值里面的内容 分别设置到自定义数据类型Point3D里面去
String[] vs = value.toString().split(",");
Point3D p = new Point3D(Float.parseFloat(vs[0].split(":")[1]), Float.parseFloat(vs[1].split(":")[1]), Float.parseFloat(vs[2].split(":")[1]));
// 写出去 把自定义数据类型输出去

context.write(new Text(key), p);
System.out.println("几个map==========>"+count);
}
}
//Reduce过程
public static class MyReducer extends Reducer<Text, Point3D, Text, Point3D>{

protected void reduce(Text key, Point3D value,
Reducer<Text, Point3D, Text, Point3D>.Context context) throws IOException, InterruptedException {

context.write(key, value);
}

}

public static void main(String[] args) {

try {
Configuration conf = new Configuration();
String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
if (paths.length < 2) {
throw new RuntimeException("usage <input> <output>");
}

Job job = Job.getInstance(conf, "Point3DDriver");
job.setJarByClass(Point3DDriver.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(myInputFormat.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Point3D.class);
job.setOutputFormatClass(MyOutputFormat.class);
//job.setOutputFormatClass(MyOutputFormat2.class);
FileInputFormat.addInputPaths(job, paths[0]);
FileOutputFormat.setOutputPath(job, new Path(paths[1] + System.currentTimeMillis()));// 整合好结果后输出的位置
System.exit(job.waitForCompletion(true) ? 0 : 1);// 执行job


} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

对于编写Map函数和Reduce函数不熟悉的朋友,可以参看我上篇博客 里面讲解了如何实现MapReduce编程  

MapReduce工作原理详解   
结果:

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例



码字不易,转载请指明出自  http://blog.csdn.net/tanggao1314/article/details/51305852