MapReduce自定义输入格式

时间:2021-02-23 18:23:31

输入数据如下,是一个Excel表,具体数据是一个月内上网产生的流量记录,我们要做的是统计24小时每一小时的总流量。

MapReduce自定义输入格式


统计结果如下:

MapReduce自定义输入格式


首先使用Apache POI解析Excel表格(测试数据在这里下载),每一行数据以tab隔开组成一个字符串,解析完成后以数组形式返回,具体实现如下:

ExcelDeal.java

import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.poi.hssf.usermodel.HSSFDateUtil;
import org.apache.poi.hssf.usermodel.HSSFRow;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;

public class ExcelDeal {

public static String[] readExcel(InputStream is) throws IOException {

@SuppressWarnings("resource")
HSSFWorkbook hssfWorkbook = new HSSFWorkbook(is);

List<String> list = new ArrayList<String>();

for (int numSheet = 0; numSheet < hssfWorkbook.getNumberOfSheets(); numSheet++) {
HSSFSheet hssfSheet = hssfWorkbook.getSheetAt(numSheet);
if (hssfSheet == null) {
continue;
}
for (int rowNum = 1; rowNum <= hssfSheet.getLastRowNum(); rowNum++) {
String templine = new String();
HSSFRow hssfRow = hssfSheet.getRow(rowNum);
if (hssfRow == null) {
continue;
}
//遍历每一行数据
for (Cell cell : hssfRow) {
templine += getValue(cell)+"\t";
}
list.add(templine);
}
}
return list.toArray(new String[0]);
}

//根据数据类型获取每个表格中的数据
private static String getValue(Cell cell) {
if (cell.getCellType() == Cell.CELL_TYPE_BOOLEAN) {
return String.valueOf(cell.getBooleanCellValue());
} else if (cell.getCellType() == Cell.CELL_TYPE_NUMERIC) {
if (HSSFDateUtil.isCellDateFormatted(cell)) {
SimpleDateFormat sdf = null;
sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date date = cell.getDateCellValue();
return sdf.format(date);
}
return String.valueOf(cell.getNumericCellValue());
} else {
return String.valueOf(cell.getStringCellValue());
}
}
}
编写自定义输入格式类,每个自定义输入格式类都需要继承FileInputFormat抽象类并实现抽象方法createRecordReader,而FileInputFormat继承自InputFormat,InputFormat类源码如下:

/**
 * Excerpt of the abstract input-format contract: it declares how a job's
 * input is divided into splits and how a reader is obtained for one split.
 *
 * @param <K> key type produced by the record reader
 * @param <V> value type produced by the record reader
 */
public abstract class InputFormat<K, V> {

/**
 * Returns the list of input splits for the given job context.
 */
public abstract List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;

/**
 * Returns the record reader to use for the given split and task attempt.
 */
public abstract RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException;
}

由于createRecordReader方法返回的是一个RecordReader的实例,所以我们需要编写一个RecordReader类型的类,RecordReader类源码如下:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

public abstract void initialize(InputSplit split,
) throws IOException, InterruptedException;

public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

public abstract float getProgress() throws IOException, InterruptedException;

public abstract void close() throws IOException;
}
因此我们的输入格式类ExcelInputFormat.java如下:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ExcelInputFormat extends FileInputFormat<LongWritable, Text>{

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
return new ExcelRecordReader();
}

public class ExcelRecordReader extends RecordReader<LongWritable, Text>{
private LongWritable key = new LongWritable(-1);
private Text value = new Text();
private InputStream inputStream;//文件输入流
private String[] strArray;//解析结果数组

@Override
public void close() throws IOException {
if(inputStream!=null){
inputStream.close();
}
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {

FileSplit split = (FileSplit) arg0;

Configuration job = arg1.getConfiguration();

Path filePath = split.getPath();

FileSystem fileSystem = filePath.getFileSystem(job);

inputStream = fileSystem.open(split.getPath());

strArray = ExcelDeal.readExcel(inputStream);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
int next = (int) key.get() + 1;
if(next<strArray.length&&strArray[next]!=null){
key.set(next);
value.set(strArray[next]);
return true;
}
return false;
}
}
}
编写完Excel解析类与输入格式类后,我们来完成下MR类,具体实现如下:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowCount {

public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{

String line = value.toString();
String[] records = line.split("\t");
String month = records[1].substring(11, 13);
long flow = Long.parseLong(records[3]);

context.write(new Text(month), new LongWritable(flow));
}
}

public static class FlowCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
public void reduce(Text key,Iterable<LongWritable> value,Context context) throws IOException, InterruptedException{
long sum = 0;
for (LongWritable longWritable : value) {
sum += longWritable.get();
}
context.write(key, new LongWritable(sum));
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

@SuppressWarnings("deprecation")
Job job = new Job(conf,"FlowCount");
job.setJarByClass(FlowCount.class);

job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

job.setInputFormatClass(ExcelInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true)?0:1);
}
}


Hadoop API:http://hadoop.apache.org/docs/current/api/allclasses-noframe.html

学之,以记之。