Understanding Hadoop data types, input/output formats, and how to define your own.

Date: 2021-06-08 20:57:29

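I: Hadoop's built-in data types.

    Hadoop provides the built-in data types listed below. They all implement the WritableComparable interface, so values of these types can be serialized for network transfer and file storage, and can be compared with one another.

BooleanWritable Boolean value
ByteWritable Single-byte value
DoubleWritable Double-precision floating-point number
FloatWritable Single-precision floating-point number
IntWritable Integer
LongWritable Long integer
Text Text stored in UTF-8
NullWritable Placeholder used when the key or value of a <key,value> pair is empty
// A quick look at these types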
IntWritable iw = new IntWritable(1);
System.out.println( iw.get() ); // 1

BooleanWritable bw = new BooleanWritable(true);
System.out.println( bw.get() ); // true

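II: Hadoop user-defined data types.

    A user-defined data type must meet two basic requirements:

        1. It must implement the Writable interface, so that it can be serialized for network transfer or file input/output.

        2. If it will be used as a key, or its values need to be compared, it must implement the WritableComparable interface.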

// Hadoop 2.6.4 - Writable source:
public interface Writable {

    void write(DataOutput out) throws IOException;

    void readFields(DataInput in) throws IOException;

}
public interface WritableComparable<T> extends Writable, Comparable<T> {}
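
The contract of these two interfaces can be tried out without Hadoop on the classpath. The following is a minimal sketch using only java.io: SimpleWritable is a hypothetical stand-in that mirrors the two methods of Writable, and Point is an illustrative type that does not exist in Hadoop. The point to notice is that readFields must read the fields in exactly the order in which write emitted them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in mirroring the two methods of org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Illustrative type: serializable and comparable, in the spirit of WritableComparable.
class Point implements SimpleWritable, Comparable<Point> {
    int x;
    String label = "";

    Point() { }

    Point(int x, String label) {
        this.x = x;
        this.label = label;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);      // 4 bytes
        out.writeUTF(label);  // 2-byte length prefix followed by the encoded characters
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readInt();       // same order as write()
        label = in.readUTF();
    }

    @Override
    public int compareTo(Point other) {
        return Integer.compare(x, other.x);
    }
}

class WritableRoundTrip {
    // Serializes a value into an in-memory byte array.
    static byte[] serialize(SimpleWritable w) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            w.write(new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds a Point from bytes produced by serialize().
    static Point deserialize(byte[] bytes) {
        try {
            Point p = new Point();
            p.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Point copy = deserialize(serialize(new Point(7, "seven")));
        System.out.println(copy.x + " " + copy.label); // prints "7 seven"
    }
}
```

The no-argument constructor matters for the same reason it does in a real Writable: an empty instance has to exist before readFields can fill it in.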

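III: Hadoop's built-in input formats and RecordReader.

    An input format (InputFormat) describes the input specification of a MapReduce job. The framework relies on it to validate the input, to divide the input files into input splits (InputSplit), and to read the records of each split one by one and convert them into the key-value pairs passed to the Map phase.

    Hadoop ships with many built-in input formats. The two most commonly used are TextInputFormat and KeyValueTextInputFormat.

    TextInputFormat is the default input format. It splits text files and reads them line by line for the Map nodes to process. For each line, the key is the byte offset of that line within the whole text file, and the value is the content of the line.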

// Excerpt from the TextInputFormat source:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text>
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
            "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);
    }

    //....
}
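
To illustrate what "the key is the byte offset, the value is the line" means in practice, here is a small sketch in plain Java. It is not Hadoop code: OffsetLineSketch and its records method are hypothetical names, the whole input is held in memory, only "\n" is treated as a line delimiter, and input splits are ignored.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class OffsetLineSketch {
    // Returns one "offset<TAB>line" string per line, where offset is the
    // position of the line's first byte in the UTF-8 encoded content.
    static List<String> records(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        int lineStart = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] == '\n') {
                records.add(lineStart + "\t"
                    + new String(bytes, lineStart, i - lineStart, StandardCharsets.UTF_8));
                lineStart = i + 1; // the next line begins right after the newline
            }
        }
        if (lineStart < bytes.length) {
            // Last line without a trailing newline.
            records.add(lineStart + "\t"
                + new String(bytes, lineStart, bytes.length - lineStart, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        // The second line holds two CJK characters, each three bytes in UTF-8.
        for (String record : records("hello\n\u4e16\u754c\nend")) {
            System.out.println(record);
        }
    }
}
```

In this input the second line starts at offset 6 and the third at offset 13, not 9: the keys count bytes, not characters, which is why they are long values rather than line numbers.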

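    KeyValueTextInputFormat is another commonly used input format. It reads a text file that stores one <key,value> pair per line and automatically parses each line into the corresponding key and value.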

// Excerpt from the KeyValueTextInputFormat source:
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {

    // ...

    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
            TaskAttemptContext context) throws IOException {

        context.setStatus(genericSplit.toString());
        return new KeyValueLineRecordReader(context.getConfiguration());
    }

}
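
The parsing step itself is simple enough to sketch in plain Java. The class below is a hypothetical illustration, not the Hadoop implementation: it splits a line at the first occurrence of the separator, and when no separator is present it treats the whole line as the key with an empty value, which is the same fallback the custom reader later in this article uses.

```java
class KeyValueSplitSketch {
    // Splits a line into {key, value} at the first separator character.
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos == -1) {
            // No separator: the whole line is the key, the value is empty.
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("1\tID:221,Name:xj", '\t');
        System.out.println("key=" + kv[0] + ", value=" + kv[1]);
    }
}
```

Only the first separator counts, so a value may itself contain the separator character.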

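    RecordReader: every input format needs a corresponding RecordReader, whose main job is to break the data records it reads into concrete key-value pairs. The default RecordReader of TextInputFormat is LineRecordReader, and the default RecordReader of KeyValueTextInputFormat is KeyValueLineRecordReader.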

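IV: Hadoop's built-in output formats and RecordWriter.

    An output format (OutputFormat) describes the output specification of a MapReduce job. The framework relies on it to validate the output specification and to write out the job's results.

    The most commonly used output format is TextOutputFormat, which is also the default. It writes each result to a text file as one line of the form "key + \t + value".

    Like an input format, an output format provides a corresponding RecordWriter, which determines the exact format in which results are written to the file. The default RecordWriter of TextOutputFormat is LineRecordWriter, which writes each result to the text file in the form "key + \t + value".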

// Excerpt from the TextOutputFormat source:
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        // ...

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            //...
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            // ...
        }

        public synchronized void write(K key, V value) throws IOException {
            //...
            out.write(newline);
        }

    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job
            ) throws IOException, InterruptedException {
        // ...
    }
}
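
The bodies elided above boil down to a few rules about when the key, the separator, and the value are written. The sketch below restates those rules in plain Java, following the write method of the custom output format shown later in this article. LineWriterSketch is a hypothetical class; it appends to a StringBuilder instead of a DataOutputStream, and a plain null stands in for NullWritable.

```java
class LineWriterSketch {
    private final StringBuilder out = new StringBuilder();
    private final String separator;

    LineWriterSketch(String separator) {
        this.separator = separator;
    }

    // Writes one record as "key + separator + value + newline".
    void write(Object key, Object value) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return; // nothing to write, not even a newline
        }
        if (!nullKey) {
            out.append(key);
        }
        if (!nullKey && !nullValue) {
            out.append(separator); // only when both sides are present
        }
        if (!nullValue) {
            out.append(value);
        }
        out.append('\n');
    }

    String result() {
        return out.toString();
    }

    public static void main(String[] args) {
        LineWriterSketch writer = new LineWriterSketch("\t");
        writer.write("1", "a");
        writer.write(null, "b");
        writer.write("c", null);
        System.out.print(writer.result());
    }
}
```

A record with only a key or only a value therefore produces a line without a separator, and a record with neither produces no line at all.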

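V: A small example that prints UserInfo records, implementing a simple user-defined data type, input format, and output format. (Put simply, it imitates the Hadoop source code with very few changes.)

        The source code of the example follows:

1. Define UserInfo as the data type.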

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class UserInfo implements WritableComparable<UserInfo> {

    private int id;
    private String name;
    private int age;
    private String sex;
    private String address;

    // Hadoop needs the no-argument constructor to create an instance before calling readFields().
    public UserInfo() {
    }

    public UserInfo(int id, String name, int age, String sex, String address) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.address = address;
    }

    // Standard JavaBean getters and setters....

    // Fields must be read in exactly the order in which write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = in.readUTF();
        this.age = in.readInt();
        this.sex = in.readUTF();
        this.address = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
        out.writeInt(age);
        out.writeUTF(sex);
        out.writeUTF(address);
    }

    @Override
    public String toString() {
        return "Id:" + id + ", Name:" + name + ", Age:" + age + ", Sex:" + sex + ", Address:" + address;
    }

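    @Override
    public int compareTo(UserInfo userInfo) {
        // Order records by id. Always returning 0 would make every UserInfo compare as equal
        // if the type were ever used as a key.
        return Integer.compare(this.id, userInfo.id);
    }
}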

2. Define a custom input format: UserInfoTextInputFormat.

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class UserInfoTextInputFormat extends FileInputFormat<Text, UserInfo> {
    @Override
    public RecordReader<Text, UserInfo> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        context.setStatus(split.toString());
        return new UserInfoRecordReader(context.getConfiguration());
    }
}

3. Define a custom RecordReader: UserInfoRecordReader.

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class UserInfoRecordReader extends RecordReader<Text, UserInfo> {
    public static final String KEY_VALUE_SEPERATOR =
        "mapreduce.input.keyvaluelinerecordreader.key.value.separator";

    // Reads the raw lines; this class only parses them.
    private final LineRecordReader lineRecordReader;

    private byte separator = (byte) '\t';

    private Text innerValue;
    private Text key;

    private UserInfo value;

    public Class<Text> getKeyClass() {
        return Text.class;
    }

    public UserInfoRecordReader(Configuration conf) throws IOException {
        lineRecordReader = new LineRecordReader();
        String sepStr = conf.get(KEY_VALUE_SEPERATOR, "\t");
        this.separator = (byte) sepStr.charAt(0);
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        lineRecordReader.initialize(genericSplit, context);
    }

    // Returns the position of the first separator byte, or -1 if the line contains none.
    public static int findSeparator(byte[] utf, int start, int length, byte sep) {
        for (int i = start; i < (start + length); i++) {
            if (utf[i] == sep) {
                return i;
            }
        }
        return -1;
    }

    public static void setKeyValue(Text key, UserInfo value, byte[] line, int lineLen, int pos) {
        if (pos == -1) {
            // No separator: the whole line becomes the key and the value is reset to defaults.
            key.set(line, 0, lineLen);
            value.setId(0);
            value.setName("");
            value.setAge(0);
            value.setSex("");
            value.setAddress("");
        } else {
            key.set(line, 0, pos); // the key runs from position 0 up to the separator
            Text text = new Text();
            text.set(line, pos + 1, lineLen - pos - 1);
            System.out.println("text value: " + text);
            String[] str = text.toString().split(",");
            for (int i = 0; i < str.length; i++) {
                // System.out.println("comma-separated field: " + str[i]);
                String[] strKeyValue = str[i].split(":");
                // System.out.println("strKeyValue key-value: " + key + "--->" + value);
                if ("ID".equals(strKeyValue[0])) {
                    value.setId(Integer.parseInt(strKeyValue[1]));
                } else if ("Name".equals(strKeyValue[0])) {
                    value.setName(strKeyValue[1]);
                } else if ("Age".equals(strKeyValue[0])) {
                    value.setAge(Integer.parseInt(strKeyValue[1]));
                } else if ("Sex".equals(strKeyValue[0])) {
                    value.setSex(strKeyValue[1]);
                } else if ("Address".equals(strKeyValue[0])) {
                    value.setAddress(strKeyValue[1]);
                }
            }
            // System.out.println("key--> " + key);
            // System.out.println("value--> " + value + "\n\n");
        }
    }

    @Override
    public synchronized boolean nextKeyValue() throws IOException {
        byte[] line = null;
        int lineLen = -1;
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new UserInfo();
        }
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.getBytes();
            // The backing array may be longer than the line, so use getLength().
            lineLen = innerValue.getLength();
        } else {
            return false;
        }
        if (line == null) {
            return false;
        }

        int pos = findSeparator(line, 0, lineLen, this.separator);
        setKeyValue(key, value, line, lineLen, pos);
        return true;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public UserInfo getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return lineRecordReader.getProgress();
    }

    @Override
    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }

}
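
The field parsing done in setKeyValue can be tried out on its own, without Hadoop. The sketch below is a hypothetical helper (FieldListParserSketch is not part of the example project) that turns the part of a line after the separator, such as "ID:221,Name:xj,Age:22,Sex:man,Address:hunan,", into a map of field names to values. Unlike setKeyValue, it skips any token that has no colon instead of failing on it, which is an assumption about how malformed input should be handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FieldListParserSketch {
    // Parses "Name:value,Name:value,..." into an ordered map.
    static Map<String, String> parse(String fields) {
        Map<String, String> result = new LinkedHashMap<>();
        // String.split drops trailing empty strings, so a trailing comma is harmless.
        for (String pair : fields.split(",")) {
            int colon = pair.indexOf(':');
            if (colon <= 0) {
                continue; // assumption: ignore tokens without a field name or a colon
            }
            result.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> fields = parse("ID:221,Name:xj,Age:22,Sex:man,Address:hunan,");
        System.out.println(fields);
    }
}
```

A map keeps the sketch short; the reader above instead copies each recognized field straight into the reused UserInfo instance.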

4. Define a custom output format: UserInfoTextOutputFormat.

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class UserInfoTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
                //System.out.println( "newline --> " + newline);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        // Text is written from its raw bytes; any other type is written via toString().
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                System.out.println("o instanceof Text --> true : " + to.toString());
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
                System.out.println("o instanceof Text --> false : " + o.toString());
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            System.out.println("nullKey--> " + nullKey + " , nullValue--> " + nullValue);
            if (nullKey && nullValue) {
                return;
            }
            System.out.println(" nullKey --> " + nullKey + ", !nullKey --> " + !nullKey);
            if (!nullKey) {
                writeObject(key);
            }
            System.out.println("(nullKey || nullValue) --> " + (nullKey || nullValue));
            // The separator is written only when both key and value are present.
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        // Unlike TextOutputFormat, the default separator here is "---->" rather than a tab.
        String keyValueSeparator = conf.get(SEPERATOR, "---->");
        System.out.println("keyValueSeparator---> " + keyValueSeparator);
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        System.out.println("file --> Path : " + file);
        FileSystem fs = file.getFileSystem(conf);

        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            System.out.println("if---isCompressed-->: " + fileOut);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            System.out.println("else---isCompressed-->: " + fileOut);
            return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }
}

5. Test class: PrintUserInfo

package com.hadoop.mapreduce.test4.outputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PrintUserInfo {
    public static final IntWritable ONE = new IntWritable(1);

    public static class UserInfoMapper extends Mapper<Text, UserInfo, Text, UserInfo> {
        @Override
        protected void map(Text key, UserInfo value, Mapper<Text, UserInfo, Text, UserInfo>.Context context)
                throws IOException, InterruptedException {
            // The default implementation passes each key-value pair through unchanged.
            super.map(key, value, context);
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "UserInfo");

            job.setJarByClass(PrintUserInfo.class);
            job.setMapperClass(UserInfoMapper.class);

            // Custom input format:
            job.setInputFormatClass(UserInfoTextInputFormat.class);
            // Custom output format:
            job.setOutputFormatClass(UserInfoTextOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            // Use the user-defined data type as the value
            job.setMapOutputValueClass(UserInfo.class);

            FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/mapuserinfo.txt"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/data/output7/" + System.currentTimeMillis() + "/"));
            System.exit(job.waitForCompletion(true) ? 0 : 1); // run the job

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

6. Output:

1. Data file (the key and the field list are separated by a tab):

1	ID:221,Name:xj,Age:22,Sex:man,Address:hunan,
2	ID:222,Name:cc,Age:21,Sex:Woman,Address:miluo,

2. Result file:

1---->Id:221, Name:xj, Age:22, Sex:man, Address:hunan
2---->Id:222, Name:cc, Age:21, Sex:Woman, Address:miluo