Spark Chapter 6: SparkStreaming Basic Operations

Posted: 2022-04-19 01:20:01

Series Contents

Spark Chapter 1: Environment Installation
Spark Chapter 2: SparkCore Examples
Spark Chapter 3: Engineering the Code
Spark Chapter 4: SparkSQL Basic Operations
Spark Chapter 5: SparkSQL Examples
Spark Chapter 6: SparkStreaming Basic Operations



Preface

In this chapter we move on to learning SparkStreaming.


I. Add the pom dependencies

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.14.2</version>
        </dependency>

These are all the dependencies we need for the experiments that follow (the spark-core dependency added in the earlier chapters is assumed to still be in the pom).
Next, create the Scala files.

II. Simple examples

1.WordCount

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1. Initialize the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    //2. Initialize the StreamingContext with a 3-second batch interval
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val words: DStream[String] = lines.flatMap(_.split(" "))

    val wordToOne: DStream[(String, Int)] = words.map((_, 1))

    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

    wordToCount.print()
    
    ssc.start()
    ssc.awaitTermination()

  }

}

Start netcat to send data to the local machine.
Start the server side and begin listening on port 9999 (with netcat this is typically nc -lk 9999).
Send some data.
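If netcat is not available, a small Scala sender can play the same role. This is only an illustrative sketch: the object name SocketSender and the sample sentence are made up, and it relies on the fact that the WordCount job above connects to localhost:9999.

package com.atguigu.bigdata.spark.streaming

import java.io.PrintWriter
import java.net.ServerSocket

object SocketSender {
  def main(args: Array[String]): Unit = {
    // Listen on 9999 and wait for the WordCount job's socketTextStream to connect
    val server = new ServerSocket(9999)
    val socket = server.accept()
    val out = new PrintWriter(socket.getOutputStream, true)

    // Write one line per second; WordCount splits each line on spaces and counts the words
    for (_ <- 1 to 100) {
      out.println("hello spark hello streaming")
      Thread.sleep(1000)
    }

    out.close()
    socket.close()
    server.close()
  }
}

Start SocketSender first, then run WordCount; the counts should appear in every 3-second batch.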

2.Queue

Now we use a for loop to simulate sending data; this is only to show the effect.

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")

    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val rddQueue = new mutable.Queue[RDD[Int]]()

    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)

    val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
    val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)

    reducedStream.print()

    ssc.start()

    for (i <- 1 to 5){
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    // Keep the application running so the queued batches can be processed
    ssc.awaitTermination()
  }
}


3.DIY

A custom data source (custom receiver).

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

object DIY {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
    messageDS.print()
    ssc.start()
    ssc.awaitTermination()

  }

  // Custom data receiver
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
    // flg is the stop flag
    private var flg = true
    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flg){
            // Use a random number to simulate collected data
            val message: String = "Collected data: " + new Random().nextInt(10).toString
            store(message)

            Thread.sleep(500)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flg=false

    }
  }

}


4.kafka

Kafka is a more advanced data input source, so I won't introduce it in detail here; if you are not familiar with it, see my earlier column: Kafka

package com.atguigu.bigdata.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    kafkaDataDS.map(_.value()).print()


    ssc.start()
    ssc.awaitTermination()

  }

}

Create the topic we need to use.

Run the Scala file to start receiving data.
Wait until the batch timestamps start printing, then produce data to Kafka. A small Scala producer sketch is shown below as an alternative to the Kafka command-line producer.
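A minimal sketch of such a producer, assuming the kafka-clients library is on the classpath (it comes in transitively with spark-streaming-kafka-0-10); the object name KafkaSender and the message contents are made up for illustration.

package com.atguigu.bigdata.spark.streaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaSender {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send a few test messages to the "atguigu" topic that the streaming job subscribes to
    for (i <- 1 to 10) {
      producer.send(new ProducerRecord[String, String]("atguigu", s"message-$i"))
      Thread.sleep(1000)
    }

    producer.close()
  }
}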

5.state

The collection in the previous examples was all stateless transformation, meaning the data from different batch intervals is not related. Now we want to accumulate the data across batch intervals and then apply the processing logic.

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object state {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    // updateStateByKey needs a checkpoint directory to save the state between batches
    ssc.checkpoint("cp")

    val datas: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val wordToOne: DStream[(String, Int)] = datas.map((_, 1))

    // seq: the new values for a key in the current batch; buff: the previously saved state
    val state: DStream[(String, Int)] = wordToOne.updateStateByKey(
      (seq: Seq[Int], buff: Option[Int]) => {
        val newCount: Int = buff.getOrElse(0) + seq.sum
        Option(newCount)
      }
    )

    state.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

We again use netcat to send data to the local machine.
You can see that the data from different batch intervals has been accumulated.

6.state_Join

Now we join data from two different sources together.

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object state_Join {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val data9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val data8888: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = data9999.map((_, 9))
    val map8888: DStream[(String, Int)] = data8888.map((_, 8))

    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
}


7.Close

Now we build a shutdown mechanism: under some special condition, after processing the current batch of data, the program is shut down.

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object Close {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val wordToOne: DStream[(String, Int)] = lines.map((_, 1))

    wordToOne.print()

    ssc.start()

    // Monitor thread: in a real job this would poll an external shutdown signal
    // (the commented-out loop below sketches that); here it simply stops after 5 seconds
    new Thread(
      new Runnable {
        override def run(): Unit = {
//          while (true){
//            if (true){
//              val state: StreamingContextState = ssc.getState()
//              if (state == StreamingContextState.ACTIVE){
//                ssc.stop(true,true)
//              }
//            }
//            Thread.sleep(5000)
//          }
          Thread.sleep(5000)
          val state: StreamingContextState = ssc.getState()
          if (state == StreamingContextState.ACTIVE){
            // Stop gracefully: finish processing the current batch, then stop the SparkContext too
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
          System.exit(0)
        }
      }
    ).start()

    ssc.awaitTermination()

  }
}

This is hard to demonstrate here, so just a brief explanation. We set up a third-party signal, which could be a record in a database, a node in a ZooKeeper cluster, and so on. Then we create a separate thread that periodically monitors this signal. Once the shutdown signal is detected, the program is terminated after the data of the current batch interval has been processed, so no data is lost. A minimal sketch of such a monitor follows.
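The sketch assumes a hypothetical helper isShutdownRequested() that in practice would query MySQL, Redis, ZooKeeper, or wherever the signal is stored; the class name MonitorStop is also made up for illustration.

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

class MonitorStop(ssc: StreamingContext) extends Runnable {

  // Hypothetical external check; replace with a real lookup against a database, Redis, ZooKeeper, etc.
  private def isShutdownRequested(): Boolean = false

  override def run(): Unit = {
    while (true) {
      Thread.sleep(5000)
      if (isShutdownRequested() && ssc.getState() == StreamingContextState.ACTIVE) {
        // Finish processing the current batch, then stop the StreamingContext and the SparkContext
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        System.exit(0)
      }
    }
  }
}

It would be started the same way as the thread in the Close example, with new Thread(new MonitorStop(ssc)).start() before ssc.awaitTermination().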

8.Resume

After we stop the program in the previous step, when it is started again we may still need the state built from the previously processed data, so the data has to be recovered.

package com.atguigu.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object Resume {
  def main(args: Array[String]): Unit = {
    // If a checkpoint exists under "cp", recover the StreamingContext from it; otherwise create a new one
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("cp", () => {
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
      val ssc = new StreamingContext(sparkConf, Seconds(3))

      val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
      val wordToOne: DStream[(String, Int)] = lines.map((_, 1))
      wordToOne.print()

      ssc
    })
    ssc.checkpoint("cp")
    ssc.start()
    ssc.awaitTermination()

  }
}

Since we did not actually run the previous step, we won't demonstrate this here.


Summary

That's it for the basic operations of SparkStreaming for now.