【智能大数据分析 | 实验四】Spark实验:Spark Streaming-五、实验步骤

时间:2024-10-26 22:58:07

(一)启动 Hadoop 集群和 Spark 集群

具体部署 Hadoop 和 Spark 集群的步骤可参考:【智能大数据分析 | 实验二】Spark实验:部署Spark集群

这里,登录大数据实验一体机,启动实验,并点击右上方的一键搭建按钮,等待一键搭建完成。

使用jps检验 Hadoop 集群和 Spark 集群是否成功启动。成功启动 Hadoop 集群和 Spark 集群的情况使用jps命令能成功看到以下 java 进程。

jps

在这里插入图片描述在这里插入图片描述在这里插入图片描述

(二)编写 SparkStreaming 代码

打开 IntelliJ IDEA 准备编写 Spark-streaming 代码。点击 File -> New -> Module -> Maven -> Next -> 输入 GroupId 和 AriifactId -> Next -> 输入 Module name 新建一个 maven 的 Module。

在这里插入图片描述在这里插入图片描述

打开项目录,点击目录下的pom.xml文件,在标签中输入 maven 的依赖。然后右键 -> maven -> Reimport 导入 maven 依赖, 效果如下:

在这里插入图片描述

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.cstor.sparkstreaming</groupId>
    <artifactId>nice</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/Spark Streaming_2.10 -->
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>
</project>

src/main/java的目录下,点击java目录新建一个 package 命名为spark.streaming.test,然后在包下新建一个SparkStreaming的 java class。在SparkStreaming中键入代码。

在这里插入图片描述

package spark.streaming.test;

import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.Iterator;
import java.util.regex.Pattern;

public class SparkStreaming {
    private static final Pattern SPACE = Pattern.compile(" ");
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");System.exit(1);
        }
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x){
                return Lists.newArrayList(SPACE.split(x));
            }
        });
        
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        
        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

:由于原程序运行后,每1秒刷新一次(即从监听入口接收一次信息)很难即时截到图,所以将程序中ssc的刷新时间适当提高,便很容易截到。

在这里插入图片描述

点击 File -> Project Structure -> Aritifacts -> 点击加号 -> JAR -> from modules with dependences -> 选择刚才新建的 module -> 选择 Main Class -> Ok -> 选择 Output directory -> 点击 Ok。

在这里插入图片描述
去掉除guava-14.0.1.jarguice-3.0.jar以外所有的 JAR 包,点击 Apply,再点击 Ok。

在这里插入图片描述
点击 Build -> Build Aritifacts 。

在这里插入图片描述
在这里插入图片描述
然后,就可以在类似该路径下D:\DELL\AppData\IdealWorkSpace\out\artifacts\sparkstreaming_jar找到刚才生成的 jar 包。

在这里插入图片描述
选择刚才设置的 jar 包,上传到 master 上去。

在这里插入图片描述

(三)运行 Sparksteaming JAR包

新建一个 SSH 连接,登录 master 服务器,使用命令nc -lk 9999设置路由器。

nc -lk 9999

在这里插入图片描述

:如果系统只没有nc这个命令,可以使用yum install nc安装nc命令。

进入 spark 的安装目录,执行下面的命令。

cd /usr/cstor/spark
bin/spark-submit --class spark.streaming.test.SparkStreaming ~/sparkstreaming.jar localhost 9999

在网络流中输入单词。按回车结束一次输出。

在命令提交的 xshell 连接中观察程序输出。按 Ctrl+C 可终止程序运行。