0. 说明
本文依次测试序列文件(SequenceFile)的读写操作、排序操作、合并操作和压缩方式,并演示如何将日志文件转换成序列文件。
本文是对《Hadoop 序列文件》一文中“SequenceFile 的基本操作”部分的补充。
1. 测试读写 && 压缩
package hadoop.sequencefile; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test; import java.io.IOException; /**
* 测试序列文件
*/
public class TestSeqFile {

    /**
     * Writes 1000 records to a block-compressed SequenceFile on the local
     * file system. Record {@code i} (1..1000) has key {@code i} and value
     * {@code "helloworld" + i}.
     *
     * @throws Exception if the file system cannot be obtained or a write fails
     */
    @Test
    public void testWriteSeq() throws Exception {
        Configuration conf = new Configuration();
        // Use the local file system.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        // Alternative output files for the other compression types:
        // Path path = new Path("E:/test/none.seq");
        // Path path = new Path("E:/test/record.seq");
        Path path = new Path("E:/test/block.seq");

        // To compare compression modes, replace BLOCK below with
        // SequenceFile.CompressionType.NONE (no compression) or
        // SequenceFile.CompressionType.RECORD (per-record compression).
        //
        // try-with-resources guarantees the writer is closed even when
        // append() throws part-way through the loop.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, path, IntWritable.class, Text.class,
                SequenceFile.CompressionType.BLOCK)) {
            for (int i = 1; i <= 1000; i++) {
                IntWritable key = new IntWritable(i);
                Text value = new Text("helloworld" + i);
                writer.append(key, value);
            }
        }
    }

    /**
     * Reads every record of {@code E:/test/block.seq} and prints its key,
     * value and the reader position reported after the record was read.
     *
     * @throws Exception if the file cannot be opened or a read fails
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();
        // Use the local file system.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("E:/test/block.seq");

        // The original never closed the reader; try-with-resources releases
        // the underlying file handle on both normal and exceptional exit.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            // Reusable Writable holders that next() fills in for each record.
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        }
    }
}
2. 测试排序
package hadoop.sequencefile; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test; import java.util.Random; /**
* 测试排序
*/
public class TestSeqFileSort {

    /**
     * Creates {@code E:/test/random.seq}, a record-compressed SequenceFile
     * whose keys are in random order. Each of the 99999 records has a key
     * {@code j} drawn from {@code [0, 100000)} and the value
     * {@code "helloworld" + j}.
     *
     * @throws Exception if the file system cannot be obtained or a write fails
     */
    @Test
    public void testWriteRandom() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("E:/test/random.seq");

        Random r = new Random();
        // try-with-resources closes the writer even if append() throws.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, p, IntWritable.class, Text.class,
                SequenceFile.CompressionType.RECORD)) {
            for (int i = 1; i < 100000; i++) {
                // Pick a random value in the range 0..99999.
                int j = r.nextInt(100000);
                IntWritable key = new IntWritable(j);
                Text value = new Text("helloworld" + j);
                writer.append(key, value);
            }
        }
    }

    /**
     * Sorts {@code E:/test/random.seq} into {@code E:/test/sort.seq} with
     * {@link SequenceFile.Sorter}.
     *
     * <p>The input file must already exist, e.g. produced by a prior run of
     * {@link #testWriteRandom()}.
     *
     * @throws Exception if the sort fails
     */
    @Test
    public void testSort() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        Path pin = new Path("E:/test/random.seq");
        Path pout = new Path("E:/test/sort.seq");

        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
        sorter.sort(pin, pout);
    }

    /**
     * Reads every record of {@code E:/test/sort.seq} and prints its key,
     * value and the reader position reported after the record was read.
     *
     * @throws Exception if the file cannot be opened or a read fails
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();
        // Use the local file system.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("E:/test/sort.seq");

        // The original never closed the reader; try-with-resources releases
        // the underlying file handle on both normal and exceptional exit.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            // Reusable Writable holders that next() fills in for each record.
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        }
    }
}
3. 测试合并
package hadoop.sequencefile; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test; /**
* 测试文件合并,必须是同一种压缩类型
*/
public class TestSeqFileMerge {

    /**
     * Creates the two block-compressed input files used by
     * {@link #testMerge()}: {@code E:/test/block1.seq} with keys 1..100 and
     * {@code E:/test/block2.seq} with keys 101..200. Every record has the
     * value {@code "helloworld" + key}.
     *
     * <p>The original version wrote only one of the two files per run and
     * required editing the source to produce the other; both are now written
     * in a single run.
     *
     * @throws Exception if the file system cannot be obtained or a write fails
     */
    @Test
    public void testWriteSeq() throws Exception {
        Configuration conf = new Configuration();
        // Use the local file system.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        writeRange(fs, conf, new Path("E:/test/block1.seq"), 1, 100);
        writeRange(fs, conf, new Path("E:/test/block2.seq"), 101, 200);
    }

    /**
     * Writes one block-compressed SequenceFile holding the keys
     * {@code from..to} (both inclusive), each paired with the value
     * {@code "helloworld" + key}.
     */
    private void writeRange(FileSystem fs, Configuration conf, Path path, int from, int to) throws Exception {
        // try-with-resources closes the writer even if append() throws.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, path, IntWritable.class, Text.class,
                SequenceFile.CompressionType.BLOCK)) {
            for (int i = from; i <= to; i++) {
                IntWritable key = new IntWritable(i);
                Text value = new Text("helloworld" + i);
                writer.append(key, value);
            }
        }
    }

    /**
     * Merges {@code block1.seq} and {@code block2.seq} into
     * {@code E:/test/merge.seq} with {@link SequenceFile.Sorter}.
     *
     * <p>Per the original author's notes, the merge also sorts the records
     * and the inputs must use the same compression type. Both input files
     * must already exist, e.g. produced by {@link #testWriteSeq()}.
     *
     * @throws Exception if the merge fails
     */
    @Test
    public void testMerge() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        Path pin1 = new Path("E:/test/block1.seq");
        Path pin2 = new Path("E:/test/block2.seq");
        Path pout = new Path("E:/test/merge.seq");

        SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
        Path[] p = {pin1, pin2};
        sorter.merge(p, pout);
    }

    /**
     * Reads every record of {@code E:/test/merge.seq} and prints its key,
     * value and the reader position reported after the record was read.
     *
     * @throws Exception if the file cannot be opened or a read fails
     */
    @Test
    public void testReadSeq() throws Exception {
        Configuration conf = new Configuration();
        // Use the local file system.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("E:/test/merge.seq");

        // The original never closed the reader; try-with-resources releases
        // the underlying file handle on both normal and exceptional exit.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            // Reusable Writable holders that next() fills in for each record.
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                long position = reader.getPosition();
                System.out.println("key: " + key.get() + " , " + " val: " + value.toString() + " , " + " pos: " + position);
            }
        }
    }
}
4. 测试将日志文件转换成序列文件
package hadoop.sequencefile; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException; /**
* 测试将日志文件转换成序列文件
* Windows 下查看压缩后的 SequenceFile :
* hdfs dfs -text file:///E:/test/access.seq
*/
public class Log2Seq {

    /**
     * Converts the text log {@code E:/file/access.log1} into the
     * block-compressed SequenceFile {@code E:/test/access.seq}. Each input
     * line becomes one record with a {@link NullWritable} key and the line
     * as its {@link Text} value.
     *
     * @param args unused
     * @throws Exception if the log cannot be read or the SequenceFile cannot
     *                   be written
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use the local file system.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("E:/test/access.seq");

        // To compare compression modes, replace BLOCK below with
        // SequenceFile.CompressionType.NONE (no compression) or
        // SequenceFile.CompressionType.RECORD (per-record compression).
        //
        // Both resources are declared in one try-with-resources so that the
        // log reader (never closed in the original) and the writer are
        // released even when reading or appending throws. The writer is
        // still created before the log is opened, as in the original.
        //
        // NOTE(review): FileReader decodes with the platform default charset
        // on older JDKs; confirm this matches the log file's encoding.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                     fs, conf, path, NullWritable.class, Text.class,
                     SequenceFile.CompressionType.BLOCK);
             BufferedReader br = new BufferedReader(new FileReader("E:/file/access.log1"))) {
            String line;
            while ((line = br.readLine()) != null) {
                NullWritable key = NullWritable.get();
                Text value = new Text(line);
                writer.append(key, value);
            }
        }
    }
}