(一)启动 Hadoop 集群和 Spark 集群
具体部署 Hadoop 和 Spark 集群的步骤可参考:【智能大数据分析 | 实验二】Spark实验:部署Spark集群
这里,登录大数据实验一体机,启动实验,并点击右上方的一键搭建按钮,等待一键搭建完成。
使用jps
检验 Hadoop 集群和 Spark 集群是否成功启动。成功启动 Hadoop 集群和 Spark 集群的情况使用jps
命令能成功看到以下 java 进程。
jps
(二)编写 SparkStreaming 代码
打开 IntelliJ IDEA 准备编写 Spark-streaming 代码。点击 File -> New -> Module -> Maven -> Next -> 输入 GroupId 和 AriifactId -> Next -> 输入 Module name 新建一个 maven 的 Module。
打开项目录,点击目录下的pom.xml
文件,在标签中输入 maven 的依赖。然后右键 -> maven -> Reimport 导入 maven 依赖, 效果如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cstor.sparkstreaming</groupId>
<artifactId>nice</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
<!-- https://mvnrepository.com/artifact/org.apache.spark/Spark Streaming_2.10 -->
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
在src/main/java
的目录下,点击java
目录新建一个 package 命名为spark.streaming.test
,然后在包下新建一个SparkStreaming
的 java class。在SparkStreaming
中键入代码。
package spark.streaming.test;
import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.Iterator;
import java.util.regex.Pattern;
public class SparkStreaming {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws InterruptedException {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x){
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
ssc.start();
ssc.awaitTermination();
}
}
附:由于原程序运行后,每1秒刷新一次(即从监听入口接收一次信息)很难即时截到图,所以将程序中ssc
的刷新时间适当提高,便很容易截到。
点击 File -> Project Structure -> Aritifacts -> 点击加号 -> JAR -> from modules with dependences -> 选择刚才新建的 module -> 选择 Main Class -> Ok -> 选择 Output directory -> 点击 Ok。
去掉除guava-14.0.1.jar
和guice-3.0.jar
以外所有的 JAR 包,点击 Apply,再点击 Ok。
点击 Build -> Build Aritifacts 。
然后,就可以在类似该路径下D:\DELL\AppData\IdealWorkSpace\out\artifacts\sparkstreaming_jar
找到刚才生成的 jar 包。
选择刚才设置的 jar 包,上传到 master 上去。
(三)运行 Sparksteaming JAR包
新建一个 SSH 连接,登录 master 服务器,使用命令nc -lk 9999
设置路由器。
nc -lk 9999
注:如果系统只没有nc
这个命令,可以使用yum install nc
安装nc命令。
进入 spark 的安装目录,执行下面的命令。
cd /usr/cstor/spark
bin/spark-submit --class spark.streaming.test.SparkStreaming ~/sparkstreaming.jar localhost 9999
在网络流中输入单词。按回车结束一次输出。
在命令提交的 xshell 连接中观察程序输出。按 Ctrl+C 可终止程序运行。