1. 简述hadoop 和 spark 的不同点(为什么spark更快)
Hadoop是一个分布式管理、存储、计算的生态系统,包括HDFS(分布式文件系统)、MapReduce(计算引擎)和YARN(资源调度器)。Hadoop的作业称为Job,每个Job包含多个Map Task和Reduce Task,这些Task在各自的进程中运行,当Task结束时,进程也会随之结束
Spark是一个基于内存计算的框架,使用RDD(弹性分布式数据集)进行数据处理。Spark的任务称为Application,一个Application包含多个Job,每个Job可以进一步划分为多个Stage,每个Stage包含多个Task。Spark的Task可以在内存中执行,避免了频繁的磁盘读写操作,从而提高了处理速度
Hadoop使用MapReduce模型进行数据处理,数据需要在磁盘上进行读写操作,这导致了较高的I/O开销,从而影响了处理速度
Spark采用内存计算技术,数据存储在内存中,减少了磁盘I/O操作,显著提高了处理速度。Spark的处理速度通常是Hadoop的10到100倍
2. 谈谈你对RDD的理解
弹性分布式数据集(Resilient Distributed Dataset, RDD)是Apache Spark中的核心概念,它是一个容错的、并行的数据结构,可以让开发者以弹性的方式进行数据计算。
RDD具有以下特性:
分布式:RDD的数据可以分布在集群中的不同节点上。
弹性:RDD在执行过程中可以根据需要重新分布数据集。
不可变:RDD是只读的,要改变RDD,只能创建新的RDD。
缓存:可以把RDD缓存起来,在计算中再次使用。
容错:RDD的每个阶段都会进行校验和,如果在计算过程中有数据丢失,可以通过父RDD重建。
3. 简述spark的shuffle过程
Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。
前一个stage的ShuffleMapTask进行shuffle write,把数据存储在blockManager上面,并且把数据元信息上报到dirver的mapOutTarck组件中,下一个stage根据数据位置源信息,进行shuffle read,拉取上一个stage的输出数据
1、基于哈希的shuffle操作:基于哈希的shuffle操作的原理在于将Mapper(stage)生成的中间数据,按照Reduce的数量(Reduce任务的数量取决于当前stage的RDD的分区数量)进行切分。切分成多个bucket,每个bucket对应一个文件。当reduce任务运行时,会根据任务的编号和所依赖的mapper编号远程或者从本地取得相应的bucket作为reduce任务的输入数据进行处理()
2、基于排序的shuffle操作: 基于哈希的shuffle操作会产生很多文件,这对文件系统来说是一个非诚大的负担,而且在总数据量不大而文件特别多的情况下,随机读写会严重降低I/O性能。大量文件的带来的问题还包括缓存。缓存所占用的内存过多是一笔很大的开销。每个shuffle map task只会产生一个索引文件,以及这个文件的索引,其中索引中 记载着,这个文件的那些数据是被下游的那些reduce task(stage)任务使用()
4. spark的作业运行流程是怎么样的
Spark应用程序以进程集合为单位在分布式集群上运行,通过driver程序的main方法创建的SparkContext对象与集群交互。
1、Spark通过SparkContext向Cluster manager(资源管理器)申请所需执行的资源(cpu、内存等)
2、Cluster manager分配应用程序执行需要的资源,在Worker节点上创建Executor
3、SparkContext 将程序代码(jar包或者python文件)和Task任务发送给Executor执行,并收集结果给Driver。
5 spark driver的作用,以及client模式和cluster模式的区别
drive 主要负责管理整个集群的作业任务调度;executor是一个jvm进程,专门用于计算的节点
client模式下,driver运行在客户端;cluster模式下,driver运行在yarn集群。
6. 你知道Application、Job、Stage、Task他们之间的关系吗
1、一个应用程序对应多个job,一个job会有多个stage阶段,一个stage会有多个task
2、一个应用程序中有多少个行动算子就会创建多少个job作业;一个job作业中一个宽依赖会划分一个stage阶段;同一个stage阶段中最后一个算子有多少个分区这个stage就有多少个task,因为窄依赖每个分区任务是并行执行的,没有必要每个算子的一个分区启动一个task任务。如图所示阶段2最后一个map算子是对应5个分区,reducebykey是3个分区,总共是8个task任务。
3、当一个rdd的数据需要打乱重组然后分配到下一个rdd时就产生shuffle阶段,宽依赖就是以shuffle进行划分的。
7. Spark常见的算子介绍一下(10个以上)
Transformation算子
Transformation算子用于转换RDD,生成新的RDD,但不会立即执行计算。它们通常用于中间数据处理。
map:对RDD中的每个元素应用一个函数,返回一个新的RDD。
flatMap:将函数应用于RDD中的每个元素,并将结果扁平化成一个新的RDD。与map的区别在于flatMap可以处理集合类型的元素。
filter:过滤RDD,保留满足条件的元素,返回一个新的RDD。
union:合并两个或多个RDDs。
groupBy:根据给定的函数将元素分组,返回一个键值对RDD。
sortBy:根据给定的函数对RDD进行排序,返回一个新的有序RDD。
repartition:重新分区RDD,可以增加或减少分区数。
coalesce:减少RDD的分区数,主要用于优化性能。
Action算子
Action算子用于触发计算,并返回一个值或执行某些操作,但不返回新的RDD。
count:统计RDD中的元素个数。
collect:将RDD中的元素收集到Driver,并返回一个数组。
first:返回RDD中的第一个元素。
take:返回RDD中的前n个元素。
reduce:对RDD中的元素进行聚合操作。
saveAsTextFile:将RDD保存为文本文件。
foreach:对RDD中的每个元素应用一个函数,通常用于调试或数据导出。
8. 简述map和mapPartitions的区别
map算子:对RDD中的每个元素进行操作,每次传入一个元素到定义的函数中,返回处理后的元素。map算子的主要目的是将数据源中的数据进行转换和改变,不会减少或增多数据
mapPartitions算子:对RDD中的每个分区进行操作,每次传入一个分区的迭代器到定义的函数中,返回处理后的迭代器。mapPartitions算子可以增加或减少数据,因为它处理的是一批数据而不是单个元素
9. 你知道重分区的相关算子吗
repartition(numPartitions: Int): RDD[T]:这个算子会导致一个大的shuffle操作,它会根据一个hashpartitioner的方式来重新分区数据。
coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]:这个算子用来减少RDD的分区数量,如果shuffle设置为false,那么只有当新的分区数大于原来的分区数的时候,这个操作才会减少计算资源。如果shuffle设置为true,那么这个操作会进行shuffle,不论新的分区数是否大于原来的分区数。
10. spark目前支持哪几种分区策略
HashPartitioner:采用哈希的方式对<Key,Value>键值对数据进行分区。其数据分区规则为partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数
RangePartitioner:将一定范围的数据映射到一个分区中。这种分区策略适用于需要对数据进行范围查询的场景
CustomPartitioner:用户可以根据具体需求自定义分区器,以满足特定的分区要求
11.简述groupByKey和reduceByKey的区别
reduceByKey:这是一个转换操作,它对具有相同键的元素执行一个聚合函数(reduce函数)。具体而言,它按键将元素进行分组,然后对每个组内的值进行合并操作,通常包括对相同键的所有值进行某种累积、求和或其他聚合操作。例如,rdd.reduceByKey(lambda x,y: x + y)会将相同的键的值累加起来
groupByKey:这也是一个转换操作,但它根据键对RDD中的元素进行分组,但不执行任何聚合操作。它只是将具有相同键的元素放在一个组中,形成一个包含键和其对应值的迭代器。例如,rdd.groupByKey()会将相同的键的值放在一起,但不进行任何聚合操作
- 简述reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别
13. 宽依赖和窄依赖之间的区别
窄依赖(Narrow Dependency)
窄依赖指的是父RDD的一个分区最多只被子RDD的一个分区所使用。这种依赖关系的特点是:
高效性:由于每个父RDD的分区只需传输给一个子RDD的分区,不存在数据混洗(shuffle)的过程,因此执行效率较高。
应用场景:常见的操作包括map、filter、union等,这些操作通常不会引起数据的重新组织。
宽依赖(Wide Dependency)
宽依赖指的是父RDD的一个分区被多个子RDD的分区所使用。这种依赖关系的特点是:
性能损失:由于需要重新组织和传输数据,宽依赖会导致性能损失,因为它需要进行全局的数据重新排序和传输,这在处理大规模数据集时尤为昂贵。
应用场景:常见的操作包括groupByKey、reduceByKey、sortByKey等,这些操作通常会引起数据的shuffle。
14. spark为什么需要RDD持久化,持久化的方式有哪几种,他们之间的区别是什么
Spark中的RDD(弹性分布式数据集)是懒加载的,只有在遇到行动算子(如collect、count等)时才会从头开始计算。如果同一个RDD被多次使用,每次都需要重新计算,这会显著增加计算开销。为了避免这种情况,可以将RDD持久化到内存或磁盘中,以便在后续操作中直接使用持久化的数据,从而避免重复计算,提高计算效率
- cache
这是persist的一种简化方式,作用是将RDD缓存到内存中,以便后续快速访问。cache操作是懒执行的,即执行action算子时才会触发
- persist
提供了不同的存储级别,包括仅磁盘、仅内存、内存或磁盘、内存或磁盘+副本数、序列化后存入内存或磁盘等。用户可以根据不同的应用场景进行选择
- checkpoint
将数据永久保存,用于减少长血缘关系带来的容错成本。checkpoint不仅保存了数据,还保存了计算该数据的算子操作。当需要恢复数据时,可以通过这些操作重新计算,而不仅仅是依赖于原始数据。checkpoint在作业完成后仍然保留,可以用于后续的计算任务
15. 简述spark的容错机制
- stage输出失败的时候,上层调度器DAGScheduler会进行重试
- 计算过程中,某个task失败,底层调度器会进行重试。
- 计算过程中,如果部分计算结果丢失,可以根据窄依赖和宽依赖的血统重新恢复计算。
- 如果血统非常长,可以对中间结果做检查点,写入磁盘中,如果后续计算结果丢失,那么就从检查点的RDD开始重新计算
16. 除了RDD,你还了解spark的其他数据结构吗
算子以外的代码都是在Driver端执行,算子里面的代码都是在Executor端执行。
了解累加器和广播变量两种数据结构。
累加器就是分布式共享只写变量,简单说一下它的原理:累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并。
广播变量就是分布式共享只读变量,简单说一下它的原理:用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用。
17. spark调优
18. 谈一谈RDD,DataFrame,DataSet的区别
RDD 提供了低级别的控制和灵活性,但性能通常较低。DataFrame 提供了结构化的数据处理和优化,但缺乏类型安全。Dataset 结合了两者的优点,提供了类型安全和高级操作,但在 PySpark 中,主要使用 DataFrame API。
- RDD
不可变性:RDD 一旦创建,其内容不能更改。所有的转换操作都会生成一个新的 RDD。
分布式计算:RDD 以分布式方式存储在集群的多个节点上,并支持并行计算。
容错性:RDD 通过追踪其生成过程的血统信息(lineage)实现容错。如果某个分区的数据丢失,可以通过重新计算恢复数据。
操作:RDD 提供了两种操作:转换(如 map、filter、flatMap)和行动(如 collect、count、reduce)。
-
DataFrame
结构化数据:DataFrame 有明确的列和数据类型,类似于关系型数据库中的表。
优化:通过 Catalyst 优化器进行查询优化,并支持 Tungsten 执行引擎以提高性能。
操作:支持 SQL 查询、DataFrame 操作(如 select、filter、join、groupBy)。 -
Dataset
类型安全:在编译时检查数据类型错误,提供类型安全的操作。
高层次 API:提供类似于 DataFrame 的高级操作,同时保留类型安全的特性。
操作:支持类型安全的操作(如 map、flatMap、filter),并且可以通过 DataFrame API 进行操作。
19. Hive on Spark与SparkSql的区别
Spark SQL:主要用于结构化数据处理和对Spark数据执行类SQL的查询。它通过SQL解析引擎解析SQL语句,最终转换为Spark RDD的方式去执行。Spark SQL的目的是为懂得数据库管理系统(DBMS)但不熟悉Spark的用户准备的
Hive on Spark:Hive作为数据仓库,负责一部分的解析和优化计算,而Spark作为Hive的底层执行引擎之一,负责一部分的计算。Hive on Spark结合了Hive的SQL处理能力和Spark的计算性能,提供了高效的数据处理能力
20. sparksql的三种join实现
- Broadcast Join
将大小有限的维度表的全部数据分发到每个节点上,供事实表使用。executor存储维度表的全部数据,一定程度上牺牲了空间,换取shuffle操作大量的耗时,这在SparkSQL中称作 Broadcast Join。 - Shuffle Hash Join
当表比较大时,采用 broadcast join 会对driver端和executor端造成较大的压力,可以通过分区的形式将大批量的数据划分成n份较小的数据集进行并行计算利用key相同必然分区相同的这个原理,Spark SQL将较大表的 join 分而治之,先将表划分成 n 个分区,再对两个表中相对应分区的数据分别进行 Hash Join,这样即在一定程度上减少了driver广播一侧表的压力,也减少了executor端取整张被广播表的内存消耗。 - Sort Merge Join (大表对大表)
上面介绍的两种实现对于一定大小的表表适用,但当两个表都非常大时,显然无论用哪种都会对计算内存造成很大压力。这是因为join 时两者采取的都是 hash join,
首先将两张表按照 join keys 进行了重新shuffle,保证 join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。因为两个序列都是有序的,从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边。可以看出,无论分区有多大,Sort Merge Join 都不用把某一侧的数据全部加载到内存中,而是即用即丢,从而大大提升了大数量下 sql join 的稳定性。
-
简单介绍下sparkstreaming
-
你知道sparkstreaming的背压机制吗
-
SparkStreaming有哪几种方式消费Kafka中的数据,它们之间的区别是什么