Spark学习(4) Spark Streaming

时间:2023-12-11 16:02:20

什么是Spark Streaming

Spark Streaming类似于Apache Storm,用于流式数据的处理
Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等
数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等
Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,
另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口

目前流行的三种实时框架对比

Apache

Flink

SparkSteaming

Storm

架构

架构介于spark和storm之间,主从结构与sparkStreaming相似,DataFlow Grpah与storm相似,数据流可以被表示为一个有向图,每个顶点是一个定义的运算,每向边表示数据的流动

Native

架构依赖Spark,主从模式,每个batch批次处理都依赖driver主,可以理解为时间维度上的spark DAG

Micro-Batch

主从模式,且依赖ZK,处理过程中对主的依赖不大

Native

容错

基于Ghandy-Lamport distributed snapshots checkpoint机制

Medium

WAL及RDD血统机制

High(高)

Records Ack

Medium(一般)

处理模型与延时

单条时间处理

亚秒级低延时

一个事件窗口内的所有事件

秒级低延时

每次传入的一个事件

亚秒级低延时

吞吐量

High

High

Low(低)

数据处理保证

Exactly once

High

Exactly once(实现架用Chandy-Lamport算法,即marker-checkpoint)

High

At least once(实现架用record-level acknowledgments),Trident可以支持storm提供exactly once语义

Medium

高级API

Flink,栈中提供了很多高级API和满足不同场景的类库:机器学习、图分析、关系式数据处理

High

能够很容易的对接Spark生态圈里面的组件,同时额能够对接主流的消息传输组件及存储系统

High

应用需要按照特定的storm定义的规模编写

Low

易用性

支持SQL Streaming,Batch和Streaming采用统一编程框架

High

支持SQL Streaming,Batch和Streaming采用统一编程框架

High

不支持SQL Streaming

Medium

成熟性

新兴项目,处于发展阶段

Low

已经发展一段时间

Medium

相对较早的流系统,比较稳定

High

部署性

部署相对简单,只依赖JRE环境

Low

部署相对简单,只依赖JRE环境

Low

依赖JRE环境和ZK

High

     

Spark Streaming架构

Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据
StreamingContext 会周期性地运行 Spark 作业来处理这些数据,把数据与之前时间区间中的 RDD 进行整合

什么是Dstream

就是将流式计算分解成为一系列确定并且较小的批处理作业
可以将失败或者执行较慢的任务在其他节点上并行执行
有较强的的容错能力,基于lineage
Dstream内含high-level operations进行处理
Dstream内部实现为一个RDD序列 基本数据源:socket、file,akka actoer。Steaming中自带了该数据源的读取API
高级数据源:kafka,flume,kinesis,Twitter等其他的数据。必须单独导入集成的JAR包 Receiver方式:接收器模式是使用Kafka高级Consumer API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark Executor的内存中,然后由Spark Streaming启动的job来处理数据。
Direct:直连模式,在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,
其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,并且相应的定义要在每个batch中处理偏移范围,
当启动处理数据的作业时,kafka的简单的消费者api用于从kafka读取定义的偏移范围

简单Spark Streaming实现

object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds())
val lines = ssc.socketTextStream("master01", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, ))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

Kafka对接Stream实现

object KafkaDirectorDemo {
def main(args: Array[String]): Unit = {
//构建conf ssc 对象 初始化Streamingcontext
val conf = new SparkConf().setAppName("Kafka_director").setMaster("local")
val ssc = new StreamingContext(conf,Seconds())
//设置数据检查点进行累计计算 没有的话抛无方法异常
ssc.checkpoint("hdfs://192.168.25.101:9000/checkpoint") //设置kfaka相关信息
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "CentOS1:9092,CentOS2:9092,CentOS3:9092",//用于初始化链接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],//key序列化
"value.deserializer" -> classOf[StringDeserializer],//value序列化
"group.id" -> "group1",//用于标识这个消费者属于哪个消费团体
"auto.offset.reset" -> "latest",//偏移量 latest自动重置偏移量为最新的偏移量
"enable.auto.commit" -> (false: java.lang.Boolean)//如果是true,则这个消费者的偏移量会在后台自动提交
)
//kafka 设置kafka读取topic
val topics = Array("first", "second")
// 获得DStream
val dStreaming = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,Subscribe[String, String](topics, kafkaParams))
val rdd = dStreaming.map(record => (record.key, record.value)) rdd.print()
rdd.count().print()
rdd.countByValue().print()
dStreaming.foreachRDD(rdd=>rdd.foreach(println(_))) ssc.start()
ssc.awaitTermination()
}
}