Spark Streaming编程示例

时间:2021-03-19 15:00:09

近期也有开始研究使用spark streaming来实现流式处理。本文以流式计算word count为例,简单描述如何进行spark streaming编程。

1. 依赖的jar包

参考《分别用Eclipse和IDEA搭建Scala+Spark开发环境》一文,pom.xml中指定依赖库spark-streaming_2.10.jar。

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.0</version>
</dependency>

<!-- HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependency>

2. WordCount代码示例

监听socket端口,每5秒统计一次收到的文本的单词数量,并输出到屏幕。

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions /**
* Spark Streaming示例,统计输入中所有单词出现的次数
*
*/
object StreamingWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
} // Create the context with a 5 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5)) // Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

3. 提交任务和监听集群

socketTextStream是从监听service的socket端口。

(1)Job如何提交:
$SPARK_HOME/bin/spark-submit --name StreamingDemo --class StreamingWordCount ./sparktest-1.0-SNAPSHOT.jar localhost 1234

(2)监听socket端口:

nc -lk 1234