ShuffleManager(一)
本篇,我们来看一下spark内核中另一个重要的模块,Shuffle管理器ShuffleManager。shuffle可以说是分布式计算中最重要的一个概念了,数据的join,聚合去重等操作都需要这个步骤。另一方面,spark之所以比mapReduce的性能高其中一个主要的原因就是对shuffle过程的优化,一方面spark的shuffle过程更好地利用内存(也就是我们前面在分析内存管理时所说的执行内存),另一方面对于shuffle过程中溢写的磁盘文件归并排序和引入索引文件。当然,spark性能高的另一个主要原因还有对计算链的优化,把多步map类型的计算chain在一起,大大减少中间过程的落盘,这也是spark显著区别于mr的地方。
spark新版本的Shuffle管理器默认是SortShuffleManager。
SparkEnv初始化部分的代码:
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
ShuffleMapTask.runTask
看shuffle管理器的源码,我们首先应该ShuffleManager的调用时机。想一下shuffle的过程,无非就是两个步骤,写和读。写是在map阶段,将数据按照一定的分区规则归类到不同的分区中,读是在reduce阶段,每个分区从map阶段的输出中拉取属于自己的数据,所以我们分析ShuffleManager源码基本也可以沿着这个思路。我们先来分析写的过程,因为对于一个完整的shuffle过程,肯定是先写然后才读的。
回顾一下之前的对作业运行过程的分析,我们应该还记得作业被切分成任务后是在executor端执行的,而Shuffle阶段的的stage被切分成了ShuffleMapTask,shuffle的写过程正是在这个类中完成的,我们看一下代码:
可以看到通过ShuffleManager.getWriter获取了一个shuffle写入器,从而将rdd的计算数据写入磁盘。
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// 反序列化RDD和shuffle, 关键的步骤
// 这里思考rdd和shuffle反序列化时,内部的SparkContext对象是怎么反序列化的
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
// shuffle管理器
val manager = SparkEnv.get.shuffleManager
// 获取一个shuffle写入器
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 这里可以看到rdd计算的核心方法就是iterator方法
// SortShuffleWriter的write方法可以分为几个步骤:
// 将上游rdd计算出的数据(通过调用rdd.iterator方法)写入内存缓冲区,
// 在写的过程中如果超过 内存阈值就会溢写磁盘文件,可能会写多个文件
// 最后将溢写的文件和内存中剩余的数据一起进行归并排序后写入到磁盘中形成一个大的数据文件
// 这个排序是先按分区排序,在按key排序
// 在最后归并排序后写的过程中,没写一个分区就会手动刷写一遍,并记录下这个分区数据在文件中的位移
// 所以实际上最后写完一个task的数据后,磁盘上会有两个文件:数据文件和记录每个reduce端partition数据位移的索引文件
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// 主要是删除中间过程的溢写文件,向内存管理器释放申请的内存
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
SortShuffleManager.getWriter
这里根据shuffle类型获取不同的ShuffleWriter对象,大多数情况下,都是SortShuffleWriter类型,所以我们直接看SortShuffleWriter.write方法。
/** Get a writer for a given partition. Called on executors by map tasks. */
// 获取一个shuffle存储器,在executor端被调用,在执行一个map task调用
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
SortShuffleWriter.write
总结一下这个方法的主要逻辑:
获取一个排序器,根据是否需要map端聚合传递不同的参数
将数据插入排序器中,这个过程或溢写出多个磁盘文件
根据shuffleid和分区id获取一个磁盘文件名,
将多个溢写的磁盘文件和内存中的排序数据进行归并排序,并写到一个文件中,同时返回每个reduce端分区的数据在这个文件中的位移
将索引写入一个索引文件,并将数据文件的文件名由临时文件名改成正式的文件名。
最后封装一个MapStatus对象,用于ShuffleMapTask.runTask的返回值。
-
在stop方法中还会做一些收尾工作,统计磁盘io耗时,删除中间溢写文件
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
// map端进行合并的情况,此时用户应该提供聚合器和顺序
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// 将map数据全部写入排序器中,
// 这个过程中可能会生成多个溢写文件
sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
// mapId就是shuffleMap端RDD的partitionId
// 获取这个map分区的shuffle输出文件名
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
// 加一个uuid后缀
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// 这一步将溢写到的磁盘的文件和内存中的数据进行归并排序,
// 并溢写到一个文件中,这一步写的文件是临时文件名
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// 这一步主要是写入索引文件,使用move方法原子第将临时索引和临时数据文件重命名为正常的文件名
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
// 返回一个状态对象,包含shuffle服务Id和各个分区数据在文件中的位移
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
IndexShuffleBlockResolver
我们首先看一下获取shuffle输出文件名,是通过IndexShuffleBlockResolver组件获取的,而它的内部又是通过BlockManager内部的DiskBlockManager分配文件名的,这个DiskBlockManager我在之前分析块管理器时提到过,它的作用就是管理文件名的分配,以及spark使用的目录,子目录的创建删除等。我们看到对于数据文件和索引文件的命名规则是不一样的,他们的命名规则分别定义在ShuffleDataBlockId和ShuffleIndexBlockId中。
def getDataFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
private def getIndexFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
ExternalSorter.insertAll
我们根据SortShuffleWriter中的调用顺序,首先看一下ExternalSorter.insertAll方法:
首选根据是否在爱map端合并分为两种情况,这两种情况使用的内存存储结构也不一样,对于在map端合并的情况使用的是PartitionedAppendOnlyMap结构,不在map合并则使用PartitionedPairBuffer。其中,PartitionedAppendOnlyMap是用数组和线性探测法实现的map结构。
-
然后将数据一条一条地循环插入内存的存储结构中,同时考虑到map端合并的情况
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined // 在map端进行合并的情况
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
// 向内存缓冲中插入一条数据
map.changeValue((getPartition(kv._1), kv._1), update)
// 如果缓冲超过阈值,就会溢写到磁盘生成一个文件
// 每写入一条数据就检查一遍内存
maybeSpillCollection(usingMap = true)
}
} else {// 不再map端合并的情况
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
AppendOnlyMap.changeValue
我们看一个稍微复杂一点的结构,AppendOnlyMap,
- 首先考虑空值的情况
- 计算key的hash,然后对容量取余。注意,这里由于容量是2的整数次幂,所以对容量取余的操作等同于和容量-1进行位与操作,java HashMap中的操作。
- 如果,不存在旧值,那么直接插入,
- 如果存在旧值,更新旧值
- 如果发生hash碰撞,那么需要向后探测,并且是跳跃性的探测,
可以看出,这个结构设计还是很精良的,这里面有个很重的方法,incrementSize方法中会判断当前数据量的大小,如果超过阈值就会扩容,这个扩容的方法比较复杂,就是一个重新hash再分布的过程,不过有一点,发不论是在插入新数据还是重新hash再分布的过程中,对于hash碰撞的处理策略一定要相同,否则可能造成不一致。
// 向数组中插入一个kv对,
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
// 处理key为空的情况
if (k.eq(null)) {
// 如果是第一次插入空值,那么需要将大小增加1
if (!haveNullValue) {
incrementSize()
}
nullValue = updateFunc(haveNullValue, nullValue)
haveNullValue = true
return nullValue
}
var pos = rehash(k.hashCode) & mask
// 线性探测法处理hash碰撞
// 这里是一个加速的线性探测,即第一次碰撞时走1步,
// 第二次碰撞时走2步,第三次碰撞时走3步
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {// 如果旧值不存在,直接插入
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
incrementSize()
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) {// 如果旧值存在,需要更新
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {// 发生hash碰撞,向后探测,跳跃性的探测
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
ExternalSorter.maybeSpillCollection
我们回到ExternalSorter的插入方法中,没插入一条数据都要检查内存占用,判断是否需要溢写到磁盘,如果需要就溢写到磁盘。
这个方法里调用了map.estimateSize来估算当前插入的数据的内存占用大小,对于内存占用的追踪和估算的功能是在SizeTracker特质中实现的,这个特质我在之前分析MemoryStore时提到过,在将对象类型的数据插入内存中时使用了一个中间态的数据结构DeserializedValuesHolder,它的内部有一个SizeTrackingVector,这个类就是通过继承SizeTracker特征从而实现对象大小的追踪和估算。
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
ExternalSorter.maybeSpill
首先检查当前内存占用是否超过阈值,如果超过会申请一次执行内存,如果没有申请到足够的执行内存,那么依然需要溢写到磁盘
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
// 每写入32条数据检查一次
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
// 向内存管理器申请执行内存
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
// 如果内存占用超过了阈值,那么就需要溢写
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
// 溢写到磁盘
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
// 释放内存
releaseMemory()
}
shouldSpill
}
ExternalSorter.spill
接着上面的方法,
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
// 获取一个排序后的迭代器
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
// 将数据写入磁盘文件中
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}
WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator
这个方法返回按照分区和key排序过的迭代器,其具体的排序逻辑在AppendOnlyMap.destructiveSortedIterator中
AppendOnlyMap.destructiveSortedIterator
这段代码分为两块,首先对数组进行压紧,是的稀疏的数据全部转移到数组的头部;
然后对数组按照比较器进行排序,比较器首先是按照分区进行比较,如果分区相同才按照key进行比较;
然后返回一个迭代器,这个迭代器仅仅是对数组的封装。通过这个方法,我们大概知道了AppendonlyMap的排序逻辑。
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
// 这段代码将稀疏的数据全部转移到数组头部,将数据压紧
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(2 * newIndex) = data(2 * keyIndex)
data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
// 根据比较器对数据进行排序
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
new Iterator[(K, V)] {
var i = 0
var nullValueReady = haveNullValue
def hasNext: Boolean = (i < newIndex || nullValueReady)
def next(): (K, V) = {
if (nullValueReady) {
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
i += 1
item
}
}
}
}
ExternalSorter.spillMemoryIteratorToDisk
回到ExternalSorter.spill方法中,在获取了经过排序后 的迭代器之后,我们就可以将数据溢写到磁盘上了。
这个方法的代码我不贴了,总结一下主要步骤:
- 首先通过DiskBlockManager获取一个临时块的BlockId和临时文件名
- 通过blockManager获取一个磁盘写入器,即DiskBlockObjectWriter对象,内部封装了调用java流api写文件的逻辑
- 循环将每条数据写入磁盘,并定期进行刷写(每隔一定的数据条数将内存中的数据刷写到磁盘上)
- 如果发生异常,则会对之前写入的文件进行回滚
小结
总结一下数据通过ExternalSorter向磁盘溢写的全过程:
- 首先,数据会被一条一条地向内部的map结构中插入
- 每插入一条数据都会检查内存占用情况,如果内存占用超过阈值,并且申请不到足够的执行内存,就会将目前内存中的数据溢写到磁盘
- 对于溢写的过程:首先会将数据按照分区和key进行排序,相同分区的数据排在一起,然后根据提供的排序器按照key的顺序排;然后通过DiskBlockManager和BlockManager获取DiskBlockWriter将数据写入磁盘形成一个文件。,并将溢写的文件信息
- 在整个写入过程中,会溢写多个文件
ExternalSorter.writePartitionedFile
总结一下主要的步骤:
仍然是通过blockManager获取一个磁盘写入器
将内部溢写的多个磁盘文件和滞留在内存的数据进行归并排序,并分装成一个按照分区归类的迭代器
-
循环将数据写入磁盘,每当一个分区的数据写完后,进行一次刷写,将数据从os的文件缓冲区同步到磁盘上,然后获取此时的文件长度,记录下每个分区在文件中的位移
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = { // Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics) // 如果前面没有数据溢写到磁盘中,
// 则只需要将内存中的数据溢写到磁盘
if (spills.isEmpty) {
// Case where we only have in-memory data
val collection = if (aggregator.isDefined) map else buffer
// 返回排序后的迭代器
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
// 写完一个分区刷写一次
val segment = writer.commitAndGet()
// 记录下分区的数据在文件中的位移
lengths(partitionId) = segment.length
}
} else {// 有溢写到磁盘的文件
// We must perform merge-sort; get an iterator by partition and write everything directly.
// 封装一个用于归并各个溢写文件以及内存缓冲区数据的迭代器
// TODO 这个封装的迭代器是实现归并排序的关键
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
// 每写完一个分区,主动刷写一次,获取文件位移,
// 这个位移就是写入的分区的位移,
// reduce端在拉取数据时就会根据这个位移直接找到应该拉取的数据的位置
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
} writer.close()
// 写完后更新一些统计信息
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes) // 返回每个reduce端分区数据在文件中的位移信息
lengths
}
IndexShuffleBlockResolver.writeIndexFileAndCommit
仍然回到SortShuffleWriter.write方法,最后一步调用了IndexShuffleBlockResolver.writeIndexFileAndCommit方法,
这个方法的作用主要是将每个的分区的位移值写入到一个索引文件中,并将临时的索引文件和临时的数据文件重命名为正常的文件名(重命名操作是一个原子操作)
总结
我总结shuffle写数据的过程,可以分为两个主要的步骤:
- 一是在数据写入的过程中会由于内存不足从而溢写多个数据文件到磁盘中,而所有的文件都是按照分区和key排序的,这为第二部归并排序打下基础
- 第二部就是将这些溢写的小文件和最后内存中剩下的数据进行归并排序,然后写入一个大文件中,并且在写入的过程中记录每个分区数据在文件中的位移,
- 最后还要写入一个索引文件,索引文件即记录了每个reduce端分区在数据文件中的位移,这样reduce在拉取数据的时候才能很快定位到自己分区所需要的数据