个人Hadoop编程代码记录

时间:2022-05-10 06:27:31

**WordCount

package cn.cpl.recom;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{

static class WordCountMapper

extends Mapper<LongWritable, Text, Text, IntWritable>{

// 统计使用变量

private final static IntWritable one=

new IntWritable(1);

// 单词变量

private Text word=new Text();

	/**
* key:当前读取行的偏移量
* value:当前读取的行
* context:map方法执行时上下文
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
StringTokenizer words=
new StringTokenizer(value.toString(), " "); while(words.hasMoreTokens()){
word.set(words.nextToken());
context.write(word, one);
}
}
} static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable counter = new IntWritable();
/**
* key:待统计的word
* values:待统计word的所有统计标识
* context:reduce方法执行时的上下文
*/
@Override
protected void reduce(Text key,
Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
int count=0;
for(IntWritable one:values){
count+=one.get();
}
counter.set(count);
context.write(key, counter);
}
}

// @Override

public int run(String[] args) throws Exception {

//获得程序运行时的配置信息

Configuration conf=getConf();

String inputPath=conf.get("input");

String outputPath=conf.get("output");

	//构建新的作业
Job job = Job.getInstance(conf, "Word Frequence Count");
job.setJarByClass(WordCount.class); //给job设置mapper类及map方法输出的键值类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class); //给job设置reducer类及reduce方法输出的键值类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); //设置数据的读取方式(文本文件)及结果的输出方式(文本文件)
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); //设置输入和输出目录
TextInputFormat.addInputPath(job, new Path(inputPath));
TextOutputFormat.setOutputPath(job, new Path(outputPath)); //将作业提交集群执行
return job.waitForCompletion(true)?0:1;
} public static void main(String[] args) throws Exception{
int status = ToolRunner.run(new WordCount(), args);
System.exit(status);
}

}

**删除文件夹

public static void rmr(String folder,Configuration conf) throws IOException {

Path path = new Path(folder);

FileSystem fs = FileSystem.get(conf);

fs.deleteOnExit(path);

System.out.println("Delete: " + folder);

fs.close();

}