Spark Streaming 2.2.1 处理TCP数据源的案例与解读
本节讲解Spark Streaming处理TCP数据源的案例。
(一)准备工程,并构建测试类。构建程序打包的两种方式:
- 基于IDEA的Artifacts构建打包。
- 基于SBT的命令打包。
1) 基于IDEA构建应用程序。
在第2章构建的工程上,参考章节2.4.2 基于IDEA构建Spark应用程序的实例部分,继续添加依赖包如图4-4所示。
图 4 - 4 IDEA中的Project Structure…菜单
在IDEA中添加依赖包如图所示。
图 4 - 5 IDEA中添加的依赖包
图 4 - 6 IDEA中添加spark-examples_2.11-2.2.1源码依赖
本节案例基于Spark 2.2.1Examples提供的NetworkWordCount 类来实战TCP流数据的处理。如图4-6所示,可以为spark-examples_2.11-2.2.1.jar添加源码关联。在IDEA中查找examples中的NetworkWordCount 类,查找结果如图4-7所示。
图 4 - 7 IDEA中查找NetworkWordCount 类
构建自己的package,名为stream,在scala目录下,右键打开上下文菜单,选择”New”, 选择Package,操作步骤如图4-8所示。
图 4 - 8 IDEA中添加package
键入stream作为package名,点击OK按钮,如图4-9所示。
图 4 - 9 IDEA中设置添加的package名称
构建package后的目录结构如图4-10所示。
图 4 - 10 IDEA中构建package后的目录结构
在stream上右键,然后创建一个NetworkWordCount对象,如图4-11所示。
图 4 - 11 IDEA中创建一个NetworkWordCount对象
点击OK,拷贝代码,如图4-12所示。
图 4 - 12 IDEA中拷贝NetworkWordCount对象的代码
构建应用程序的Jar包,如图4-13所示。
图 4 - 13 IDEA中构建应用程序的Jar包
这里的Artifacts参见章节2.4.2基于IDEA构建Spark应用程序的实例部分。
查看构建的Jar包,已经包含了NetworkWordCount类,包含内容如图4-14所示。
图 4 - 14查看构建的jar包的类
可以通过WinRAR等解压工具打开 Jar包进行查看,也可以在命令行中使用Jar命令来解压查看,使用方法和Tar类似,具体可以查看命令的帮助信息。
2) 基于SBT构建应用程序。
在build.sbt文件中添加spark-streaming_2.11的依赖,build.sbt示例:
name :="WorkSpace"
version :="0.1"
scalaVersion :="2.11.12"
libraryDependencies+= "org.apache.spark" % "spark-core_2.11" %"2.2.1"
libraryDependencies+= "org.apache.spark" % "spark-sql_2.11" %"2.2.1"
libraryDependencies+= "org.apache.spark" % "spark-streaming_2.11" %"2.2.1"
打开终端,如IDEA中的终端Terminal(也可以打开Win下的cmd窗口),输入命令sbt package,具体操作如图4-15所示。
图 4 - 15 sbt package方式构建Jar包
本节案例使用IDEA的Artifacts打包的方式。需打包运行的业务代码如下。NetworkWordCount.scala的代码:
1. // scalastyle:off println2. package stream3. 4. import org.apache.spark.SparkConf5. import org.apache.spark.examples.streaming.StreamingExamples6. import org.apache.spark.storage.StorageLevel7. import org.apache.spark.streaming.{Seconds, StreamingContext}8. 9. /**10. * Counts words in UTF8 encoded,'\n' delimited text received from the network every second.11. *12. * Usage: NetworkWordCount<hostname> <port>13. * <hostname> and<port> describe the TCP server that Spark Streaming would connect toreceive data.14. *15. * To run this on your localmachine, you need to first run a Netcat server16. * `$ nc -lk 9999`17. * and then run the example18. * `$ bin/run-exampleorg.apache.spark.examples.streaming.NetworkWordCount localhost 9999`19. */20. object NetworkWordCount {21. def main(args: Array[String]) {22. if (args.length < 2) {23. System.err.println("Usage:NetworkWordCount <hostname> <port>")24. System.exit(1)25. }26. 27. StreamingExamples.setStreamingLogLevels()28. 29. // Create the context with a 1second batch size30. val sparkConf = newSparkConf().setAppName("NetworkWordCount")31. val ssc = newStreamingContext(sparkConf, Seconds(1))32. 33. // Create a socket stream ontarget ip:port and count the34. // words in input stream of \ndelimited text (eg. generated by 'nc')35. // Note that no duplication instorage level only for running locally.36. // Replication necessary indistributed scenario for fault tolerance.37. val lines =ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)38. val words =lines.flatMap(_.split(" "))39. val wordCounts = words.map(x=> (x, 1)).reduceByKey(_ + _)40. wordCounts.print()41. ssc.start()42. ssc.awaitTermination()43. }44. }45. // scalastyle:on println
任何作用在DStream实例上的操作都会转换为对其底层RDDs序列的操作,比如,代码中flatMap方法对应的DStream内部操作如图4-17所示。
图4 - 17DStream的flatMap方法对应的内部操作
其中,一个框框对应一个批数据,即一个RDD实例。
(一) 测试NetworkWordCount应用程序。
在Spark2.2.1集群上进行测试,测试内容:
- 数据发送端:使用nc-lk 工具发送消息。
- 数据接收端:使用SparkStreaming 2.2.1 接收数据,NetworkWordCount实现实时在线单词统计功能。
Spark2.2.1集群分别以Standalone模式、Yarn方式提交应用程序。
一、Spark以Standalone模式提交应用。
1) 数据接收端:在Spark集群中以Standalone模式提交NetworkWordCount应用。
在IDEA中通过Artifacts方式打包,打包Jar名字设置为testProject.Jar,将打包的testProject.Jar包通过Winscp工具上传到Master节点的/usr/local/streaming-examples-test目录。检查Jar包已经上传。
root@master:/usr/local/streaming-examples-test# ls -ltrtotal 8-rw-r--r-- 1 root root 7278 Feb 2213:42 testProject.jar 启动Hadoop集群、Spark集群。root@master:~#/usr/local/hadoop-2.6.0/sbin/start-all.sh……root@master:~#/usr/local/spark-2.2.1-bin-hadoop2.6/sbin/start-all.sh… 在$SPARK_HOME路径以Standalone模式通过Spark-Submit提交应用程序:root@master:~#spark-submit --master spark://master:7077 --executor-memory 512m --total-executor-cores 2 --jars/usr/local/streaming-examples-test/spark-streaming_2.11-2.2.1.jar,/usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar --class stream.NetworkWordCount /usr/local/streaming-examples-test/testProject.jar master 9999……..18/02/22 14:07:48INFO spark.SparkContext: Running Spark version 2.2.1……
根据NetworkWordCount 应用的使用说明:"Usage:NetworkWordCount <hostname> <port>",在Spark-Submit的最后输入对应的master9999,作为应用程序的参数。
需要注意的是,由于NetworkWordCount代码中使用了StreamingExamples类,因此需要将依赖的/usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar作--jars参数传人,否则Executor执行时会找不到StreamingExamples类。
查看Spark Web UI界面,如图4-18所示。由于当前虚拟机使用的内核数为2,可以分配给Executor和接收流的Receiver。
图 4 - 18Spark应用程序界面信息
2) 数据发送端:使用nc -lk 工具开始发送消息。
准备发送的消息内容:使用$SPARK_HOME 路径下的README.md文本中的内容:
root@master:/usr/local/spark-2.2.1-bin-hadoop2.6# catREADME.md |more# Apache Spark Spark is a fast and general cluster computing system forBig Data. It provideshigh-level APIs in Scala, Java, Python, and R, and anoptimized engine thatsupports general computation graphs for data analysis. Italso supports arich set of higher-level tools including Spark SQL forSQL and DataFrames,MLlib for machine learning, GraphX for graph processing,and Spark Streaming for streamprocessing.
在SecureCRT中打开Master连接的另一个终端,输入以下命令启动Netcat:root@master:~# nc -lk 9999 然后将README.md的内容拷贝到该终端界面上。root@master:~# nc -lk 9999# Apache Spark Spark is a fast and general cluster computing system forBig Data. It provideshigh-level APIs in Scala, Java, Python, and R, and anoptimized engine thatsupports general computation graphs for data analysis. Italso supports arich set of higher-level tools including Spark SQL forSQL and DataFrames,MLlib for machine learning, GraphX for graph processing,and Spark Streaming for streamprocessing.
切换到Spark-Submit的终端,可以看到如下输出:
18/02/22 14:35:10 INFO scheduler.DAGScheduler: Job 4522finished: print atNetworkWordCount.scala:57, took 0.027192 s-------------------------------------------Time: 1519281306000 ms-------------------------------------------(stream,1)(analysis.,1)(R,,1)(SQL,2)(,1)(learning,,1)(Python,,1)(Streaming,1)(supports,2)(that,1)... 18/02/22 14:35:10 INFOscheduler.JobScheduler: Finished job streaming job 1519281306000 ms.0 from jobset of time 1519281306000 ms……-------------------------------------------Time: 1519281307000 ms-------------------------------------------
在Time处看到每隔1s提交一次Job进行单词统计,如果没有收到数据但也提交job,打印显示时间戳。
二、Spark以Yarn方式提交应用。
先用Ctrl+C停止之前以Standalone模式提交运行的spark-submit命令,或者直接在另一个终端上查询Pid,然后kill掉进程。
root@master:~# jps3056 SecondaryNameNode4432 Jps3200 ResourceManager3523 Master4372 SparkSubmit2847 NameNoderoot@master:~# kill -9 4372root@master:~#
本节Spark将以Yarn方式提交应用,按照Spark应用程序中的Driver分布方式的不同,Sparkon YARN有两种模式:YARN-Client模式、YARN-Cluster模式。
- YARN-Client 模式:在Spark-Shell或者Spark-Submit 中,Driver都运行在启动Spark应用的机器上。在这种情形下,YARNApplication Master仅负责从YARN中请求资源。
- YARN-Cluster 模式:Driver自动运行在YARN Container(容器)里,客户端可以从集群中断开,或者用于其他作业。
YARN-Client模式下,ApplicationMaster仅向YARN请求Executor,Client会和请求的Container通信来调度它们工作。YARN-Client模式适合调试Spark程序,能在控制台输出一些调试信息。YARN-Cluster模式下,Driver运行在AM(ApplicationMaster)中,负责向YARN申请资源,并监督作业的运行状况。企业生产环境下会用YARN-Cluster模式来运行Spark应用程序。
Yarn集群资源管理器的启动:如果通过/usr/local/hadoop-2.6.0/sbin/start-all.sh启动Hadoop集群,默认已经启动Yarn集群资源管理。
Yarn也可通过以下命令启动:
root@master:/usr/local/hadoop-2.6.0/sbin#start-yarn.sh
Yarn启动以后,登陆Yarn Web页面(http://192.168.189.1:8088)查看,如图4-19所示。
图4 - 19Yarn资源管理查询页面
接下来讲解使用YARN-Client模式提交应用程序。
1) 数据接收端:Spark 以YARN-Client模式提交应用程序。到$SPARK_HOME路径下,以YARN-Client模式再次提交命令。
root@master:~# spark-submit --master yarn-client --executor-memory 1g --jars /usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar --class stream.NetworkWordCount /usr/local/streaming-examples-test/testProject.jar master 9999……
2) 数据发送端:在Master节点另一个终端启动nc服务,将README.md文件内容拷贝进去。
root@master:~# nc -lk 9999# Apache Spark Spark is a fast and general cluster computing system forBig Data. It provideshigh-level APIs in Scala, Java, Python, and R, and anoptimized engine thatsupports general computation graphs for data analysis. Italso supports arich set of higher-level tools including Spark SQL forSQL and DataFrames,MLlib for machine learning, GraphX for graph processing,and Spark Streaming for streamprocessing.…….
3) 切换到Spark-Submit的终端,再次看到NetworkWordCount应用输出单词统计信息。
18/02/22 20:16:20 INFO scheduler.DAGScheduler: Job 680finished: print atNetworkWordCount.scala:57, took 3.566583 s-------------------------------------------Time: 1519301777000 ms-------------------------------------------(stream,1)(example,1)(analysis.,1)(its,1)(R,,1)(can,2)(Building,1)(build,2)(SQL,2)(with,1)... 18/02/22 20:16:20 INFOscheduler.JobScheduler: Finished job streaming job 1519301777000 ms.0 from jobset of time 1519301777000 ms
打开Hadoop的ResourceManager监控界面,查看应用提交结果,如图4-20所示。
图 4 - 20 Hadoop ResourceManager监控界面的应用程序信息
ResourceManager监控界面地址为:http://master:8088,其中master是启动ResourceManager进程的节点。
提交成功,点击进入应用后,出现界面,如图4-21所示。
图 4 - 21 Spark的job信息
继续查看executor信息,如图4-22所示。
图 4 - 22 Spark的executor信息
Hadoop界面中对应的Driver中没有Logs信息Stdout和Stderr,以YARN-Client模式运行时,Driver及Client运行在提交应用的节点上,可以直接在终端上观察应用运行的日志,查看Driver的日志输出信息。
对应在Yarn模式下执行的应用程序,可以用以下命令查询应用程序ID。
root@master:~# yarn application -list18/02/22 20:39:54 INFOclient.RMProxy: Connecting to ResourceManager at master/192.168.189.1:8032Total number ofapplications (application-types: [] and states: [SUBMITTED, ACCEPTED,RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URLapplication_1519301113196_0001 NetworkWordCount SPARK root default RUNNING UNDEFINED 10% http://master:4040 然后Kill掉应用进程。 root@master:~# yarn application -kill application_1519301113196_0001
上述是以YARN-Client模式提交应用程序的方式,接下来讲解YARN-Cluster模式提交应用程序的方式。
1) 数据接收端:Spark 以YARN-Cluster模式提交应用程序。到$SPARK_HOME路径下,以YARN-Cluster模式再次提交命令。
root@master:~# spark-submit --master yarn-cluster --executor-memory 1g --jars /usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar --class stream.NetworkWordCount/usr/local/streaming-examples-test/testProject.jar master 9999……18/02/22 20:55:23 INFO yarn.Client: Uploading resourcefile:/tmp/spark-3c4d9731-3a71-442a-b44a-bb43922e1520/__spark_libs__7069571623134602328.zip->hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/__spark_libs__7069571623134602328.zip18/02/22 20:56:04 INFO yarn.Client: Uploading resourcefile:/usr/local/streaming-examples-test/testProject.jar ->hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/testProject.jar18/02/22 20:56:04 INFO yarn.Client: Uploading resourcefile:/usr/local/streaming-examples-test/spark-examples_2.11-2.2.1.jar ->hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/spark-examples_2.11-2.2.1.jar18/02/22 20:56:05 INFO yarn.Client: Uploading resourcefile:/tmp/spark-3c4d9731-3a71-442a-b44a-bb43922e1520/__spark_conf__1424867155160650192.zip->hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/__spark_conf__.zip18…… 在Yarn-Cluster模式提交时,会将依赖的Jar包和主资源Jar包一起上传到hdfs上。在HDFS中查看上传后的路径下的文件:root@master:~# hdfs dfs -lshdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/Found 4 items-rw-r--r-- 3 root supergroup 84947 2018-02-22 20:56hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/__spark_conf__.zip-rw-r--r-- 3 root supergroup 205854615 2018-02-22 20:56hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/__spark_libs__7069571623134602328.zip-rw-r--r-- 3 root supergroup 1991400 2018-02-22 20:56hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/spark-examples_2.11-2.2.1.jar-rw-r--r-- 3 root supergroup 7278 2018-02-22 20:56hdfs://master:9000/user/root/.sparkStaging/application_1519301113196_0002/testProject.jar…… 查看各个执行节点上的缓存文件,这里以Worker3节点为例,包含文件如下:root@worker3:/usr/local/hadoop-2.6.0/tmp/nm-local-dir/usercache/root/appcache/application_1519301113196_0002/blockmgr-f2c77095-5279-4775-bbda-bc4f5e37e666#ls00 02 04 06 08 0a 0c 0e 10 12 14 16 18 1a 1c 1e 20 22 24 26 28 2a 2c 2e 30 32 34 36 38 3a 3c 3e01 03 05 07 09 0b 0d 0f 11 13 15 17 19 1b 1d 1f 21 23 25 27 29 2b 2d 2f 31 33 35 37 39 3b 3d 3f root@worker3:/usr/local/hadoop-2.6.0/tmp/nm-local-dir/usercache/root/appcache/application_1519301113196_0002/container_1519301113196_0002_01_000002#ls__app__.jar default_container_executor_session.sh launch_container.sh spark-examples_2.11-2.2.1.jar tmpcontainer_tokens default_container_executor.sh __spark_conf__ __spark_libs__
执行节点已经成功将所依赖的Jar包下载到NodeManager的本地路径下,为应用提供依赖Jar包。其中,nm-local-dir是NodeManager执行应用时的Local目录,执行时应该从Hdfs上下载下来,并存放到该目录下。
2) 数据发送端:在Master节点另一个终端启动nc服务,将README.md文件内容拷贝进去。
root@master:~# nc -lk 9999# Apache Spark…….
3) 查看Spark运行日志。这里登陆SparkWeb UI查看日志。
进入RM节点的WebInterface界面(http://cluster04:8088/cluster),如图4-23所示。
图 4 - 23 Hadoop RM的应用信息
单击application_1519301113196_0002,查看Application的具体信息,如图4-24所示。
图 4 - 24 Hadoop RM的指定应用的信息
点击worker2:8042,查看Node节点具体信息,如图4-25所示。
图 4 - 25 Hadoop RM的指定应用的信息
点击List of Containers,查看容器信息,如图4-26所示。
图 4 - 26 Hadoop RM的指定应用的容器的信息
单击container_1519301113196_0002_01_000001,查看容器具体信息,如图4-27所示。
图 4 - 27 Hadoop RM的指定应用的容器的日志信息
单击Link to logs,选择特定日志信息,如图4-28所示。
图 4 - 28 Hadoop RM的指定应用的容器的日志信息
单击stdout : Total file length is160628 bytes.,查看stdout日志信息,如图4-29所示。stdout中已经成功输出SparkStreaming 2.2.1应用程序的处理信息。
图 4 - 29 Hadoop RM的指定应用的容器的stdout日志信息
2018年新春报喜!热烈祝贺王家林大咖大数据经典传奇著作《SPARK大数据商业实战三部曲》畅销书籍 清华大学出版社发行上市!
本书基于Spark 2.2.0最新版本(2017年7月11日发布),以Spark商业案例实战和Spark在生产环境下几乎所有类型的性能调优为核心,以Spark内核解密为基石,分为上篇、中篇、下篇,对企业生产环境下的Spark商业案例与性能调优抽丝剥茧地进行剖析。上篇基于Spark源码,从一个动手实战案例入手,循序渐进地全面解析了Spark 2.2新特性及Spark内核源码;中篇选取Spark开发中最具有代表的经典学习案例,深入浅出地介绍,在案例中综合应用Spark的大数据技术;下篇性能调优内容基本完全覆盖了Spark在生产环境下的所有调优技术。
本书适合所有Spark学习者和从业人员使用。对于有分布式计算框架应用经验的人员,本书也可以作为Spark高手修炼的参考书籍。同时,本书也特别适合作为高等院校的大数据教材使用。
当当网、京东、淘宝、亚马逊等网店已可购买!欢迎大家购买学习!当当网址: http://product.dangdang.com/25230552.html