介绍
在“用Apache Spark进行大数据处理”系列的前两篇文章中,我们看到了Apache Spark框架是什么,还有如何使用Spark SQL库访问数据的SQL接口。
这些方案是基于批处理模式下静态信息处理的,比如作为一个按小时或天运行的任务。但若是在数据驱动的业务决策场景下,当需要飞快地分析实时数据流以执行分析并创建决策支持时,又该如何呢?
使用流式数据处理,一旦数据到达计算就会被实时完成,而非作为批处理任务。实时数据处理与分析正在变为大多数组织的大数据战略中至关重要的一个组件。 在本文中,我们将会学习到如何使用Apache Spark中一个被称为Spark流的库进行实时数据分析。
我们将会看到一个网络服务器日志分析用例,该用例会向我们展示Spark流是如何帮助我们对持续产生的数据流进行分析的。
流数据分析
流数据基本上是一组连续的数据记录,它们通常产生于诸如传感器、服务器流量与在线搜索等数据源。常见的流数据的例子有网站上的用户行为、监控数据、服务器日志与其他事件数据。
流数据处理应用会有助于现场面板、实时在线推荐与即时诈骗检测。
如果我们正在构建一个实时收集、处理与分析流数据的应用,我们需要按照与批处理数据应用不同的设计视角进行考虑。
下面列出了三种不同的流数据处理框架:
在本文中我们将专注于Spark流。
Spark流
Spark流是核心Spark API的扩展。Spark流使得基于实时数据流构建容错性处理变得更加简单。
下面的图1展示了Spark流是如何融入到整个Apache Spark生态系统中。
(点击放大图像)
图1.具有Spark流库的Spark生态系统
Spark流工作的方式是将数据流按照预先定义的间隔(N秒)划分为批(称微批次)然后将每批数据视为一个弹性分布式数据集(Resilient Distributed Datasets,RDDs)。随后我们就可以使用诸如map、reduce、reduceByKey、join和window这样的操作来处理这些RDDs。这些RDD操作的结果会以批的形式返回。通常我们会将这些结果保存到数据存储中以供未来分析并生成报表与面板,或是发送基于事件的预警。
为Spark流决定时间间隔是很重要的,这需要基于你的用例与数据处理要求。如果值N太低,那么在分析阶段微批次就没有足够的数据以给出有意义的结果。
与Spark流相比,其他流处理框架是基于每个事件而非一个微批次来处理数据流的。用微批次的方法,我们可以在同一应用下使用Spark流API来应用其他Spark库(比如核心、机器学习等)。
流数据可以来源于许多不同的数据源。下面列出一些这样的数据源:
使用诸如Apache Spark这种大数据处理框架的另外一个优势就是我们可以在同一系统中组合批处理与流处理。我们也可以在数据流上应用Spark的机器学习与图处理算法。在本系列的后续文章当中,我们将会讨论被称为MLlib和GraphX的机器学习与图处理库。
Spark流结构如下图2所示。
(点击放大图像)
图2.Spark流如何工作
Spark流用例
Spark流正在变为实现实时数据处理与分析方案的首选平台,这些实时数据往往来源于物联网(Internet of Things,IoT)和传感器。它被用于各种用例与商业应用。
下面是一些最有趣的Spark流用例:
- Uber,车驾共享服务背后的公司,在他们的持续流式ETL管道中使用了Spark流以每天从其移动用户处收集TB级的事件数据来进行实时遥测分析。
- Pinterest,可视化书签工具背后的公司,使用Spark流、MemSQL与Apache Kafka技术以实时地深入了解他们全球的用户是怎样使用Pins的。
- Netflix使用Kafka与Spark流来构建一个实时在线电影推荐与数据监控解决方案,该方案每天要处理来自于不同数据源的数十亿条事件。
Spark流其他现实世界的样例还包括:
- 供应链分析
- 实时安全情报操作以寻找威胁
- 广告竞价平台
- 实时视频分析,以帮助观看者实现个性化与互动体验
让我们看一下Spark流的架构与API方法。若要编写Spark流程序,我们需要知晓两个组件:DStream与流上下文。
DStream
Dstream(离散流,Discretized Stream,的缩写)是Spark流中最基本的抽象,它描述了一个持续的数据流。DStream既可以从诸如Kafka、Flume与Kinesis这样的数据源中创建,也可以对其他DStream实施操作。在内部,一个DStream被描述为一个RDD对象的序列。
与RDDs上的转换与动作操作类似,DStream支持以下操作:
- map
- flatMap
- filter
- count
- reduce
- countByValue
- reduceByKey
- join
- updateStateByKey
流上下文
与Spark中的Spark上下文(SparkContext)相似,流上下文(StreamingContext)是所有流功能的主入口。
流上下文拥有内置方法可以将流数据接收到Spark流程序中。
使用该上下文,我们可以创建一个描述基于TCP数据源的流数据的DStream,可以用主机名与端口号指定TCP数据源。比如,如果我们使用像netcat这样的工具来测试Spark流程序的话,我们将会从运行netcat的机器(比如localhost)的9999端口上接收到数据流。
当代码被执行,在启动时,Spark流仅是设置将要执行的计算,此时还没有进行实时处理。在所有的转换都被设置完毕后,为了启动处理,我们最终会调用start()方法来启动计算,还有awaitTermination()方法来等待计算终结。
Spark流API
Spark流附带了若干个用于处理数据流的API方法。有类似于RDD的操作,比如map、flatMap、filter、count、reduce、groupByKey、reduceByKey、sortByKey和join。它也提供了其他基于window与stateful操作的处理流数据的API。包括window、countByWindow、reduceByWindow、countByValueAndWindow、reduceByKeyAndWindow和updateStateByKey。
Spark流库当前支持Scala、Java和Python编程语言。这里是每个语言对应的Spark流API链接:
Spark编程的步骤
在我们讨论样例应用之前,先来看看Spark流编程中与众不同的步骤:
- Spark流上下文被用于处理实时数据流。因此,第一步就是用两个参数初始化流上下文对象,Spark上下文和切片间隔时间。切片间隔设置了流中我们处理输入数据的更新窗口。一旦上下文被初始化,就无法再向已经存在的上下文中定义或添加新的计算。并且,在同一时间只有一个流上下文对象可以被激活。
- 当Spark流上下文被定义后,我们通过创建输入DStreams来指定输入数据源。在我们的样例应用中,输入数据源是一个使用了Apache Kafka分布式数据库和消息系统的日志消息生成器。日志生成器程序创建随机日志消息以模拟网络服务器的运行时环境,作为各种网络应用服务用户而产生的流量,日志消息被持续不断地生成。
- 使用map和reduce这样的Spark流变换API为DStreams定义计算。
- 当流计算逻辑被定义好后,我们可以使用先前创建的流上下文对象中的start方法来开始接收并处理数据。
- 最终,我们使用流上下文对象的awaitTermination方法等待流数据处理完毕并停止它。
样例应用
在本文中我们讨论的样例应用是一个服务器日志处理与分析程序。它可以被用于对服务器日志进行实时监控并执行基于这些日志的数据分析。这些日志消息被认为是时序数据,也就是由在一个指定时间间隔内所捕捉到的连续度量的数据点组成的序列。
时序数据的例子包括传感器数据、天气信息和点击流数据。时序分析就是处理时序数据以提取有助于制定业务决策的信息。该数据也可以被用于基于历史数据的预测分析。
使用这样的方案,我们不需要每小时或每天的批处理任务来处理服务器日志。Spark流接收持续产生的数据,对其进行处理并计算日志统计,以此来挖掘数据。
为了遵循服务器日志分析的标准样例,我们将会使用在Data Bricks Spark流参考应用中所讨论的Apache日志分析器作为我们样例应用的参考。该应用已经具备将在我们的应用中被重用的日志消息解析代码。这个参考应用是一个用来学习Spark通用框架以及Spark流的优秀资源。
点击他们的网站,以查看更多关于Databricks Spark参考应用的细节。
用例
样例应用的用例是一个网络服务器日志分析与统计的生成器。在样例应用中,我们分析网络服务器日志以计算如下统计信息,这些信息有助于进一步的数据分析和报表及面板的创建:
- 不同HTTP响应代码的响应计数
- 响应内容大小
- 导致最高网络流量的访问客户端的IP地址
- 最热门的终端URL以识别那些比其他服务被访问的更多服务
与本系列的前两篇文章不同,在本文中我们将使用Java而非Scala来创建Spark程序。我们按照独立应用的方式运行程序,而不是在控制台窗口中运行代码。在测试与产品环境中部署Spark程序也如此。Shell控制台接口(使用Scala、Python或R语言)仅仅是用于开发者本地测试而已。
技术
在样例程序中我们将使用如下的技术来演示如何使用Spark流库处理实时数据流。
Zookeeper
Zookeeper是一个为分布式应用提供可靠分布式协调的集中化的服务。Kafka,我们在样例应用中使用的消息系统,依赖于Zoopkeeper在整个集群中的详细设置。
Kafka
Apache Kafka是一个实时的、容错的、可扩展的消息系统,它用于实时地移动数据。对于诸如捕捉网站上用户活动、日志、股票行情数据以及仪表数据这些用例来说,它是一个很好的选择。
Kafka的工作方式类似于分布式数据库,它是基于被分区和复制的低延迟提交日志的。当我们将一个消息发送给Kafka,在集群中它会被复制给不同的服务器,与此同时它也会被提交到磁盘。
Apache Kafka包含客户端API以及一个称为Kafka连接的数据转换器框架。
Kafka客户端:Kafka包括Java客户端(针对消息生产者与消费者)。在我们的样例应用中我们将会使用Java生产者客户端API。
Kafka连接:Kafka也包含了Kafka连接,即一个介于Apache Kafka与外部数据系统之间的流数据框架,它可以支持组织内的数据管道。它包含了导入与导出连接器以将数据集移入或移出Kafka。Kafka连接程序可以作为独立进程或分布式服务运行,它支持REST接口的方式,即使用REST API提交连接器到Kafka连接集群。
Spark流
我们将会使用Spark流Java API来接收数据流,计算日志统计信息并且运行查询以回答诸如“最多网络请求来自于哪个IP地址”这样的问题。
下面的表1展示了样例应用中所使用的技术与工具以及他们的版本。
技术 |
版本 |
URL |
Zookeeper |
3.4.6 |
https://zookeeper.apache.org/doc/r3.4.6/ |
Kafka |
2.10 |
http://kafka.apache.org/downloads.html |
Spark 流 |
1.4.1 |
https://spark.apache.org/releases/spark-release-1-4-1.html |
JDK |
1.7 |
http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html |
Maven |
3.3.3 |
http://archive.apache.org/dist/maven/maven-3/3.3.3/ |
表1.Spark流样例应用技术及工具
在图3中演示了Spark流样例应用中不同架构组件。
(点击放大图像)
图3.Spark流样例应用架构
Spark流应用运行时
为了在本地设置Java项目,可以从Github上下载Databricks参考应用代码。一旦获取了参考应用代码,就需要两个额外的Java类来运行我们的样例应用。
- 日志生成器(SparkStreamingKafkaLogGenerator.java)
- 日志分析器(SparkStreamingKafkaLogAnalyzer.java)
在文章网站上提供了这些文件的zip压缩包(spark-streaming-kafka-sample-app.zip)。如果你想在你本地机器上运行样例应用,使用链接下载zip文件,抽出Java类并将他们添加到之前步骤中创建的Java项目中。
样例应用可以被执行在不同的操作系统上。我在Windows和Linux(CentOS VM)环境下都运行了应用。
让我们看一下应用架构中的每个组件还有执行Spark流程序的步骤。
Zookeeper命令:
在样例程序中我使用的Zookeeper版本是3.4.6。为了启动服务器,需要设置两个环境变量,JAVA_HOME与ZOOKEEPER_HOME来指定JDK和Zookeeper各自的安装目录。然后导航到Zookeeper的home目录并运行如下命令来启动Zookeeper服务器。
bin\zkServer.cmd
如果你使用的是Linux环境,命令就是:
bin/zkServer.sh start
Kafka服务器命令:
在程序中使用的Kafka版本是2.10-0.9.00,基于Scala2.10版本。在Kafka中所使用的Scala版本是非常重要的,因为若是没有使用恰当的版本的话,当执行Spark流程序时就会遇到运行时错误。这里是启动Kafka服务器实例的步骤:
- 打开一个新的命令行窗口
- 设置JAVA_HONE与KAFKA_HOME环境变量
- 导航到Kafka的home目录
- 运行如下命令
对于Linux环境,命令如下:bin\windows\kafka-server-start.bat config\server.properties
bin/kafka-server-start.sh config/server.properties
日志生成器命令:
在我们的样例应用中下一步就是运行消息日志生成器。
日志生成器以不同的HTTP响应码(诸如200、401和404)及不同的终端URL创建测试日志消息。
在我们运行日志生成器之前,我们需要创建一个主题(Topic),我们可以将消息写到里面去。
与之前的步骤类似,打开一个新的命令行窗口,设置JAVA_HOME和KAFKA_HOME环境变量,并且导航到Kafka的home目录。然后首先运行以下命令来查看在Kafka服务器中已经存在的可用主题。
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --list
在Linux上:
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --list
我们将会用以下命令创建一个叫做“spark-streaming-sample-topic”的新主题:
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic
在Linux上:
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic
你可以再次运行list主题命令以查看是否新主题已经被正确创建。
当主题已经被创建好后,我们就可以运行日志生成器程序了。通过调用称为SparkStreamingKafkaLogGenerator的Java类来完成此步骤。日志生成器类接收以下四个参数来指定配置参数:
- 组标识:spark-streaming-sample-group
- 主题:spark-streaming-sample-topic
- 迭代次数:50
- 间隔:1000
打开一个新的命令行窗口来运行日志生成器。我们将要为JDK、Maven和Kafka目录分别设置三个环境变量(JAVA_HOME、MAVEN_HOME和KAFKA_HOME)。然后导航到样例项目根目录(比如c:\dev\projects\spark-streaming-kafka-sample-app)并运行以下命令。
mvn exec:java -Dexec.mainClass=com.sparkstreaming.kafka.example.SparkStreamingKafkaLogGenerator -Dexec.args="spark-streaming-sample-groupid spark-streaming-sample-topic 50 1000"
一旦日志生成器程序运行起来,就应该在控制台上通过debug消息看到被创建的测试日志消息。这只是个样例代码,所以日志消息被随机地生成以模拟从诸如网络服务器这种事件源生成的持续不断的数据流。
下面的图4展示了日志消息生产者还有正在生成的日志消息截屏。
(点击放大图像)
图4.Spark流日志生成器程序输出
Spark流命令:
这是使用了Spark流API的日志消息消费者。我们使用叫做SparkStreamingKafkaLogAnalyzer的Java类来从Kafka服务器上接收并处理数据流以创建日志统计信息。
Spark流处理服务器日志消息并生成累计日志统计信息,比如网络请求大小(最小、最大与平均)、响应代码计数、IP地址与热点终端。
我们用“local[*]”创建Spark上下文,它会在本地系统中检测内核的数量并使用它们运行程序。
为了运行Spark流Java类,将会在classpath中用到以下JAR文件:
- kafka_2.10-0.9.0.0.jar
- kafka-clients-0.9.0.0.jar
- metrics-core-2.2.0.jar
- spark-streaming-kafka_2.10-1.4.0.jar
- zkclient-0.3.jar
将上述JAR文件添加到classpath后我用Eclipse IDE运行了程序。日志分析Spark流程序的输出如图5。
(点击放大图像)
图5.Spark流日志分析程序输出
Spark流应用的可视化
当Spark流程序运行的时候,我们可以检查Spark控制台来查看Spark任务的细节。
打开一个新的网络浏览器窗口并导航到URL http://localhost:4040 以访问Spark控制台。
先看看一些展示Spark流程序统计信息的图表。
第一个可视化就是任务的DAG(无回路有向图,Direct Acyclic Grapg),它展示了我们所运行的程序中不同操作的依赖图,操作有map、window和foreachRDD等。下面的图6展示了我们样例程序中Spark流任务的可视化截屏。
(点击放大图像)
图6.Spark流任务的可视化图形
我们将要看的下一个图形就是包含了输入比率的流统计图,它显示了每秒的事件数量,以及处理所花费的毫秒数。
图7展示了Spark流程序执行期间的这些统计信息,左面是流数据还没有产生时的情况,而右边是数据流被发送到Kafka并且被Spark流消费者处理的情况。
图7.为样例程序展示流统计信息的Spark可视化
结论
Spark流库,Apache Spark生态系统中的一部分,用于实时流数据的数据处理。在本文中,我们学习了如何使用Spark流API来处理由服务器日志生成的数据并基于实时数据流执行分析。
下一步是什么
机器学习、预测分析和数据科学在近期都在获得越来越多的关注,他们都是不同用例下的问题解决方案。Spark MLlib,Spark机器学习库,提供了若干内置方法以使用诸如协同过滤、聚簇与归类这样的不同机器学习算法。
在下一篇文章中,我们将会探索Spark MLlib并观察几个用例来演示如何利用Spark的数据科学计算能力,它可以使机器学习算法的使用变得更加简单。
在本系的后续文章中,我们将看看像BlinkDB与Tachyon这样的即将到来的框架。