Hadoop中基于文件的数据格式(1)SequenceFile

时间:2022-12-07 12:39:56

1 概述

1 SequenceFile是Hadoop为例存储二进制的<k,v>而设计的一种平面文件。
2 SequenceFile的key和value是writable或者writable子类。
3 SequenceFile的存储不按照key排序,内部类Writer提供了append方法。
4 SequenceFile作为一个容器,可以将小文件打包到SequenceFile,高效对小文件进行存储和处理。


2 压缩类型

根据CompressionType的不同,有如下压缩类型
NONE:不压缩。每个记录有key长度、value长度、key、value组成,长度字段分别为4字节。
RECORD: 记录压缩。结构与NONE非常类似,用定义在头部的编码器压缩value,key不压缩。
BLOCK:块压缩。一次压缩多条记录,当记录字节数达到一个阈值则天际到块,io.seqfile.compress.blocksize控制。格式为:记录数,键长度,键,值长度,值。

分别对应Writer:

Writer : Uncompressed records
RecordCompressWriter : Record-compressed files, only compress values
BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable


3 特点

优点

1 支持基于record和block的压缩

2 支持splittable,能够为Mapreduce作为输入分片

3 修改简单,只需要按照业务逻辑修改,不要考虑具体存储格式

缺点

合并后的文件不易查看。hadoop fs -cat看不到,hadoop fs -text可以看到。


4 代码

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceTest {

public static final String Output_path = "hdfs://192.x.x.x:9000/a.txt";
private static final String[] DATA = { "a", "b", "c", };

@SuppressWarnings("deprecation")
public static void write(String pathStr) throws IOException {
Configuration conf = new Configuration();
Path path = new Path(pathStr);
FileSystem fs = path.getFileSystem(conf);

SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
Text.class, IntWritable.class);
Text key = new Text();
IntWritable value = new IntWritable();
for (int i = 0; i < DATA.length; i++) {
key.set(DATA[i]);
value.set(i);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
writer.append(key, value);
}
IOUtils.closeStream(writer);
}

@SuppressWarnings("deprecation")
public static void read(String pathStr) throws IOException {
Configuration conf = new Configuration();
Path path = new Path(pathStr);
FileSystem fs = path.getFileSystem(conf);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
pathStr), conf);

Writable key = (Writable) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(
reader.getValueClass(), conf);

while (reader.next(key, value)) {
System.out.printf("%s\t%s\n", key, value);
}
IOUtils.closeStream(reader);
}

public static void main(String[] args) throws IOException {
write(Output_path);
read(Output_path);
}
}