Hadoop 的序列化

时间:2023-03-09 07:52:35
Hadoop 的序列化

1. 序列化

  1.1 序列化与反序列化的概念

    序列化:是指将结构化对象转化成字节流在网上传输或写到磁盘进行永久存储的过程

    反序列化:是指将字节流转回结构化对象的逆过程

  1.2 序列化的应用

    序列化用于分布式数据处理的两大领域

    1. 进程间通信
    2. 永久存储

  1.3 序列化的格式要求

    1. 紧凑:体积小,节省带宽
    2. 快速:序列化过程快速
    3. 可扩展:新 API 支持旧数据格式
    4. 支持互操作:跨语言

2. Writable 接口

  2.1 说明

    Hadoop 使用的序列化格式为 Writeable

    Writeable 接口定义了两个方法

    1. write           将对象写入 DataOutput 二进制流
    2. readFields  从 DataInput 二进制流读取对象

  2.2 Writeable接口实现的类

    Writeable 接口实现类包含以下

    1. int 对应的 Writeable 为 IntWriteable
    2. Long 对应的 Writeable 为 LongWriteable
    3. String 对应的 Writeable 为 Text

   以 IntWritable 为例,在阅读源码之后发现,可以直接通过 new 的方式直接带参创建实例化对象,也可以调用空参构造创建实例化对象之后通过 set 方法赋值。

  2.3 IntWritable 案例

  使用 IntWritable 实现 Hadoop 的序列化与反序列化

 import org.apache.hadoop.io.IntWritable;
import org.junit.Test; import java.io.*; /**
* @user: share
* @date: 2018/7/28
* @description: 测试Hadoop的序列化与反序列化
*/
public class TestHadoopSerial { /**
* 单元测试Hadoop的序列化
* @throws IOException
*/
@Test
public void testSerial() throws IOException {
//创建IntWritable对象
IntWritable iw = new IntWritable(66);
//创建输出流对象
DataOutputStream dos = new DataOutputStream(new FileOutputStream("e:/e/haddop.h"));
//iw将值写入输出流dos
iw.write(dos);
//关闭输出流
dos.close();
} /**
* 单元测试Hadoop的反序列化
* @throws IOException
*/
@Test
public void testDeserial() throws IOException {
//创建输入流对象
DataInputStream dis = new DataInputStream(new FileInputStream("e:/e/haddop.h"));
//创建IntWritable对象
IntWritable iw = new IntWritable();
//iw读取输入流dis的值
iw.readFields(dis);
//得到iw中的值
int i = iw.get();
//输出i
System.out.println(i);
//关闭输入流
dis.close();
}
}

  2.4 自定义 PersonWriteable

  【自定义 Person 类】

 import java.io.Serializable;

 /**
* @user: share
* @date: 2018/7/28
* @description: 自定义Person类
*/
public class Person implements Serializable { private String name;
private int age; public Person() {
} public Person(String name, int age) {
this.name = name;
this.age = age;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public int getAge() {
return age;
} public void setAge(int age) {
this.age = age;
} @Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}

  【自定义 PersonWriteable】

 import org.apache.hadoop.io.Writable;

 import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; /**
* @user: share
* @date: 2018/7/28
* @description: 自定义PersonWriteable实现Person的序列化与反序列化
*/
public class PersonWriteable implements Writable {
//定义person
private Person person; //设置get方法
public Person getPerson() {
return person;
}
//设置set方法
public void setPerson(Person person) {
this.person = person;
} /**
* 重写序列化方法
* @param out
* @throws IOException
*/
public void write(DataOutput out) throws IOException {
//序列化name字段
out.writeUTF(person.getName());
//序列化age字段
out.writeInt(person.getAge());
} /**
* 重写反序列化方法
* @param in
* @throws IOException
*/
public void readFields(DataInput in) throws IOException {
//初始化person
person = new Person();
//反序列化name字段
person.setName(in.readUTF());
//反序列化age字段
person.setAge(in.readInt());
}
}

  【Person 的序列化测试类】

 import org.junit.Test;

 import java.io.*;

 /**
* @user: share
* @date: 2018/7/28
* @description: 测试Person的序列化与反序列化
*/
public class TestPersonSerial {
/**
* 单元测试Person的序列化
* @throws IOException
*/
@Test
public void testPersonSerial() throws IOException {
//新建Person对象
Person p = new Person("sam", 20);
//创建PersonWriteable对象
PersonWriteable pw = new PersonWriteable();
//调用set方法赋值
pw.setPerson(p);
//创建输出流对象
DataOutputStream dos = new DataOutputStream(new FileOutputStream("e:/e/person.j"));
//pw将值写入输出流dos
pw.write(dos);
//关闭输出流
dos.close();
} /**
* 单元测试Person的反序列化
* @throws IOException
*/
@Test
public void testPersonDeserial() throws IOException {
//创建PersonWriteable对象
PersonWriteable pw = new PersonWriteable();
//创建输出流对象
DataInputStream dis = new DataInputStream(new FileInputStream("e:/e/person.j"));
//读取输入流中的对象
pw.readFields(dis);
//得到Person对象
Person p = pw.getPerson();
//输出Person
System.out.println(p.toString());
//关闭输入流
dis.close();
}
}