一、 场景
◆ Spark[4]:
Scope: a MapReduce-like cluster computing framework designed for low-latency iterativejobs and interactive use from an interpreter(在大规模的特定数据集上的迭代运算或重复查询检索)
正如其目标scope,Spark适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小。
◆ Spark Streaming(构建在Spark上处理Stream数据的框架)
能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景[2]。
二、 Spark生态系统包括[1]
◆ Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark 通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
◆ Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部 分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它 处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需 要历史数据和实时数据联合分析的特定应用场合。
详见十、Spark Streaming ...
◆ Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。
三、 运行模式[1]
◆ 本地模式
◆ Standalone模式
◆ Mesoes模式
◆ yarn模式
四、 Spark与Hadoop的对比[1]
◆
Spark的中间数据放到内存中,对于迭代运算效率更高。
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。
◆
Spark比Hadoop更通用。
Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join,
cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce,
lookup, save等多种actions操作。
这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。
不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
◆
容错性。
在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。
◆
可用性。
Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。
Spark与Hadoop的结合
◆
Spark可以直接对HDFS进行数据的读写,同样支持Spark
on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。
五、 在业界的使用[1]
◆ Spark项目在2009年启动,2010年开源, 现在使用的有:Berkeley, Princeton, Klout,Foursquare, Conviva,
Quantifind, Yahoo! Research & others,淘宝等,豆瓣也在使用Spark的python克隆版Dpark。
六、 Spark核心概念[1]
Resilient
Distributed Dataset (RDD)弹性分布数据集
◆
RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存 中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
◆
RDD的特点:
它是在集群节点上的不可变的、已分区的集合对象。
通过并行转换的方式来创建如(map, filter, join, etc)。
失败自动重建。
可以控制存储级别(内存、磁盘等)来进行重用。
必须是可序列化的。
是静态类型的。
◆
RDD的好处
RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
◆
RDD的存储与分区
用户可以选择不同的存储级别存储RDD以便重用。
当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
◆
RDD的内部表示
在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:
分区列表(数据块列表)
计算每个分片的函数(根据父RDD计算出此RDD)
对父RDD的依赖列表
对key-value RDD的Partitioner【可选】
每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
◆
RDD的存储级别
RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:
val
NONE = new StorageLevel(false, false, false) val DISK_ONLY = new
StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true,
false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val
MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER =
new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2
= new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new
StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true,
true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
◆
RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。
RDD的生成
◆
RDD有两种创建方式:
1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。
2、从父RDD转换得到新RDD。
◆
下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:
//
SparkContext根据文件/目录及可选的分片数创建RDD,
这里我们可以看到Spark与Hadoop MapReduce很像 // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 def textFile(path:
String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path,
classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
.map(pair => pair._2.toString) } // 根据Hadoop配置,及InputFormat等创建HadoopRDD new HadoopRDD(this,
conf, inputFormatClass, keyClass, valueClass, minSplits)
◆
对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:
RDD的转换与操作
◆
对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。
◆
转换(Transformations) (如:map,
filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到 Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
◆
操作(Actions) (如:count, collect,
save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
◆
下面使用一个例子来示例说明Transformations与Actions在Spark的使用。
val
sc = new SparkContext(master, "Example",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A
= sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line =>
line.split("\\s+"))).map(word => (word, 1)) val rdd_C =
sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10),
1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E)
rdd_F.saveAsSequenceFile(hdfs://....)
Lineage(血统)
◆
利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。 为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内
存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限 制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
◆
RDD在Lineage依赖方面分为两种Narrow
Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用, 表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子 RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与 Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这
种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow
Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。
七、 容错[1]
◆
在RDD计算,通过checkpint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging
the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成
丢失的分区数据。
八、 资源管理与作业调度[1]
◆
Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop
YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容 易,Spark on Yarn的大致框架图。
◆ 让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。
九、 编程接口[1]
◆
Spark通过与编程语言集成的方式暴露RDD的操作,类似于DryadLINQ和FlumeJava,每个数据集都表示为RDD对象,对数据集的操作就 表示成对RDD对象的操作。Spark主要的编程语言是Scala,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能 (JVM上的静态强类型语言)。
◆
Spark和Hadoop MapReduce类似,由Master(类似于MapReduce的Jobtracker)和Workers(Spark的Slave工作节点)组成。 用户编写的Spark程序被称为Driver程序,Dirver程序会连接master并定义了对各RDD的转换与操作,而对RDD的转换与操作通过 Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操作发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对RDD的操作时,根据数据分片信息进行本地化数据操作,生成新的
数据分片、返回结果或把RDD写入存储系统。
Scala
◆
Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创 建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:
val sc = new SparkContext(master,
appName, [sparkHome], [jars]) val textFile =
sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....
Java
◆
Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:
JavaSparkContext sc = new
JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://...");
JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable
call(String s) { return Arrays.asList(s.split(" ")); } } );
Python
◆ 现在Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操作,从而实现使用python编写Spark程 序。Spark也同样提供了pyspark,一个Spark的python shell,可以以交互式的方式使用Python编写Spark程序。 如:
from pyspark import SparkContext sc =
SparkContext("local", "Job Name", pyFiles=['MyFile.py',
'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words")
words.filter(lambda w: w.startswith("spar")).take(5)
十、 使用示例[1]
Standalone模式
◆
为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是非常好的设计,但是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark因此提供了 Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很相似,就连集群启动方式都几乎是一样。
◆
以Standalone模式运行Spark集群
下载Scala2.9.3,并配置SCALA_HOME
下载Spark代码(可以使用源码编译也可以下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)
解压spark-0.7.3-prebuilt-cdh4.tgz安装包
修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。
SCALA_HOME=/home/spark/scala-2.9.3
JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1
SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119
SPARK_WORKER_INSTANCES=1
◆
把Hadoop配置copy到conf目录下
◆
在master主机上对其它机器做ssh无密码登录
◆
把配置好的Spark程序使用scp copy到其它机器
◆
在master启动集群
$SPARK_HOME/start-all.sh
yarn模式
◆
Spark-shell现在还不支持Yarn模式,使用Yarn模式运行,需要把Spark程序全部打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn。
◆
以Yarn模式运行Spark
下载Spark代码.
git
clone git://github.com/mesos/spark
◆
切换到branch-0.8
cd
spark git checkout -b yarn --track origin/yarn
◆
使用sbt编译Spark并
$SPARK_HOME/sbt/sbt
> package > assembly
◆
把Hadoop yarn配置copy到conf目录下
◆
运行测试
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-
SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar
examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args
yarn-standalone
使用Spark-shell
◆
Spark-shell使用很简单,当Spark以Standalon模式运行后,使用$SPARK_HOME/spark-shell进入shell即 可,在Spark-shell中SparkContext已经创建好了,实例名为sc可以直接使用,还有一个需要注意的是,在Standalone模式 下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell作为一个Spark程序一直运行在Spark上,其它的 Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。
◆
在Spark-shell上写程序非常简单,就像在Scala
Shell上写程序一样。
scala>
val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile:
spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() //
Number of items in this RDD res0: Long = 21374 scala> textFile.first() //
First item in this RDD res1: String = # Spark
编写Driver程序
◆
在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。如WorkCount程序如下:
import spark.SparkContext import SparkContext._ object
WordCount { def main(args: Array[String]) { if (args.length ==0 ){
println("usage is org.test.WordCount ") } println("the args:
") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" //
create the SparkContext,
args(0)由yarn传入appMaster地址 val sc = new SparkContext(args(0), "WrodCount",
System.getenv("SPARK_HOME"),
Seq(System.getenv("SPARK_TEST_JAR"))) val textFile =
sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+"))
.map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath +
args(2)) } }
十一、 Spark Streaming:大规模流式数据处理的新贵[2]
1. 前言
提到Spark
Streaming,我们不得不说一下BDAS(Berkeley
Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈。从它的视角来看,目前的大数据处理可以分为如以下三个类型。
- 复杂的批量数据处理(batch data processing),通常的时间跨度在数十分钟到数小时之间。
- 基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间。
- 基于实时数据流的数据处理(streaming data processing),通常的时间跨度在数百毫秒到数秒之间。
目前已有很多相对成熟的开源软件来处理以上三种情景,我们可以利用MapReduce来进行批量数据处理,可以用Impala来进行交互式查询,对于流式数据处理,我们可以采用Storm。对于大多数互联网公司来说,一般都会同时遇到以上三种情景,那么在使用的过程中这些公司可能会遇到如下的不便。
- 三种情景的输入输出数据无法无缝共享,需要进行格式相互转换。
- 每一个开源软件都需要一个开发和维护团队,提高了成本。
- 在同一个集群中对各个系统协调资源分配比较困难。
BDAS就是以Spark为基础的一套软件栈,利用基于内存的通用计算模型将以上三种情景一网打尽,同时支持Batch、 Interactive、Streaming的处理,且兼容支持HDFS和S3等分布式文件系统,可以部署在YARN和Mesos等流行的集群资源管理器 之上。BDAS的构架如图1所示,其中Spark可以替代MapReduce进行批处理,利用其基于内存的特点,特别擅长迭代式和交互式数据处理;Shark处理大规模数据的SQL查询,兼容Hive的HQL。本文要重点介绍的Spark Streaming,在整个BDAS中进行大规模流式处理。
图1 BDAS软件栈
2.
Spark Streaming构架
1) 计算流程
Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经 过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加,或者存储到外部设备。图2显示了Spark Streaming的整个流程。
图2 Spark
Streaming构架图
2) 容错性
对于流式计算来说,容错性至关重要。首先我们要明确一下Spark中RDD的容错机制。每一个RDD都是一个不可 变的分布式可重算的数据集,其记录着确定性的操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个RDD的分区 (Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。
图3 Spark
Streaming中RDD的lineage关系图
对于Spark Streaming来说,其RDD的传承关系如图3所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个 Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性。所以RDD中任意的Partition出错,都可以并行地在其 他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。
3) 实时性
对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark
Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解,以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。
4) 扩展性与吞吐量
Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,图4是Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm是115k records/s。
图4 Spark Streaming与Storm吞吐量比较图
3.
Spark Streaming的编程模型
Spark Streaming的编程和Spark的编程如出一辙,对于编程的理解也非常类似。对于Spark来说,编程就是对于RDD的操作;而对于Spark Streaming来说,就是对DStream的操作。下面将通过一个大家熟悉的WordCount的例子来说明Spark Streaming中的输入操作、转换操作和输出操作。
- Spark Streaming初始化:在开始进行DStream操作之前,需要对Spark Streaming进行初始化生成StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定Spark Streaming运行的集群地址,而第三个参数是指定Spark
Streaming运行时的batch窗口大小。在这个例子中就是将1秒钟的输入数据进行一次Spark Job处理。
val ssc = new
StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars])
- Spark Streaming的输入操作:目前Spark Streaming已支持了丰富的输入接口,大致分为两类:一类是磁盘输入,如以batch size作为时间间隔监控HDFS文件系统的某个目录,将目录中内容的变化作为Spark Streaming的输入;另一类就是网络流的方式,目前支持Kafka、Flume、Twitter和TCP
socket。在WordCount例子中,假定通过网络socket作为输入流,监听某个特定的端口,最后得出输入DStream(lines)。
val lines = ssc.socketTextStream(“localhost”,8888)
- Spark Streaming的转换操作:与Spark RDD的操作极为类似,Spark Streaming也就是通过转换操作将一个或多个DStream转换成新的DStream。常用的操作包括map、filter、flatmap和 join,以及需要进行shuffle操作的groupByKey/reduceByKey等。在WordCount例子中,我们首先需要将 DStream(lines)切分成单词,然后将相同单词的数量进行叠加, 最终得到的wordCounts就是每一个batch size的(单词,数量)中间结果。
val words = lines.flatMap(_.split(“ ”))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
另外,Spark Streaming有特定的窗口操作,窗口操作涉及两个参数:一个是滑动窗口的宽度(Window Duration);另一个是窗口滑动的频率(Slide Duration),这两个参数必须是batch size的倍数。例如以过去5秒钟为一个输入窗口,每1秒统计一下WordCount,那么我们会将过去5秒钟的每一秒钟的WordCount都进行统计,然后进行叠加,得出这个窗口中的单词统计。
val wordCounts = words.map(x => (x,
1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))
但上面这种方式还不够高效。如果我们以增量的方式来计算就更加高效,例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么
我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量(如图5所示),这种方法可以复用中间三秒的统 计量,提高统计的效率。
val wordCounts = words.map(x => (x,
1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))
图5 Spark Streaming中滑动窗口的叠加处理和增量处理
- Spark Streaming的输入操作:对于输出操作,Spark提供了将数据打印到屏幕及输入到文件中。在WordCount中我们将DStream wordCounts输入到HDFS文件中。
wordCounts = saveAsHadoopFiles(“WordCount”)
- Spark Streaming启动:经过上述的操作,Spark Streaming还没有进行工作,我们还需要调用Start操作,Spark Streaming才开始监听相应的端口,然后收取数据,并进行统计。
ssc.start()
4.
Spark Streaming案例分析
在互联网应用中,网站流量统计作为一种常用的应用模式,需要在不同粒度上对不同数据进行统计,既有实时性的需求,又需要涉及到聚合、去重、连接等较为复杂的统计需求。传统上,若是使用Hadoop MapReduce框架,虽然可以容易地实现较为复杂的统计需求,但实时性却无法得到保证;反之若是采用Storm这样的流式框架,实时性虽可以得到保证,但需求的实现复杂度也大大提高了。Spark
Streaming在两者之间找到了一个平衡点,能够以准实时的方式容易地实现较为复杂的统计需求。 下面介绍一下使用Kafka和Spark Streaming搭建实时流量统计框架。
- 数据暂存:Kafka作为分布式消息队列,既有非常优秀的吞吐量,又有较高的可靠性和扩展性,在这里采用Kafka作为日志传递中间件来接收日志,抓取客户端发送的流量日志,同时接受Spark
Streaming的请求,将流量日志按序发送给Spark Streaming集群。 - 数据处理:将Spark Streaming集群与Kafka集群对接,Spark Streaming从Kafka集群中获取流量日志并进行处理。Spark Streaming会实时地从Kafka集群中获取数据并将其存储在内部的可用内存空间中。当每一个batch窗口到来时,便对这些数据进行处理。
- 结果存储:为了便于前端展示和页面请求,处理得到的结果将写入到数据库中。
相比于传统的处理框架,Kafka+Spark Streaming的架构有以下几个优点。
- Spark框架的高效和低延迟保证了Spark
Streaming操作的准实时性。 - 利用Spark框架提供的丰富API和高灵活性,可以精简地写出较为复杂的算法。
- 编程模型的高度一致使得上手Spark Streaming相当容易,同时也可以保证业务逻辑在实时处理和批处理上的复用。
在基于Kafka+Spark Streaming的流量统计应用运行过程中,有时会遇到内存不足、GC阻塞等各种问题。下面介绍一下如何对Spark Streaming应用程序进行调优来减少甚至避免这些问题的影响。
5.
性能调优
1) 优化运行时间
- 增加并行度。确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操作,增加其并行度以确保更为充分地使用集群资源。
- 减少数据序列化、反序列化的负担。Spark Streaming默认将接收到的数据序列化后存储以减少内存的使用。但序列化和反序列化需要更多的CPU时间,因此更加高效的序列化方式(Kryo)和自定义的序列化接口可以更高效地使用CPU。
- 设置合理的batch窗口。在Spark Streaming中,Job之间有可能存在着依赖关系,后面的Job必须确保前面的Job执行结束后才能提交。若前面的Job执行时间超出了设置的 batch窗口,那么后面的Job就无法按时提交,这样就会进一步拖延接下来的Job,造成后续Job的阻塞。因此,设置一个合理的batch窗口确保 Job能够在这个batch窗口中结束是必须的。
- 减少任务提交和分发所带来的负担。通常情况下Akka框架能够高效地确保任务及时分发,但当batch窗口非常小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常会比使用Fine-Grained
Mesos模式有更小的延迟。
2) 优化内存使用
- 控制batch size。Spark Streaming会把batch窗口内接收到的所有数据存放在Spark内部的可用内存区域中,因此必须确保当前节点Spark的可用内存至少能够容纳这个batch窗口内所有的数据,否则必须增加新的资源以提高集群的处理能力。
- 及时清理不再使用的数据。上面说到Spark Streaming会将接收到的数据全部存储于内部的可用内存区域中,因此对于处理过的不再需要的数据应及时清理以确保Spark Streaming有富余的可用内存空间。通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据。
- 观察及适当调整GC策略。GC会影响Job的正常运行,延长Job的执行时间,引起一系列不可预料的问题。观察GC的运行情况,采取不同的GC策略以进一步减小内存回收对Job运行的影响。
6.
总结
Spark Streaming提供了一套高效、可容错的准实时大规模流式处理框架,它能和批处理及即时查询放在同一个软件栈中,降低学习成本。如果你学会了Spark编程,那么也就学会了Spark Streaming编程,如果理解了Spark的调度和存储,那么Spark Streaming也类似。对开源软件感兴趣的读者,我们可以一起贡献社区。目前Spark已在Apache孵化器中。按照目前的发展趋势,Spark Streaming一定将会得到更大范围的使用。
(准实时任务(Firm
Real-Time Task): 通常是指计算机系统允许任务超时,但若任务超时,该任务的计算结果没有任何意义)
十二、 Spark on Yarn的实施过程中遇到的问题[3]
第一个是多生态作业竞争问题,同样是好处也是一个坏处,内存消耗非常多的作业,有可能作业等待很久才能递交上去,比普通的Hadoop更加麻烦,面临着CPU、内存都申请到才可以运行
第二机器内存性能,如果搭建Spark
on Yarn集群,千万不要找内存比较小的搭集群,最好96G或者120G,机器可以相对少一点,比如100台20G搭建小集群,这样花在私人申请、任务调 度、计算时间非常多,最后让Spark性能非常差,既然要跑Spark,就不要珍惜内存了,而且内存价格越来越白菜价,用好机器,真正体验Spark给数
据挖掘带来质的飞跃;
第三个粗粒度的资源预申请,Spark提交的时候,运行算法之前需要把所有算法资源申请到,可能你提交一个算法运
行十个Stage,中间一个需要1T内存,那么作业运行过程仲,要一直占1T内存不放才能成功运行。这个作者会在后面框架做一些改进,但是动作比较大,暂时问题没有办法解决,需要把算法跑需要按照最大资源申请需要的内存以及Core才可以;
最后一个问题开发人员的内存把控能力,因为 Spark作业目前写起来有两个问题,一个是语法问题,一个对内存把控能力。作为普通的机器学习算法开发人员这方面会比较痛苦一点,因为他们更侧重机器学
习算法,而对内存把控不太好,需要学习一些东西,才能在上边娴熟开发和运行算法。
参考:
[1]. Spark:一个高效的分布式计算系统. http://soft.chinabyte.com/database/431/12914931.shtml
[2]. Spark Streaming:大规模流式数据处理的新贵.
http://www.csdn.net/article/2014-01-27/2818282-spark-streaming-big-data
[3]. 淘宝:Spark
on Yarn 为"大象"插上翅膀.
http://blog.csdn.net/it_man/article/details/18961067
[4]. Spark 快速理解.
http://blog.csdn.net/colorant/article/details/8255958