Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

时间:2022-05-22 16:37:32

Spark Tungsten揭秘 Day2

Tungsten-sort Based Shuffle

今天在对钨丝计划思考的基础上,讲解下基于Tungsten的shuffle。

首先解释下概念,Tungsten-sort是对普通sort的一种优化,排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的gc的开销。

Page的管理

要做到这种,jvm的内存管理结构无法完成,所以提出了Page的概念。

Page是由block组成的,我们先看一下Block的结构,可以看到,除了记录page编号外,Block内部组成是MemoryLocation。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

在MemoryLocation中,重要的就是记录了对象及初始位置的定位offset。实际运行可以onheap或者offheap(用NIO或者Tachyon管理)。

Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

在shuffle角度,都是统一在SortShuffleManager中进行构造。可以看到,在如下位置构造了UnsafeShuffleWriter,但没有UnsafeShuffleReader,从Tungsten角度讲,reader使用的是HashShuffleReader。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

从注释中,可以看到数据一旦进来,就使用shuffle write进行序列化,在序列化的二进制基础上进行排序,这样就可以减少内存的GC。这种优化需要我们的序列化器可以在不反序列化的情况下重新排序。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

数据写入

让我们进入UnsafeShuffleWriter

会通过MyByteArrayOutputStream直接对内存操作
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

在write方法中,会循环记录,写入Sorter。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

其中,serBuffle默认大小是1M,而且已经是序列化之后的数据了。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

在插入前,首先会分配内存,之后会根据每条数据,采用游标的方式进行遍历,并计算找到recordAddress,完成插入操作。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

在内存分配时,会有两种分配方式UNSAFE和HEAP,内部各有一套自己的内存评估机制
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

此外,recordAddress是有一套自己的编解码方式。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

最终在插入时,仅仅是存放了一个RecordPointer,也就是数据指针。
Spark Tungsten揭秘 Day2 Tungsten-sort Based Shuffle

小结

在具体插入操作的时候,以Page为核心单位,从Page角度讲,插入记录的时候,本身也有location和大小,需要找到page中指针的位置。在整个内存中有多个Page,每个Page有限定的大小,满了之后会分配下一个Page。从jvm角度讲,最底层的数据结构是字节数组,所以outputStream和序列化都是对字节数组来操作的。进行shuffle操作的时候,实际是对指针进行操作,这是没有序列化和反序列化的关键。数据量也少,所以内存使用率低,大大减少了GC。

最后,说明下,即使配置了Tungsten shuffle,在一些情况也会自动变成sort-based shuffle,从数据结构角度讲,限制蛮多,记录不能太大,单条记录不能超过128M,shuffle的时候中间过程不能产生太多的小文件,不能超过160W,aggregation或者输出后需要排序的操作也不可以。

欲知后事如何,且听下回分解!

DT大数据每天晚上20:00YY频道现场授课频道68917580