Series table of contents
Spark Chapter 1: Environment Installation
Spark Chapter 2: SparkCore Examples
Spark Chapter 3: Engineering the Code
Spark Chapter 4: SparkSQL Basic Operations
Spark Chapter 5: SparkSQL Examples
Spark Chapter 6: SparkStreaming Basic Operations
Preface
In this chapter we move on to Spark Streaming.
I. Add the pom dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.14.2</version>
</dependency>
These are all the dependencies we need for the experiments that follow.
Next, create the Scala files.
II. Simple Examples
1.WordCount
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    // 2. Initialize the StreamingContext with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Receive text from a socket and do a classic word count per batch
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordToOne: DStream[(String, Int)] = words.map((_, 1))
    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
    wordToCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Use netcat to send data to the local machine.
Start a netcat server listening on port 9999 (for example: nc -lk 9999), then run the program.
Type some words into the netcat session to send data.
2.Queue
Now we use a for loop to simulate incoming data; this is only to demonstrate the effect.
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Feed the stream from a mutable queue of RDDs
    val rddQueue = new mutable.Queue[RDD[Int]]()
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)
    val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
    val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
    reducedStream.print()

    ssc.start()
    // Push a new RDD into the queue every 2 seconds to simulate incoming data
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}
3.DIY
A custom data source.
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
object DIY {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
    messageDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // Custom data receiver
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    // flg controls whether the collection thread keeps running
    private var flg = true

    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flg) {
            // Use a random number to simulate collected data
            val message: String = "collected data: " + new Random().nextInt(10).toString
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flg = false
    }
  }
}
4.kafka
Kafka is a more advanced data source, so I won't introduce it in detail here. If you are not familiar with it, see my earlier Kafka series.
package com.atguigu.bigdata.spark.streaming
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Kafka consumer configuration
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    // Read directly from the "atguigu" topic
    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )
    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Create the topic used in the code (atguigu), for example with the kafka-topics.sh script.
Run the Scala file and start receiving data.
Wait until the batch timestamps start printing, then produce some data to Kafka.
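If you would rather generate test data from code than use the kafka-console-producer script, a minimal standalone producer could look like the sketch below. The broker list and the atguigu topic are taken from the consumer example above; the object name KafkaTestProducer is just for illustration.

package com.atguigu.bigdata.spark.streaming

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaTestProducer {
  def main(args: Array[String]): Unit = {
    // Producer configuration: same brokers as the streaming consumer above
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send a few test messages, one per second, to the topic the streaming job subscribes to
    for (i <- 1 to 10) {
      producer.send(new ProducerRecord[String, String]("atguigu", s"message-$i"))
      Thread.sleep(1000)
    }
    producer.close()
  }
}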
5.state
The transformations so far have all been stateless: data from different batch intervals is not related. Now we want to accumulate data across batches before applying further logic.
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object state {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // updateStateByKey requires a checkpoint directory to store the state
    ssc.checkpoint("cp")

    val datas: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val wordToOne: DStream[(String, Int)] = datas.map((_, 1))

    // seq holds the values of the current batch, buff holds the accumulated state
    val state: DStream[(String, Int)] = wordToOne.updateStateByKey(
      (seq: Seq[Int], buff: Option[Int]) => {
        val newCount: Int = buff.getOrElse(0) + seq.sum
        Option(newCount)
      }
    )
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
We again use netcat to send data to the local machine.
As you can see, the counts from different batch intervals are accumulated.
6.state_Join
Now we want to join data coming from two different sources.
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object state_Join {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Two socket sources on different ports
    val data9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val data8888: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)
    val map9999: DStream[(String, Int)] = data9999.map((_, 9))
    val map8888: DStream[(String, Int)] = data8888.map((_, 8))

    // Join by key within the same batch interval
    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
    joinDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
7.Close
Now we build a shutdown mechanism: under certain special conditions, the program should finish processing the current batch and then stop.
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
object Close {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val wordToOne: DStream[(String, Int)] = lines.map((_, 1))
    wordToOne.print()

    ssc.start()

    // Monitor thread: in a real job it would poll an external shutdown signal in a loop;
    // here we simply wait 5 seconds and then stop gracefully.
    new Thread(
      new Runnable {
        override def run(): Unit = {
          Thread.sleep(5000)
          val state: StreamingContextState = ssc.getState()
          if (state == StreamingContextState.ACTIVE) {
            // stopGracefully = true lets the current batch finish before shutting down
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
          System.exit(0)
        }
      }
    ).start()

    ssc.awaitTermination()
  }
}
This is hard to demonstrate here, so a brief explanation: we set up a third-party signal, which could be a record in a database, a node in a ZooKeeper cluster, and so on. A separate thread monitors that signal periodically; when it detects the shutdown message, it terminates the program after the current batch has been processed, so no data is lost.
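As a concrete illustration of that idea, here is a minimal sketch of such a monitor thread. shouldStop is a hypothetical callback standing in for whatever external check you actually use (a flag in MySQL, a ZooKeeper node, a marker file on HDFS, and so on).

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

object CloseMonitor {
  // shouldStop is a hypothetical callback that checks the external shutdown signal
  def startMonitor(ssc: StreamingContext, shouldStop: () => Boolean): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          // Poll the external signal every 5 seconds
          if (shouldStop() && ssc.getState() == StreamingContextState.ACTIVE) {
            // stopGracefully = true lets the current batch finish before shutting down
            ssc.stop(stopSparkContext = true, stopGracefully = true)
            System.exit(0)
          }
          Thread.sleep(5000)
        }
      }
    }).start()
  }
}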
8.Resume
After stopping the program in the previous step, we may still need the state that was already computed when it restarts, so we have to recover the data.
package com.atguigu.bigdata.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
object Resume {
  def main(args: Array[String]): Unit = {
    // Recreate the StreamingContext from the checkpoint directory if checkpoint data exists,
    // otherwise build a new one with the supplied function
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("cp", () => {
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
      val ssc = new StreamingContext(sparkConf, Seconds(3))
      val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
      val wordToOne: DStream[(String, Int)] = lines.map((_, 1))
      wordToOne.print()
      ssc
    })
    ssc.checkpoint("cp")

    ssc.start()
    ssc.awaitTermination()
  }
}
Since we didn't actually set this up in the previous step, we won't demonstrate it here.
Summary
That's it for the basic operations of SparkStreaming for now.