Implementing a MapReduce Job

Date: 2021-02-04 18:19:54

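Introduction

Once Hadoop is installed, the natural next step is to write a MapReduce job and run it. MapReduce is really two functions: a mapper, which turns each input record into key/value pairs, and a reducer, which combines all the values that share a key. Without further ado, let's begin.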
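To make the split between the two functions concrete, here is a minimal plain-Java sketch of the map, group, and reduce steps. It does not use Hadoop at all; the class name MapReduceFlowSketch and the "year,temperature" record format are assumptions made only for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Hadoop classes are involved.
final class MapReduceFlowSketch {

    // "Map" step: turn one input record into a (key, value) pair.
    // The "year,temperature" format is a hypothetical simplification.
    static Map.Entry<String, Integer> map(String record) {
        String[] parts = record.split(",");
        return new AbstractMap.SimpleEntry<>(parts[0], Integer.parseInt(parts[1]));
    }

    // "Shuffle" step: collect all values that share a key, sorted by key.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // "Reduce" step: collapse the values of one key into a single result.
    static int reduce(List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    // Runs the three steps in order over in-memory records.
    static Map<String, Integer> run(List<String> records) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String record : records) {
            pairs.add(map(record));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up records, used only to exercise the flow.
        System.out.println(run(Arrays.asList("1950,0", "1950,22", "1949,111", "1949,78", "1950,-11")));
    }
}
```

In a real job, Hadoop performs the grouping step itself between the mapper and the reducer, so only the first and last steps are written by hand.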


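Main Text

1 Environment

1.1 Deploying Hadoop

A single-node setup is enough: the NameNode, DataNode, ResourceManager, NodeManager, and SecondaryNameNode all run on the same machine.

Create a hadoop user.

Generate an SSH key pair and make sure that ssh localhost works.

Configuration file core-site.xml: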

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop-2.6.0/tmp</value>
    </property>

    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

Configuration file hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/opt/hadoop-2.6.0/hdfs/name</value>
    </property>

    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/opt/hadoop-2.6.0/hdfs/data</value>
    </property>

    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Configuration file mapred-site.xml:

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:8000</value>
    </property>
</configuration>

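For the detailed steps, please see this article on my blog:

Deploying a Hadoop Cluster on Linux: Basics

1.2 Installing Eclipse

Sort this one out yourself; as long as Eclipse starts, that is enough, so I won't go through it here.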


2 Writing the Java program

2.1 Create a new Java project, then create a class: MaxTemperatureMapper

The code is as follows:

package mapreduce_maxtempature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    // Value that marks a missing temperature reading
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input value is one fixed-width weather record
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        // Emit (year, temperature) only for present readings with an accepted quality code
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
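The fixed-width offsets used by the mapper are easier to trust once they have been tried on a single record. The following plain-Java sketch applies the same offsets to the first record of sample.txt from section 2.5; the class name NcdcRecordSketch and its helper methods are hypothetical and exist only for this illustration.

```java
// Plain-Java illustration only; class and method names are hypothetical.
final class NcdcRecordSketch {
    static final int MISSING = 9999;

    // Characters 15..18 hold the year.
    static String year(String line) {
        return line.substring(15, 19);
    }

    // Characters 87..91 hold the signed temperature; a leading '+' is skipped
    // because Integer.parseInt is not given it in the mapper either.
    static int airTemperature(String line) {
        int start = line.charAt(87) == '+' ? 88 : 87;
        return Integer.parseInt(line.substring(start, 92));
    }

    // Character 92 holds the quality code.
    static String quality(String line) {
        return line.substring(92, 93);
    }

    // Same acceptance rule as the mapper: reading present and quality code accepted.
    static boolean isValid(String line) {
        return airTemperature(line) != MISSING && quality(line).matches("[01459]");
    }

    public static void main(String[] args) {
        // First record of sample.txt
        String line = "0035227070999991902122213004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01721+99999101221ADDGF107991999999999999999999MW1721";
        System.out.println(year(line) + " " + airTemperature(line) + " " + quality(line) + " " + isValid(line));
        // prints: 1902 -172 1 true
    }
}
```

So for this record the mapper would emit the pair (1902, -172).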

2.2 Create another class: MaxTemperatureReducer

The code is as follows:

package mapreduce_maxtempature;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Start from the smallest int so that negative temperatures are handled correctly
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        // One output pair per year: (year, maximum temperature)
        context.write(key, new IntWritable(maxValue));
    }
}
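One detail of the reducer worth a closer look is the starting value Integer.MIN_VALUE. The plain-Java sketch below contrasts it with a hypothetical faulty variant that starts from 0, using the temperatures of the first three records of sample.txt from section 2.5. The class name MaxSeedSketch and both method names are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; class and method names are hypothetical.
final class MaxSeedSketch {

    // Maximum seeded with Integer.MIN_VALUE, as in the reducer.
    static int maxSeededWithMinValue(Iterable<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    // Faulty variant for comparison: a seed of 0 hides all-negative inputs.
    static int maxSeededWithZero(Iterable<Integer> values) {
        int max = 0;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    public static void main(String[] args) {
        // Temperatures of the first three records in sample.txt
        List<Integer> readings = Arrays.asList(-172, -200, -161);
        System.out.println(maxSeededWithMinValue(readings)); // prints -161
        System.out.println(maxSeededWithZero(readings));     // prints 0, which is not in the input
    }
}
```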

2.3 Create another class: MaxTemperatureDriver

The code is as follows:

package mapreduce_maxtempature;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* This class is responsible for running the MapReduce job. */
public class MaxTemperatureDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureDriver <input path> <output path>");
            return -1;
        }

        // Job.getInstance replaces the deprecated Job constructor and picks up
        // the configuration that ToolRunner supplies through getConf().
        Job job = Job.getInstance(getConf(), "Max Temperature");
        job.setJarByClass(MaxTemperatureDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit the job once and return the outcome; main() turns it into
        // the process exit code. The original called System.exit here first,
        // which made the lines after it unreachable at run time.
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        int exitCode = ToolRunner.run(driver, args);
        System.exit(exitCode);
    }
}

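The last class calls ToolRunner to run the job. Make sure all three Java classes compile without errors.


2.4 Packaging as a JAR

How to build a JAR is not covered here; if you don't know how, please work it out yourself.


2.5 Creating sample.txt

Create a file named sample.txt with the following content: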

0035227070999991902122213004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01721+99999101221ADDGF107991999999999999999999MW1721
0029227070999991902122220004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-02001+99999101551ADDGF100991999999999999999999
0035227070999991902122306004+62167+030650FM-12+010299999V0201401N002119999999N0000001N9-01611+99999101161ADDGF100991999999999999999999MW1721
0035227070999991902122313004+62167+030650FM-12+010299999V0201801N001019999999N0000001N9-00781+99999100191ADDGF108991999999999999999999MW1721
0029227070999991902122320004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-00281+99999099601ADDGF108991999999999999999999
0029227070999991902122406004+62167+030650FM-12+010299999V0203201N004119999999N0000001N9-00111+99999098601ADDGF100991999999999999999999
0029227070999991902122413004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-00281+99999098711ADDGF108991999999999999999999
0029227070999991902122420004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-00501+99999098831ADDGF100991999999999999999999
0029227070999991902122506004+62167+030650FM-12+010299999V0201801N001019999999N0000001N9-00281+99999097351ADDGF108991999999999999999999
0029227070999991902122513004+62167+030650FM-12+010299999V0201801N015919999999N0000001N9-00221+99999095821ADDGF108991999999999999999999
0029227070999991902122520004+62167+030650FM-12+010299999V0201801N015919999999N0000001N9-00331+99999095751ADDGF108991999999999999999999
0029227070999991902122606004+62167+030650FM-12+010299999V0201801N006219999999N0000001N9-00891+99999095401ADDGF100991999999999999999999
0029227070999991902122613004+62167+030650FM-12+010299999V0202301N002119999999N0000001N9-00891+99999095281ADDGF107991999999999999999999
0029227070999991902122620004+62167+030650FM-12+010299999V0202301N001019999999N0000001N9-01331+99999095581ADDGF100991999999999999999999
0029227070999991902122706004+62167+030650FM-12+010299999V0203201N001019999999N0000001N9-01111+99999095801ADDGF108991999999999999999999
0035227070999991902122713004+62167+030650FM-12+010299999V0203601N002119999999N0000001N9-01061+99999096221ADDGF108991999999999999999999MW1721
0029227070999991902122720004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01171+99999096711ADDGF108991999999999999999999
0029227070999991902122806004+62167+030650FM-12+010299999V0203201N004119999999N0000001N9-01171+99999097441ADDGF100991999999999999999999
0029227070999991902122813004+62167+030650FM-12+010299999V0202901N001019999999N0000001N9-00891+99999097791ADDGF108991999999999999999999
0029227070999991902122820004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01061+99999097671ADDGF100991999999999999999999
0029227070999991902122906004+62167+030650FM-12+010299999V0203201N002119999999N0000001N9-01281+99999097601ADDGF100991999999999999999999
0029227070999991902122913004+62167+030650FM-12+010299999V0203601N004119999999N0000001N9-01221+99999097721ADDGF108991999999999999999999
0029227070999991902122920004+62167+030650FM-12+010299999V0203601N001019999999N0000001N9-01441+99999097821ADDGF100991999999999999999999
0029227070999991902123006004+62167+030650FM-12+010299999V0203601N006219999999N0000001N9-01561+99999098041ADDGF106991999999999999999999
0029227070999991902123013004+62167+030650FM-12+010299999V0200501N001019999999N0000001N9-01561+99999097981ADDGF108991999999999999999999
0029227070999991902123020004+62167+030650FM-12+010299999V0209991C000019999999N0000001N9-01501+99999098461ADDGF100991999999999999999999
0029227070999991902123106004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01171+99999098641ADDGF108991999999999999999999
0029227070999991902123113004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01281+99999099551ADDGF108991999999999999999999
0029227070999991902123120004+62167+030650FM-12+010299999V0200501N002119999999N0000001N9-01831+99999100111ADDGF100991999999999999999999

2.6 Upload sample.txt to the HDFS file system at hdfs://localhost:9000/

hdfs dfs -put sample.txt hdfs://localhost:9000/

Check the result:

hdfs dfs -ls hdfs://localhost:9000/

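2.7 Running the job

hadoop jar mapreduce_maxtempature.jar /sample.txt /output

No main class is given on the command line, so this form relies on the JAR's manifest naming MaxTemperatureDriver as its Main-Class.

On success, an output directory is created automatically in HDFS; it contains the result of the job.

If there are errors, debug them based on the specific output.

My result is as follows: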

hdfs dfs -cat hdfs://localhost:9000/output/part-r-00000
1901    317
1902    244
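When the job's output needs a sanity check, the same parsing and maximum logic can be run locally without a cluster. The sketch below is one way to do that, assuming the record layout used by the mapper; the class name LocalMaxTemperatureCheck is hypothetical, and the length check on each line is an addition that the mapper itself does not make. Every record listed in section 2.5 is from 1902 and has a negative temperature, so a run over exactly that listing would not be expected to reproduce the figures above, which presumably come from a larger input.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local cross-check without Hadoop; the class name is hypothetical.
final class LocalMaxTemperatureCheck {
    private static final int MISSING = 9999;

    // Applies the mapper's parsing and the reducer's max to in-memory lines.
    static Map<String, Integer> maxByYear(List<String> lines) {
        Map<String, Integer> maxima = new TreeMap<>();
        for (String line : lines) {
            if (line.length() < 93) {
                continue; // too short to hold the fixed-width fields (extra guard, not in the mapper)
            }
            String year = line.substring(15, 19);
            int start = line.charAt(87) == '+' ? 88 : 87;
            int temperature = Integer.parseInt(line.substring(start, 92));
            String quality = line.substring(92, 93);
            if (temperature != MISSING && quality.matches("[01459]")) {
                maxima.merge(year, temperature, Integer::max);
            }
        }
        return maxima;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: LocalMaxTemperatureCheck <local file>");
            return;
        }
        // Key and value separated by a tab, imitating the job's text output.
        maxByYear(Files.readAllLines(Paths.get(args[0])))
                .forEach((year, max) -> System.out.println(year + "\t" + max));
    }
}
```

After compiling, it can be pointed at the local copy of the input, for example: java LocalMaxTemperatureCheck sample.txt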


This article comes from the “Linux运维” blog; please do not repost it.