Finding TopKey Data in Files with Hadoop 1.x

Date: 2022-09-24 23:42:05

First, here is the sample input file (the two columns are tab-separated, matching the split("\t") calls in the code below):

a	1
b	2
c	3
d	1
e	4
f	4
adada	5
afaa	6
fds	8
bnn	10
akal	100


1. Finding the maximum value in a column of a single file

    The output contains the key and its corresponding value. The Mapper tracks the running maximum inside map() and emits a single record from cleanup(), which runs once after the last input line has been processed.

package org.dragon.hadoop.app.topkey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Finds the maximum value in a column of a single file.
 */
public class TopKMapReduce {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Running maximum, initialized to the smallest possible value.
        private long topkv = Long.MIN_VALUE;

        // Map output key and value, reused across calls.
        private Text mapOutputKey = new Text();
        private LongWritable mapOutputValue = new LongWritable();

        // Called once per input line.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on the tab separator.
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.parseLong(strs[1]);

            // Keep the largest value seen so far, along with its key.
            if (topkv < tempValue) {
                topkv = tempValue;
                mapOutputKey.set(strs[0]);
            }
        }

        // Called once after all map() calls; emits the single maximum.
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mapOutputValue.set(topkv);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create the Job.
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);

        // Job input.
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Mapper.
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // No reducer needed: this is a map-only job.
        job.setNumReduceTasks(0);

        // Job output.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, wait for it to finish, and print progress on the client.
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // Main entrance.
    public static void main(String[] args) throws Exception {
        args = new String[] {
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/input/simple_file.txt",
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/output4/"
        };
        int status = new TopKMapReduce().run(args);
        System.out.println(status);
    }
}
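One caveat: with setNumReduceTasks(0) this is a map-only job, so every map task would write its own local maximum. That is harmless here only because the small sample file fits in a single input split and exactly one mapper runs; for larger, multi-split inputs a reducer would be needed to pick the global maximum.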
The corresponding output is:

akal	100


2. Finding the top N values in a column of a single file

    The output contains only the top N values found (without their keys).
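The core of this approach is a bounded TreeSet: every value is inserted, and whenever the set grows past N, the smallest element (first()) is evicted, so the set always holds the N largest values seen so far. Here is a minimal standalone sketch of just that trick (the class name TopNDemo and the hard-coded values are illustrative, not part of the original job):

import java.util.TreeSet;

public class TopNDemo {
    public static void main(String[] args) {
        final int n = 3; // keep the 3 largest values
        TreeSet<Long> topSet = new TreeSet<Long>();
        long[] values = {1, 2, 3, 1, 4, 4, 5, 6, 8, 10, 100};
        for (long v : values) {
            topSet.add(v);
            // Evict the smallest element once the set exceeds n.
            if (topSet.size() > n) {
                topSet.remove(topSet.first());
            }
        }
        System.out.println(topSet); // [8, 10, 100]
    }
}

The full MapReduce job applies the same pattern inside the Mapper and emits the surviving values from cleanup():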

package org.dragon.hadoop.app.topkey;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Finds the top N values in a column of a single file (values only).
 */
public class TopKMapReduce2 {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // N: how many of the largest values to keep.
        public static final int KEY = 3;

        // Sorted ascending, so first() is always the smallest element.
        TreeSet<Long> topSet = new TreeSet<Long>();

        // Called once per input line.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on the tab separator.
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.parseLong(strs[1]);

            // Add to the set, then evict the smallest once it exceeds N.
            topSet.add(tempValue);
            if (topSet.size() > KEY) {
                topSet.remove(topSet.first());
            }
        }

        // Called once after all map() calls; emits the surviving top N values.
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            LongWritable setKey = new LongWritable();
            for (Long top : topSet) {
                setKey.set(top);
                // The key is left empty because only the values are wanted.
                context.write(new Text(), setKey);
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create the Job.
        Job job = new Job(conf, TopKMapReduce2.class.getSimpleName());
        job.setJarByClass(TopKMapReduce2.class);

        // Job input.
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Mapper.
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // No reducer needed: this is a map-only job.
        job.setNumReduceTasks(0);

        // Job output.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, wait for it to finish, and print progress on the client.
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // Main entrance.
    public static void main(String[] args) throws Exception {
        args = new String[] {
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/input/simple_file.txt",
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/output6/"
        };
        int status = new TopKMapReduce2().run(args);
        System.out.println(status);
    }
}
The corresponding output is as follows (each line begins with the tab separator because the key column is empty):

	8
	10
	100
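Note that TreeSet<Long> is a set: in the sample file the values 1 and 4 each appear twice, but only one copy of each survives insertion. That does not affect the top 3 for this data, but inputs whose top values repeat would need a multiset or a different structure.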


3. Finding the top N records in a single file

     The output contains both the key and the corresponding value, so a custom writable type is needed to carry the (key, value) pair through the TreeSet.

(1) First, define the custom field type:

package org.dragon.hadoop.app.topkey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom (word, count) pair used for TopKey lookups in a file.
 */
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    public TopKWritable() {
    }

    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read the fields in the same order write() wrote them.
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    @Override
    public int compareTo(TopKWritable o) {
        int cmp = this.word.compareTo(o.getWord());
        if (0 != cmp) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    @Override
    public String toString() {
        return this.word + ":" + this.count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((count == null) ? 0 : count.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopKWritable other = (TopKWritable) obj;
        if (count == null) {
            if (other.count != null)
                return false;
        } else if (!count.equals(other.count))
            return false;
        if (word == null) {
            if (other.word != null)
                return false;
        } else if (!word.equals(other.word))
            return false;
        return true;
    }
}
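The field order in write() must mirror readFields() exactly (UTF string first, then the long), since Hadoop streams raw bytes with no field names. A minimal round-trip sketch to sanity-check the type (TopKWritableDemo is a hypothetical test class, assumed to sit in the same package):

package org.dragon.hadoop.app.topkey;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TopKWritableDemo {
    public static void main(String[] args) throws IOException {
        // Serialize one record to a byte buffer.
        TopKWritable before = new TopKWritable("akal", 100L);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));

        // Deserialize it back and verify the round trip.
        TopKWritable after = new TopKWritable();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(after);                 // akal:100
        System.out.println(before.equals(after));  // true
    }
}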
(2) Then perform the search in the Mapper:

package org.dragon.hadoop.app.topkey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Finds the top N records (key and value) in a single file.
 */
public class TopKMapReduce3 {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // N: how many of the largest records to keep.
        public static final int KEY = 3;

        // Ordered by count, ascending, so first() is the smallest record.
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(new Comparator<TopKWritable>() {
            @Override
            public int compare(TopKWritable o1, TopKWritable o2) {
                return o1.getCount().compareTo(o2.getCount());
            }
        });

        // Called once per input line.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on the tab separator.
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.parseLong(strs[1]);

            // Add to the set, then evict the smallest once it exceeds N.
            topSet.add(new TopKWritable(strs[0], tempValue));
            if (topSet.size() > KEY) {
                topSet.remove(topSet.first());
            }
        }

        // Called once after all map() calls; emits the surviving top N records.
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create the Job.
        Job job = new Job(conf, TopKMapReduce3.class.getSimpleName());
        job.setJarByClass(TopKMapReduce3.class);

        // Job input.
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Mapper.
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // No reducer needed: this is a map-only job.
        job.setNumReduceTasks(0);

        // Job output.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, wait for it to finish, and print progress on the client.
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // Main entrance.
    public static void main(String[] args) throws Exception {
        args = new String[] {
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/input/simple_file.txt",
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/output7/"
        };
        int status = new TopKMapReduce3().run(args);
        System.out.println(status);
    }
}
The output is:

fds	8
bnn	10
akal	100
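One caveat with this version: the TreeSet is ordered by a comparator that compares only getCount(), and a TreeSet treats comparator-equal elements as duplicates. Two different words with the same count (in the sample file, a and d both have count 1, and e and f both have count 4) therefore collapse into a single entry. That happens to be harmless for this data, but a safer comparator would break ties on the word so distinct records never compare as equal. A sketch of that replacement comparator for TopKMapper:

// Tie-breaking comparator sketch: order by count, then by word,
// so records with equal counts are not silently dropped by the TreeSet.
TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(new Comparator<TopKWritable>() {
    @Override
    public int compare(TopKWritable o1, TopKWritable o2) {
        int cmp = o1.getCount().compareTo(o2.getCount());
        if (cmp != 0) {
            return cmp;
        }
        return o1.getWord().compareTo(o2.getWord());
    }
});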
       The above covers TopKey lookups within a single file in Hadoop.

Next is a TopKey search across multiple files. Since the same key may appear in several files (or splits), the Mapper emits every (key, count) pair unchanged, and a Reducer sums the counts per key before keeping the top N:

package org.dragon.hadoop.app.topkey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Finds the top N records (key and value) across multiple files.
 */
public class TopKMapReduce4 {

    // N: how many of the largest records to keep.
    public static final int KEY = 3;

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Map output key and value, reused across calls.
        private Text mapOutputKey = new Text();
        private LongWritable mapOutputValue = new LongWritable();

        // Called once per input line; emits every (key, count) pair unchanged.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line on the tab separator.
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.parseLong(strs[1]);

            mapOutputKey.set(strs[0]);
            mapOutputValue.set(tempValue);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // Reducer: sums the counts per key, then keeps the top N records.
    static class TopKReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        // Ordered by count, ascending, so first() is the smallest record.
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(new Comparator<TopKWritable>() {
            @Override
            public int compare(TopKWritable o1, TopKWritable o2) {
                return o1.getCount().compareTo(o2.getCount());
            }
        });

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all counts for this key (it may occur in several files).
            long temp = 0L;
            for (LongWritable value : values) {
                temp += value.get();
            }

            // Add to the set, then evict the smallest once it exceeds N.
            topSet.add(new TopKWritable(key.toString(), temp));
            if (topSet.size() > KEY) {
                topSet.remove(topSet.first());
            }
        }

        // Called once after all reduce() calls; emits the surviving top N records.
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    // Driver
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create the Job.
        Job job = new Job(conf, TopKMapReduce4.class.getSimpleName());
        job.setJarByClass(TopKMapReduce4.class);

        // Job input.
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Mapper.
        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reducer: one reduce task (the default), so the top N is global.
        job.setNumReduceTasks(1);
        job.setReducerClass(TopKReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Job output.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job, wait for it to finish, and print progress on the client.
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // Main entrance.
    public static void main(String[] args) throws Exception {
        args = new String[] {
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/input/simple_file.txt",
            "hdfs://hadoop-master.dragon.org:9000/opt/data/test/output7/"
        };
        int status = new TopKMapReduce4().run(args);
        System.out.println(status);
    }
}
Note that, unlike the earlier map-only jobs, this one adds the Reducer configuration.
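Because the reduce step is a plain per-key sum, shuffle traffic could be cut with a map-side combiner that pre-sums counts (summing is associative and commutative, so a combiner is safe here; keeping only the top N in the combiner would not be). A sketch, where SumCombiner is an illustrative name not in the original, meant to sit inside TopKMapReduce4 next to TopKReduce and reuse its imports:

// Hypothetical map-side combiner: pre-sums the counts for each key.
static class SumCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0L;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}

It would be wired into the driver with job.setCombinerClass(SumCombiner.class); right after job.setMapperClass(...).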