基于案例贯通 Spark Streaming 流计算框架的运行源码

时间:2022-05-19 23:17:14

本期内容 :

  • Spark Streaming+Spark SQL案例展示
  • 基于案例贯穿Spark Streaming的运行源码

一、 案例代码阐述 :

  在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机、电视类别中最热门的三种电视等。

  1、案例运行代码 :

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext} object OnlineTheTop3ItemForEachCategory2DB {
def main(args: Array[String]){
   /**
    *
    * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
    */
val conf = new SparkConf() //创建SparkConf对象
conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //设置应用程序的名称,在程序运行的监控界面可以看到名称
  //conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
conf.setMaster("local[6]")
//设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
val ssc = new StreamingContext(conf, Seconds(5)) ssc.checkpoint("/root/Documents/SparkApps/checkpoint") val userClickLogsDStream = ssc.socketTextStream("Master", 9999) val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
(clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))   //val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,
  //(v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20)) val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
_-_, Seconds(60), Seconds(20)) categoryUserClickLogsDStream.foreachRDD { rdd => {
if (rdd.isEmpty()) {
println("No data inputted!!!")
} else {
val categoryItemRow = rdd.map(reducedItem => {
val category = reducedItem._1.split("_")(0)
val item = reducedItem._1.split("_")(1)
val click_count = reducedItem._2
Row(category, item, click_count)
}) val structType = StructType(Array(
StructField("category", StringType, true),
StructField("item", StringType, true),
StructField("click_count", IntegerType, true)
)) val hiveContext = new HiveContext(rdd.context)
val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType) categoryItemDF.registerTempTable("categoryItemTable") val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
" OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
" WHERE rank <= 3")
reseltDataFram.show() val resultRowRDD = reseltDataFram.rdd resultRowRDD.foreachPartition { partitionOfRecords => { if (partitionOfRecords.isEmpty){
println("This RDD is not null but partition is null")
} else {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +
record.getAs("item") + "'," + record.getAs("click_count") + ")"
val stmt = connection.createStatement();
stmt.executeUpdate(sql); })
ConnectionPool.returnConnection(connection) // return to the pool for future reuse }
}
}
}
}
ssc.start()
ssc.awaitTermination() }
 }
}

  2、案例流程框架图 :

  基于案例贯通 Spark Streaming 流计算框架的运行源码

二、 基于案例的源码解析 :

  1、 构建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息:

  基于案例贯通 Spark Streaming 流计算框架的运行源码

  2、构建StreamingContext时传递SparkConf参数在内部创建SparkContext :

  基于案例贯通 Spark Streaming 流计算框架的运行源码

  基于案例贯通 Spark Streaming 流计算框架的运行源码

  3、创建了 StreamingContext : 同时说明Spark Streaming 是Spark Core上的一个应用程序

  基于案例贯通 Spark Streaming 流计算框架的运行源码

  4、 checkpoint 持久化

  5、构建SocketTextStream 获取输入源

  基于案例贯通 Spark Streaming 流计算框架的运行源码

    01、 创建Socket 获取输入流

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    02、 SocketInputDstream继承ReceiverInputDStream,通过构建Receiver来接收数据

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    03、 创建SocketReceiver

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    04、 通过Receiver 在网络获取相关数据

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    05、数据输出

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    06、生成job作业

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    07、 根据时间间隔产生RDD ,存储数据

    基于案例贯通 Spark Streaming 流计算框架的运行源码

    基于案例贯通 Spark Streaming 流计算框架的运行源码

 6、 Streaming Start :

    基于案例贯通 Spark Streaming 流计算框架的运行源码

 7、 流程总结 :

    01、 在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环。

    02、 在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:

      • JobGenerator启动后会不断的根据batchDuration生成一个个的Job ;
      • ReceiverTracker启动后首先在Spark Cluster中启动Receiver (其实是在Executor中先启动ReceiverSupervisor);

    03、 在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker 。

    04、 在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息 。

    05、 每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已 。

    06、 要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(在线程中基于RDD的Action触发真正的作业的运行)。

  

基于案例贯通 Spark Streaming 流计算框架的运行源码的更多相关文章

  1. Dream&lowbar;Spark-----Spark 定制版:005~贯通Spark Streaming流计算框架的运行源码

    Spark 定制版:005~贯通Spark Streaming流计算框架的运行源码   本讲内容: a. 在线动态计算分类最热门商品案例回顾与演示 b. 基于案例贯通Spark Streaming的运 ...

  2. 贯通Spark Streaming流计算框架的运行源码

    本章节内容: 一.在线动态计算分类最热门商品案例回顾 二.基于案例贯通Spark Streaming的运行源码 先看代码(源码场景:用户.用户的商品.商品的点击量排名,按商品.其点击量排名前三): p ...

  3. 5&period;Spark Streaming流计算框架的运行流程源码分析2

    1 spark streaming 程序代码实例 代码如下: object OnlineTheTop3ItemForEachCategory2DB { def main(args: Array[Str ...

  4. Spark Streaming实时计算框架介绍

    随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在 ...

  5. 大数据开发实战:Spark Streaming流计算开发

    1.背景介绍 Storm以及离线数据平台的MapReduce和Hive构成了Hadoop生态对实时和离线数据处理的一套完整处理解决方案.除了此套解决方案之外,还有一种非常流行的而且完整的离线和 实时数 ...

  6. 66、Spark Streaming:数据处理原理剖析与源码分析(block与batch关系透彻解析)

    一.数据处理原理剖析 每隔我们设置的batch interval 的time,就去找ReceiverTracker,将其中的,从上次划分batch的时间,到目前为止的这个batch interval ...

  7. 通过案例对 spark streaming 透彻理解三板斧之二:spark streaming运行机制

    本期内容: 1. Spark Streaming架构 2. Spark Streaming运行机制 Spark大数据分析框架的核心部件: spark Core.spark  Streaming流计算. ...

  8. spark streaming 接收kafka消息之四 -- 运行在 worker 上的 receiver

    使用分布式receiver来获取数据使用 WAL 来实现 exactly-once 操作: conf.set("spark.streaming.receiver.writeAheadLog. ...

  9. 【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

    系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streami ...

随机推荐

  1. DirectX runtime

    DirectX 9.0 runtime etc https://www.microsoft.com/en-us/download/details.aspx?id=7087 DirectX 11 run ...

  2. CSS之元素选择器

    1.后代元素选择器 div p 以空格分隔,表示div的所有后代p元素 2.子元素选择器 div > p 以大于号分隔,表示div的直接子元素 3.相邻兄弟选择器 div  + p 选择紧接在d ...

  3. 运放——压摆率SR的意义和如何选取

    http://blog.csdn.net/dxshappy/article/details/8065798

  4. DB2解除锁表

    背景 生产环境中,我几乎没有遇到过锁表.多是在开发过程中遇到的,比如团队开发中经常会遇到多个功能访问同一张表的情况.如果有开发人员在这张表加了排它锁,然后又忘记提交事务,那么其他开发人员就要一直等待了 ...

  5. bzoj 3823&colon; 定情信物 线性筛逆元

    3823: 定情信物 Time Limit: 10 Sec  Memory Limit: 128 MBSubmit: 108  Solved: 2[Submit][Status] Descriptio ...

  6. 【MongoDB】应用场景

    24 Use Cases24.1 适合场景 Archiving and event logging 归档和日志记录 Document and Content Management Systems ...

  7. cf479A Expression

    A. Expression time limit per test 1 second memory limit per test 256 megabytes input standard input ...

  8. 总结的OSM 地图相关的分析

    How OSM works:  Tile Format: png,  z: levels [0- 18], x: Latitude [0- ], y: Longitude [0- ];         ...

  9. pycharm安装TensorFlow

    一.首先说下我在安装TensorFlow的过程中遇到的坑: 1.python的版本是3.5的版本,因为TensorFlow好像只支持到3.5现在.然后python需要安装64位的安装包,如果安装的是3 ...

  10. JavaScript 基础,登录验证

    1.<script></script>的三种用法: a.放在<body>中 b.放在<head>中 c.放在外部JS文件中 <!DOCTYPE h ...