使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数

时间:2022-01-09 21:34:25

目的:统计唐诗三百首全集每个汉字出现的次数

软件需求:

    下载 Eclipse ,安装Maven     安装好JDK1.8     下载并配置了hadoop-2.7.3     spark-2.2.0-bin-hadoop2.7 步骤: 1,创建Maven Project使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数 使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数
根据提示一步步走使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数 ---选择 Maven-archetype-quickstart 选项 使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数
下一步:使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数 填写好Group id 和artifact id  点击完成 使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数
下一步修改pom配置,将配置修改如下:






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.dt.spark</groupId>
<artifactId>sparkApps</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>sparkApps</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>


<dependencies>

<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.10</artifactId>

<version>1.6.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.10</artifactId>

<version>1.6.0</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-hive_2.10</artifactId>

<version>1.6.0</version>

</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/main/test</testSourceDirectory>


<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<maniClass></maniClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>false</includeProjectDependencies>
<classpathScope>compile</classpathScope>
<mainClass>com.dt.spark.SparkApps.WordCount</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

//完成以后等待自动安装好JAR包即可Eclipse会下载 相关的Jar文件
//关于pom.xml文件的配置,可以通过以下网站(Maven*仓库信息速查 http://maven.outofmemory.cn/org.apache.spark/)
//搜索并对照修改,如要查找 spark-streaming_2.10 的依赖包的话,可以点击并对应找到。

如果下载更新时出现错误,忽略!

安装好之后在使用maven构建一个基于Java的spark应用程序用于统计唐诗三百首中各汉字出现的次数在这个包下面新建一个包,然后在包下面新建一个Java类WordCount1

package com.dt.spark.sparkApps.cores;



import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;


public final class WordCount1 {


public static void main(String[] args) throws Exception {


//context ,用于读文件 ,类似于scala的sc
//格式为:
// JavaSparkContext(master: String, appName: String, sparkHome: String, jars: Array[String], environment: Map[String, String])
/* JavaSparkContext ctx = new JavaSparkContext("yarn-standalone", "JavaWordCount",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(WordCount1.class)); */
SparkConf conf =new SparkConf().setAppName("Spark WordCount written by java").setMaster("local");//本地测试
JavaSparkContext ctx=new JavaSparkContext(conf);
//也可以使用ctx获取环境变量,例如下面的语句
System.out.println("spark home:"+ctx.getSparkHome());

//一次一行,String类型 ,还有hadoopfile,sequenceFile什么的 ,可以直接用sc.textFile("path")
JavaRDD<String> lines = ctx.textFile("D:\\迅雷下载\\唐诗三百首(全集).txt"); //java.lang.String path, int minSplits
lines.cache(); //cache,暂时放在缓存中,一般用于哪些可能需要多次使用的RDD,据说这样会减少运行时间


//collect方法,用于将RDD类型转化为java基本类型,如下
List<String> line = lines.collect();
for(String val:line)
System.out.println(val);

//下面这些也是RDD的常用函数
// lines.collect(); List<String>
// lines.union(); javaRDD<String>
// lines.top(1); List<String>
// lines.count(); long
// lines.countByValue();

/**
* filter test
* 定义一个返回bool类型的函数,spark运行filter的时候会过滤掉那些返回只为false的数据
* String s,中的变量s可以认为就是变量lines(lines可以理解为一系列的String类型数据)的每一条数据
*/
JavaRDD<String> contaninsE = lines.filter(new Function<String, Boolean>() {
public Boolean call(String s) throws Exception {


return (s.contains("they"));
}
});
System.out.println("--------------next filter's result------------------");
line = contaninsE.collect();
for(String val:line)
System.out.println(val);

/**
* sample test
* sample函数使用很简单,用于对数据进行抽样
* 参数为:withReplacement: Boolean, fraction: Double, seed: Int
*
*/

JavaRDD<String> sampletest = lines.sample(false,0.1,5);
System.out.println("-------------next sample-------------------");
line = sampletest.collect();
for(String val:line)
System.out.println(val);

/**
*
* new FlatMapFunction<String, String>两个string分别代表输入和输出类型
* Override的call方法需要自己实现一个转换的方法,并返回一个Iterable的结构
*
* flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
* 例如,当前状态下,lines这个rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
* 可以这样写 :
*/


JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) {
String[] words=s.split("");
return Arrays.asList(words);
}
});

/**
* map 键值对 ,类似于MR的map方法
* pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
* 需要重写call方法实现转换
*/
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});


/**
* reduceByKey方法,类似于MR的reduce
* 要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
*/
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) { //reduce阶段,key相同的value怎么处理的问题
return i1 + i2;
}
});


//备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对,
// reduce方法会对输入进来的所有数据进行两两运算


JavaPairRDD<String,Integer> sort = counts.sortByKey();
System.out.println("----------next sort----------------------");
/**
* collect方法其实之前已经出现了多次,该方法用于将spark的RDD类型转化为我们熟知的java常见类型
*/
List<Tuple2<String, Integer>> output = sort.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1 + ": " + tuple._2());
}

/**
* 保存函数,数据输出,spark为结果输出提供了很多接口
*/
sort.saveAsTextFile("/tmp/spark-tmp/test");

// sort.saveAsNewAPIHadoopFile();
// sort.saveAsHadoopFile();
System.exit(0);
}
}

缺点:不能对汉字按照发音排序!后续改进