Apache Spark进行大数据处理 -- 第一部分:介绍

时间:2023-02-01 09:15:09

什么是Spark

Apache Spark是一个围绕着处理速度,易使用及复杂分析构建的开源大数据处理框架。它最早由加州大学伯克利分校的AMPLab在2009年开发,2010年成为Apache的开源项目。
Spark相对于其他的大数据和MapReduce技术(如:Hadoop和Storm)有几个优点。
首先,Spark为我们提供了完整、统一的框架来管理大数据处理所需要的各种不同特性的数据集(文本数据,图像数据等),以及源数据(批量数据及实时数据流)。
Spark可以让Hadoop集群中的应用程序在内存中执行速度提高100倍,即使在磁盘中执行速度也能提高10倍。
Spark能让你使用Java,Scala或者Python快速地编写应用程序。它内置了80多个高级操作。你可以使用脚本进行交互式数据查询。
处理Map和Reduce操作,它还支持SQL查询,流数据处理,机器学习和图像数据处理。开发者可以单独使用这些功能或者通过一个数据管道将它们结合使用。
在Apache Spark系列文章的第一部分,我们将了解Spark是什么,它和典型的MapReduce解决方案比较有什么不一样,以及它如何提供一套完整的大数据处理工具。
Hadoop和Spark
Hadoop作为一个大数据处理技术已经有10年的时间,且已被证明是大数据处理可选的解决方案。MapReduce一个单步计算(one-pass computations)的伟大的解决方案,但对于需要多遍计算(multi-pass computations)和算法的场景并不是很有效。数据处理过程中的每一步都会包含一个Map阶段和一个Reduce阶段,并且为了利用这个方案,你需要将案例转换为MapReduce模式。
任务下一步开始前会将每一步输出的数据会存储到分布式文件系统中。因此,这种方式会由于复制和磁盘存储导致任务执行速度变慢。同样,Hadoop解决方案通常包含集群,它很难搭建和管理。它还需要集成其他的工具(如Mahout-机器学习,Storm-流数据处理)才能满足不通的大数据处理场景。
如果你要完成一下复杂的任务,你需要串联一系列MapReduce任务并按顺序执行它们。每一个都是高延时的任务,并且只有前序任务执行完成后面的任务才能开始。
Spark允许开发人员使用有向无环图(DAG)模式编写复杂的、多步式数据处理管道。它同样支持在内存中进行多个DAG间数据共享,以便于不同的任务能处理相同的数据。
Spark运行在HDFS上提供了增强的和额外的功能。Spark支持在Hadoop v1集群(SIMR– Spark-Inside-MapReduce)或者Hadoop v2 YARN集群或者Apache Mesos环境下的进行应用开发。
我们应该把Spark看做是Hadoop MapReduce的一个可选替换方案,而不是替代Hadoop。Spark不不是要替代Hadoop,而是提供一个完整和统一的解决方案来应对不同的大数据场景和需求。

Spark的特性

Spark通过很小带代价将数据处理从MapReduce带入下一个级别。拥有像内存数据存储和接近实时处理能力,Spark的性能可以比其他大数据技术快数倍。
Spark同样支持大数据查询的懒加载,来帮助数据处理过程的优化。它提供了一个更高级别的API,来提高开发人员的效率以及为大数据方案提供一个统一的架构模型。
Spark将中间结果保存在内存中而不是写入磁盘,这样处理非常有用特别是需要多次使用相同的数据集时。它是一个既可以基于内存计算又可以基于磁盘计算的执行引擎。当数据没有在内存中时,Spark会执行外部操作。Spark可以用来处理超过集群总内存大小的数据集。
Spark会将尽可能多的数据加载到内存中,超出的数据会放到磁盘中。它可以将数据集的一部分数据加载到内存中,其他部分保留在磁盘中。你需要根据你的数据和使用场景来评估对内存大小的需求。使用内存数据存储,Spark会带来性能优势。
Spark的其他特性:

  • 除了Map和Reduce,还支持其他功能。
  • 优化任意运算图。
  • 大数据查询懒加载,来帮助整个数据处理过程的优化。
  • 提供简单和一致的API(Scala,Java和Python)
  • 支持Scala和Python交互脚本。不支持Java交互脚本。
    Spark使用Scala语言编写并运行在Java虚拟机(JVM)上。目前支持以下语言使用Spark进行应用开发:Scala,Java,Python,Clojure,R。

Spark生态系统

除了Spark核心的API,Spark生态系统还提供了额外的库来支持大数据分析和机器学习领域的能力。
包括:
- Spark Steaming:

Spark Streaming可以用来进行实时数据流处理。它是基于微批处理风格的数据计算和处理。它使用基于一系列RDD的DStream进行实时数据处理。

  • Spark SQL:

Spark SQL提供了通过JDBC API展示Spark数据集的能力,允许使用传统的BI和可视化工具通过SQL查询Spark数据。Spark SQL允许使用者通过ETL抽取不通格式的数据(如JSON,Parquet或者数据库),进行数据转换,并通过查询语句展示这些数据。

  • Spark MLlib:

MLlib是Spark的可以扩展的机器学习库,由通用的学习算法和工具类组成,包括分类,回归,聚类,协同过滤,降维,当然也包括调优的部分。

  • Spark GraphX:

Graphx是新的Spark图形和图形并行计算API。GraphX通过引入Resilient Distributed Property Graph:一种点和边都带属性的有向多重图。为了支持图计算,GraphX公开一组基本的功能操作(如:subgraph,joinVertices和aggregateMessages)以及Pregel API的一个优化。另外,GraphX包含了一个日益增长的图算法和图builders的集合,用以简化图分析任务。除了这些库,还有其他的库,如DlinkDB和Tachyon。
DlinkDB是一个近似查询引擎,能用在海量数据上的交互式 SQL 查询。它允许用户通过权衡查询精度来缩短响应时间。它通过在数据样本上执行查询并保证查询结果被控制在允许的误差范围内。
Tachyon是一个分布式内存文件系统,可以在集群框架里(如Spark和MapReduce)以访问内存的速度来访问存在tachyon里的文件。它通过缓存要处理的文件到内存中,这样就可以避免去磁盘加载需要频繁读取的数据集。这样能让不同的任务/查询和框架以访问内存的速度访问缓存文件。
同时,Spark集成了和其他产品通讯的适配器,如Cassandra(Spark Cassandra Connector)和R(SparkR)。通过Cassandra Connector,你可以使用Spark访问存储在Cassandra数据库中的数据并对数据进行分析。
下图展示了Spark生态系统中的相互关联库。
Apache Spark进行大数据处理 -- 第一部分:介绍
我们将在接下来的一系列的文章中去探索这些库。

Spark架构

Spark架构包含3个主要组件:

  • 数据存储
  • API
  • 管理框架

让我们来看一下这些组件的详细介绍。

  • 数据存储:
    Spark使用HDFS文件系统来存储数据。它适用于任何跟Hadoop兼容的数据源,包括:HDFS,HBase,Cassandra等。

  • API:
    API为开发人员提供了创建Spark应用的标准接口。Spark提供了基于Scala,Java和Python语言的API。
    • Scala API
    • Java
    • Python

  • 资源管理:
    Spark可以做为一个独立的服务器部署,也可以部署在分布式环境下(如Mesos或者YARN)。
    下图展示了Spark框架模型的组件。
    Apache Spark进行大数据处理 -- 第一部分:介绍

弹性分布式数据集(Resilient Distributed Datasets简称RDD)

RDD(基于Matei的研究论文)是Spark框架的核心概念。可以把RDD看作是数据库中的一张表。它可以容纳任何类型的数据。Spark把数据存储在不同分区的RDD中。
它有助于重新计算和数据处理的优化。
它还支持容错,因为RDD知道如何重新创建和重新计算数据集。
RDD是不可变的。你可以通过transformation修改RDD,但transformation后返回的是一个新的RDD,原RDD保持不变。
RDD支持两种类型的操作:

  • Transformation

  • Action

**Transformation:**Transformation不是返回一个单一值,而是返回一个新的RDD。当你调用一个Transformation方法时并不会做任何计算,它只是获取一个RDD并返回一个新的RDD。
部分Transformation方法包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipi和coalesce。
Action: Action操作计算并返回一个新值。当对RDD对象调用一个Action方法时,此时会执行数据查询和计算并返回结果。
部分Action方法包括:reduce,collect,count,first,take,countByKey和foreach。

如何安装Spark

有几种不同的方法安装和使用Spark。你可以在你的电脑上安装一个独立的Spark或者从供应商(如Cloudera、HortonWorks、或者MapR)获取可用的Spark虚拟机镜像。你可以可以直接使用云端Spark(如Databricks Cloud)。
本文中,我们将安装一个独立的Spark并在本地启动它。Spark 1.2.0是最近刚发布的版本。我们将使用这个版本进行样例演示。

如何运行Spark

当你在本地安装Spark或使用云端Spark,你可以通过不同的方式连接到Spark引擎。
下表展示了不同Spark运行模式下的主URL参数

主URL 描述
Local 本地单工作线程运行Spark
local[K] 本地K个工作线程运行Spark(最好设置为你电脑的核数)
local[*] 按本地按逻辑核数的工作线程运行Spark
spark://HOST:PORT 连接到Spark独立集群的主节点。端口必须是主节点配置的端口号,默认是7077
mesos://HOST:PORT 链接到Mesos集群。端口必须和配置的端口号一致,默认是5050。或者,Mesos集群使用了ZooKeeper,则需要使用mesos://zk://…
yarn-client 客户端模式连接到YARN集群。集群的位置从HADOOP_CONF_DIR环境变量获取
yarn-cluster 集群模式连接YARN集群。集群的位置从HADOOP_CONF_DIR环境变量获取

如何跟Spark交互

一旦Spark已启动运行。你就可以连接Spark,并通过脚本进行数据分析交互。Spark脚本支持Scala和Python语言。Java并不支持交互脚本,所以这个特性对Java并不适用。
你可以使用spark-shell.cmd和pyspark.cmd命令分别运行Scala脚本和Python脚本。

Spark Web控制台

当Spark在任何模式下运行时,你可以通过URL(http://localhost:4040)访问Spark Web控制台查看Spark任务的结果和统计信息。
Spark控制台如下图所示,包含Stages,Storage,Environment和Executors。
Apache Spark进行大数据处理 -- 第一部分:介绍

共享变量

Spark提供了两种类型的共享变量也变Spark程序在集群中高效运行。分别是广播变量和累加器。
广播变量:广播变量允许将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量副本。广播变量可被用于有效地给集群中每个节点一个大型输入数据集的副本。
下面代码片段展示了如何使用广播变量:

//
// Broadcast Variables
//
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

累加器:累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器(如在MapReduce中)或累加器。运行在集群中的任务可以通过使用add方法增加一个累加器变量。但这些任务不能读取累加器的值。只有驱动程序能够读取累加器的值。
下面代码片段展示了如何使用累加器变量:

//
// Accumulators
//
val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

Spark示例程序

文章中展示的示例程序是一个简单的单词计数应用。这是一个学习Hadoop大数据处理都会展示的例子。我们将对一个文本文件执行一些数据分析查询。本例中使用的文本文件和数据集都很小,但不需要做任何代码的改动你就可以用来对大数据集进行查询。
为了方便讨论,我们将使用Spark Scala脚本。
首先,让我们先看一下如何在本地安装Spark。
先决条件:

  • 你需要在本地安装Spark运行环境JDK。在后面第1步中会有说明。
  • 你需要安装Spark软件。在后面第2步中会介绍如何安装。
    注意:本文针对的Windows环境。如果你使用的是其他操作系统,你需要根据操作系统修改系统变量和目录路径。

第1步、安装JDK:

1)从Oracle官网下载JDK。推荐使用JDK 1.7。
安装JDK的目录不要包含空格。对于Windows用户,JDK安装在如C:\dev下,而不是”C:\Program Files”下。”Program Files”目录名中包含一个空格,安装在这个目录下会有问题。
注意:不要安装JDK或者Spark(在第2步中描述)在”C:\Program Files”目录下。
2)安装JDK后,进入JDK1.7目录的“bin”文件夹下执行下面的命令验证安装是否正确:

java -version

如果JDK安装正确,执行上述命令会显示Java版本。

第2步、安装SPARK:

从Spark官网下载最新版本的Spark。这边文章发布时Spark的最新版本是1.2。你可以根据Hadoop的版本选择特定版本的Spark安装文件。我下载了针对Hadoop2.4或后续版本的Spark,安装文件名是spark-1.2.0-bin-hadoop2.tgz。
解压安装文件到本地目录(如,c:\dev)。
进入spark安装目录并执行如下目录加载Spark脚本,验证Spark安装。下述命令是针对Windows的。如果你使用Linux或者Mac OS,请根据操作系统修改命令。

c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\spark-shell

如果Spark安装正确,你会看到控制台输出以下信息。

….
15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.2.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
….
15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
15/01/17 23:17:53 INFO SparkILoop: Created spark context..
Spark context available as sc.
You can type the following commands to check if Spark Shell is working correctly.

你可以输入如下命令校验Spark脚本是否正常工作。

sc.version

或者

sc.appName

完成这一步后,你可以使用如下命令退出Spark脚本窗口:

:quit

要加载Spark Python脚本,你需要在本地安装Python。你可以下载和安装Anaconda,它是一个免费的Python包,包含几个流行的包(science、math、engineering和data analysis)。
然后你可以执行如下命令:

c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\pyspark

单次计数程序

一旦你完成Spark安装并已启动运行,你可以使用Spark API进行数据分析查询。
这些简单的命令能从文本文件读取数据并处理。我们会在后续的文章中展示Spark框架的高级示例。
首先,我们先使用Spark API运行这个很流行的单词计数样例。打开一个新的Spark Scala脚本窗口。下面是这个例子的命令。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val txtFile = "README.md"
val txtData = sc.textFile(txtFile)
txtData.cache()

我们调用cache方法将创建的RDD对象存储到缓存中,这样我们将来使用它进行数据查询的时候,Spark不需要每次都对它进行重新计算。注意cache()是一个懒加载操作。当我们调用cache方法时,Spark并不会马上就加载数据到内存中。只用调用对RDD对象调用action方法时才会加载数据。
现在,我们可以调用count方法来查看文本文件的行数。

txtData.count()

现在,我们可以执行下面的命令来进行单词计数。数量会显示在每个单次的旁边。

val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wcData.collect().foreach(println)

如果你想查看更多的Spark核心API使用的示例代码,可以从Spark官网查看Spark文档。

接下来是什么

在后续文章中,我们会学习Spark生态系统的其他部分,从Spark SQL开始,然后是Spark Streaming,Spark MLlib和Spark GraphX。我们同样会了解即将出现的框架如Tachyon和BlinkDB。

总结

本文中,我们了解了如何使用Spark标准API来实现大数据处理和分析。我们同样了解了Spark和传统MapReduce实现(如Apache Hadoop)的区别。Spark和Hadoop一样都是基于HDFS文件系统的,因此,如果你已经在Hadoop上进行了大量的投入和基础搭建,你可以将Spark和MapReduce一起使用。
你同样可以结合SparkSQL,机器学习和Spark Streaming使用,我们会在后续文章中介绍。
通过一些适配器的集成,你可以将Spark和其他技术组合使用。文章有一个示例使用了Spark,Kafka和Apache Cassandra,其中Kafka作为数据流的入口,Spark做数据计算,最终将计算结果存储到Cassandra NoSQL数据库中。
但需要认识到,Spark的生态系统还不够成熟,还需要进一步改善,像安全和BI工具集成领域。

原文:https://www.infoq.com/articles/apache-spark-introduction