IDEA下新建一个gradle工程,用kotlin写MapReduce。

时间:2021-02-24 20:09:39

话不多说,直接上正餐。

首先我们新建一个gradle的项目,把默认的Java前面的勾去掉,然后再kotlin前面打钩后Next:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

接着设置对应的groupId和artifactId,然后Next:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

接着一直Next(当然要有什么其他的需求配置就自己选择操作):

IDEA下新建一个gradle工程,用kotlin写MapReduce。

在这边确认项目名称之后直接finish,然后项目建成,接着让子弹飞一会儿……

接着打开build.gradle文件,开始配置gradle项目,在dependencies下加入配置:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

接着为了后续打包方便,我们再加入下列的配置:

def appMainClass = 'com.ktdemo.TempMainKt'apply plugin: 'application'
mainClassName = appMainClass

配置完成之后的配置文件截图如下:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

其中的appMainClass是指启动类。后续我们要新建一个对应的main类。然后点击箭头所提示的地方,让gradle自动下载依赖包。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

接着在新的项目下,kotlin目录中建立新的package:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

新建一个TempMapper的kotlin类,代码如下:

package com.ktdemoimport org.apache.hadoop.io.IntWritableimport org.apache.hadoop.io.LongWritableimport org.apache.hadoop.io.Textimport org.apache.hadoop.mapred.JobConfimport org.apache.hadoop.mapred.OutputCollectorimport org.apache.hadoop.mapred.Reporterimport org.apache.hadoop.mapreduce.Mapperclass TempMapper : Mapper<LongWritable, Text, Text, IntWritable>() {    override fun map(key: LongWritable?, value: Text?, context: Context) {        print("Before Mapper:" + key + ", " + value)        var line : String = value.toString()        var year : String = line.substring(0, 4)        var temperature : Int = line.substring(8).toInt()        context.write(Text(year), IntWritable(temperature))        System.out.println("After Mapper:" + Text(year) + ", " + IntWritable(temperature));    }}
新建一个TempReducer的kotlin类,代码如下:
import org.apache.hadoop.io.IntWritableimport org.apache.hadoop.io.Textimport org.apache.hadoop.mapred.OutputCollectorimport org.apache.hadoop.mapred.Reporterimport org.apache.hadoop.mapreduce.Reducerclass TempReducer : Reducer<Text, IntWritable, Text, IntWritable>() {    override fun reduce(key: Text?, values: Iterable<IntWritable>?, context: Context?) {        var maxValue : Int = Int.MIN_VALUE        var sb : StringBuffer = StringBuffer()        //取values的最大值        if (values != null) {            for (value in values) {                maxValue = Math.max(maxValue, value.get())                sb.append(value).append(", ")            }        }        print("Before Reduce:" + key + ", " + sb.toString())        if (context != null) {            context.write(key, IntWritable(maxValue))        }        print("After Reduce:" + key + ", " + maxValue)    }}

最后我们新建一个main启动类,代码如下:

import org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.Pathimport org.apache.hadoop.io.IntWritableimport org.apache.hadoop.io.Textimport org.apache.hadoop.mapreduce.Jobimport org.apache.hadoop.mapreduce.lib.input.FileInputFormatimport org.apache.hadoop.mapreduce.lib.output.FileOutputFormatfun resource(name: String) = TempMain::class.java.getResource(name)object TempMain{    fun run (input : String?, outPut : String?) {        var hadoopConfig : Configuration = Configuration()        hadoopConfig.set("fs.hdfs.impl",                org.apache.hadoop.hdfs.DistributedFileSystem::class.java.name)        hadoopConfig.set("fs.file.impl",                org.apache.hadoop.fs.LocalFileSystem::class.java.name)        //Hadoop的配置文件        hadoopConfig.addResource(resource("/hdfs-conf/core-site.xml"))        hadoopConfig.addResource(resource("/hdfs-conf/hdfs-site.xml"))        var job : Job = Job(hadoopConfig)        //如果需要打成jar运行,需要下面这句        job.setJarByClass(TempMain::class.java)        //job执行作业时输入和输出文件的路径        FileInputFormat.addInputPath(job, Path(input))        FileOutputFormat.setOutputPath(job, Path(outPut))        //指定自定义的Mapper和Reducer作为两个阶段的任务处理类        job.mapperClass = TempMapper::class.java        job.reducerClass = TempReducer::class.java        //设置最后输出结果的Key和Value的类型        job.outputKeyClass = Text::class.java        job.outputValueClass = IntWritable::class.java        //执行job,直到完成        job.waitForCompletion(true)        print("Finished")    }}fun main(args: Array<String>) {    //输入路径 这边的IP是Hadoop的Master地址    val dst = "hdfs://172.16.134.251:9000/test/input.txt"    //输出路径    val dstOut = "hdfs://172.16.134.251:9000/test/output3"    TempMain.run(dst, dstOut)}

其中,有引入Hadoop的配置文件。这边直接把Hadoop的conf下对应的配置文件复制过来即可。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

完成之后,使用gradle把项目打成tar包。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

这个时候就项目build目录下就可以看到对应的文件。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

接下来把打好的tar包,传到Hadoop的虚拟机中,并且把需要计算的数据源文件也准备好。

2014010114201401021620140103172014010410201401050620120106092012010732201201081220120109192012011023200101011620010102122001010310200101041120010105292013010619201301072220130108122013010929201301102320080101052008010216200801033720080104142008010516200701061920070107122007010812200701099920070110232010010114201001021620100103172010010410201001050620150106492015010722201501081220150109992015011023

把上面的数据用vi命令新建成一个input.txt的文件。然后这个时候我们在虚拟机中看到的应该是如下的界面:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

因为MapReduce在计算的时候是取hdfs里的资源,所以我们必须先把input.txt放入到hdfs中。具体命令如下:

hadoop fs -mkdir /test #新建一个test的目录hadoop fs -put /home/hadoop/runfile/input.txt /test/ #把对应的文件传入到hdfs里面的test目录下hadoop fs -ls /test #查询对应目录的信息

这个时候查询出来就应该会显示对应的input.txt已经在hdfs里面了。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

当然也可以直接从浏览器下查看对应的文件是否已经传到hdfs里。

IDEA下新建一个gradle工程,用kotlin写MapReduce。

完成数据文件的准备后,把刚刚传上去的项目用tar命令解压一下:

tar -xvf ktdemo-1.0-SNAPSHOT.tar 

IDEA下新建一个gradle工程,用kotlin写MapReduce。

解压完成之后我们进入的到项目路径下,在bin目录中有一下两个执行文件。这边直接使用下面的命令执行即可:

./ktdemo

因为我们已经把原本的一些参数写在了项目当中,所以这边就无需再给出参数了。执行结果下显示:

[hadoop@Master bin]$ ./ktdemolog4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.Before Mapper:0, 2014010114After Mapper:2014, 14Before Mapper:11, 2014010216After Mapper:2014, 16Before Mapper:22, 2014010317After Mapper:2014, 17Before Mapper:33, 2014010410After Mapper:2014, 10Before Mapper:44, 2014010506After Mapper:2014, 6Before Mapper:55, 2012010609After Mapper:2012, 9Before Mapper:66, 2012010732After Mapper:2012, 32Before Mapper:77, 2012010812After Mapper:2012, 12Before Mapper:88, 2012010919After Mapper:2012, 19Before Mapper:99, 2012011023After Mapper:2012, 23Before Mapper:110, 2001010116After Mapper:2001, 16Before Mapper:121, 2001010212After Mapper:2001, 12Before Mapper:132, 2001010310After Mapper:2001, 10Before Mapper:143, 2001010411After Mapper:2001, 11Before Mapper:154, 2001010529After Mapper:2001, 29Before Mapper:165, 2013010619After Mapper:2013, 19Before Mapper:176, 2013010722After Mapper:2013, 22Before Mapper:187, 2013010812After Mapper:2013, 12Before Mapper:198, 2013010929After Mapper:2013, 29Before Mapper:209, 2013011023After Mapper:2013, 23Before Mapper:220, 2008010105After Mapper:2008, 5Before Mapper:231, 2008010216After Mapper:2008, 16Before Mapper:242, 2008010337After Mapper:2008, 37Before Mapper:253, 2008010414After Mapper:2008, 14Before Mapper:264, 2008010516After Mapper:2008, 16Before Mapper:275, 2007010619After Mapper:2007, 19Before Mapper:286, 2007010712After Mapper:2007, 12Before Mapper:297, 2007010812After Mapper:2007, 12Before Mapper:308, 2007010999After Mapper:2007, 99Before Mapper:319, 2007011023After Mapper:2007, 23Before Mapper:330, 2010010114After Mapper:2010, 14Before Mapper:341, 2010010216After Mapper:2010, 16Before Mapper:352, 2010010317After Mapper:2010, 17Before Mapper:363, 2010010410After Mapper:2010, 10Before Mapper:374, 2010010506After Mapper:2010, 6Before Mapper:385, 2015010649After Mapper:2015, 49Before Mapper:396, 2015010722After Mapper:2015, 22Before Mapper:407, 2015010812After Mapper:2015, 12Before Mapper:418, 2015010999After Mapper:2015, 99Before Mapper:429, 2015011023After Mapper:2015, 23Before Reduce:2001, 12, 10, 11, 29, 16, After Reduce:2001, 29Before Reduce:2007, 23, 19, 12, 12, 99, After Reduce:2007, 99Before Reduce:2008, 16, 14, 37, 16, 5, After Reduce:2008, 37Before Reduce:2010, 10, 6, 14, 16, 17, After Reduce:2010, 17Before Reduce:2012, 19, 12, 32, 9, 23, After Reduce:2012, 32Before Reduce:2013, 23, 29, 12, 22, 19, After Reduce:2013, 29Before Reduce:2014, 14, 6, 10, 17, 16, After Reduce:2014, 17Before Reduce:2015, 23, 49, 22, 12, 99, After Reduce:2015, 99Finished

这个时候,就已经可以查询到执行任务成功后生成的文件了:

IDEA下新建一个gradle工程,用kotlin写MapReduce。

通过下面的命令,可以把执行后的结果文件从HDFS中导出,导出后就可以看到对应的计算结果了。

hadoop fs -get /test/output3/part-r-00000

IDEA下新建一个gradle工程,用kotlin写MapReduce。

至此这个demo就算完成了。




参考:https://my.oschina.net/itblog/blog/275294