shuffle怎么用()

none

什么时候需要 shuffle writer

假设我们有如下的spark作业依赖关系

我们抽象出rdd和依赖关系:

E & lt;- n -,C & lt- n – D – n – F – s -,A & lt-s-B&中尉;- n – ` – G

相应的划分的RDD结构是:

最后,我们得到了整个执行过程:

中间涉及洗牌过程。前一阶段的ShuffleMapTask执行shuffle write,将数据存储在blockManager上,并将数据位置元信息报告给驱动程序的mapOutTrack组件。下一级根据数据位置元信息执行混洗读取,并拉出前一级的输出数据。

这篇文章是关于无序写入过程的。

spark shuffle 演进的历史Spark 0.8及以前 Hash Based ShuffleSpark 0.8.1 为Hash Based Shuffle引入File Consolidation机制Spark 0.9 引入ExternalAppendOnlyMapSpark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based ShuffleSpark 1.2 默认的Shuffle方式改为Sort Based ShuffleSpark 1.4 引入Tungsten-Sort Based ShuffleSpark 1.6 Tungsten-sort并入Sort Based ShuffleSpark 2.0 Hash Based Shuffle退出历史舞台

综上所述,开始时使用基于散列的混洗。此时,每个映射器将根据reducers的数量创建一个对应的bucket。桶数为M ^ R,其中M为贴图数,R为减速器数。这样会产生大量的小文件,给文件系统造成很大压力,也不利于IO吞吐。后来实在忍无可忍,就进行了优化,将运行在同一个内核上的多个Mapper的输出合并到同一个文件中,这样文件数就变成了cores R,

例如:

最初,三个maptasks和三个reducer会产生九个小文件。

是不是很恐怖,改造之后?

有4个maptasks和4个reducer。如果不使用合并机制,将会生成16个小文件。

但是现在,这四个maptasks分两批在两个内核上运行,只会产生八个小文件。

在同一内核上连续运行的两个maptasks的输出,对应于同一文件的不同段,称为文件段,形成ShuffleBlockFile。

稍后,引入了基于排序的Shuffle,map端的任务将根据分区id和键对记录进行排序。同时将所有结果写入一个数据文件,同时生成一个索引文件。然后,介绍了基于钨排序的Shuffle,它直接使用堆外内存和一种新的内存管理模型,节省了内存空和大量的gc,以提高性能。

2.1现在有三种编写器,分别是Bypassmergesort Shuffle writer、sort Shuffle writer和UnsafeShuffleWriter。顾名思义,我们应该可以互相通信。本着过时不说的原则,本文只描述这三个写手的实现细节。基于Hash的Shuffle已经退出历史舞台,我就不说了。

三种 writer 的分类

以上是判断使用哪个writer,是否开启mapSideCombine的依据,因为有些运算符会先在map端进行合并,以减少数据的传输。因为BypassMergeSortShuffleWriter会临时输出Reducer(分区数)小文件,所以分区数必须小于一个阈值,默认小于200。

UnsafeShuffleWriter需要序列化程序来支持重定位。序列化程序支持重定位:最初序列化原始数据,不再需要相反的顺序。在对其对应的元数据进行排序后,它需要序列化程序来支持重定位,并在指定位置读取对应的数据。

BypassMergeSortShuffleWriter 实现细节

BypassMergeSortShuffleWriter和Hash Shuffle中的Hash Shuffle Writer基本相同。唯一不同的是,地图端的多个输出文件将被汇总到一个文件中。所有分区的数据将被合并到同一个文件中,并生成一个索引文件来索引每个分区的起始地址。可以随机访问某个分区的所有数据。

但需要注意的是,这种方法不能有太多的分区,因为进程中会并发打开所有分区对应的临时文件,对文件系统造成很大压力。

具体实现是给每个分区分配一个临时文件,对每个记录的键使用分区设备(hash模式,如果用户定义的话,使用自定义分区设备)找到对应分区的输出文件句柄,直接写入文件,不需要使用内存中的buffer。最后,copyStream方法将所有的临时分区文件复制到最终的输出文件中,记录每个分区文件的初始写入位置,并将这些位置数据写入索引文件。

SortShuffleWriter 实现细节

我们先考虑一个问题。如果我有100亿条数据,但是我们的内存只有1M,但是我们的磁盘是非常merge的,我们现在要对这100亿条数据进行排序,不可能一次加载到内存就对所有数据进行排序。这涉及到一个外部排序问题。我们1M的内存只能加载1亿条数据,每次只能对这1亿条数据进行排序。排序后,我们可以将它们输出到磁盘,总输出为100。我们可以把每个文件的一部分头数据(按顺序)作为缓冲区,把这100个缓冲区放在一个堆里进行堆排序。比较的方式是比较所有堆元素(缓冲区)的头元素大小,然后不断地弹出堆顶每个缓冲区的头元素输出到最后的文件,然后继续堆排序输出。如果缺少哪个缓冲区空,就去对应的文件继续补充一些数据。最后得到一个全局有序的大文件。

如果你能搞清楚我上面举的例子,你就差不多能搞清楚sortshufflewirter的实现原理了,因为它解决的是同一个问题。

SortShuffleWriter中的处理步骤是

使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在内存中进行排序, 排序的 K 是(partitionId, hash(key)) 这样一个元组。如果超过内存 limit, 我 spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根据 hash(key)进行比较排序如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件 和 当前内存中的数据结构中的数据进行 merge sort, 进行全局排序

基本上和我们开始的问题差不多。不同的是需要用同一个键聚合元素,也就是使用定义好的func进行聚合。比如你的运算符是reduceByKey(+),这个func是加法运算。如果两个键相同,您将首先找到所有相同的键进行reduce(+)操作,计算一个总结果,然后输出data (k,Result)元素。

tshufflewriter中使用ExternalSorter对内存中的数据进行排序。ExternalSorter维护两个集合,PartitionedAppendOnlyMap和PartitionedPairBuffer,这两个集合都使用哈希表数据结构。如果需要聚合,请使用PartitionedAppendOnlyMap(它支持查找键。如果之前已经存储了同一个键的K-V个元素,则需要进行聚合,然后存储聚合后的K-V);否则,使用PartitionedPairBuffer(仅添加K-V元素)。

我们可以看看上图。PartitionedAppendOnlyMap中的K是(partitionId,K)的元组,所以我们先按partitionId排序,如果partitionId相同,再按hash(key)排序。

首先看AppendOnlyMap。这个只是一个散列表,其中k是一个(PatitionId,hash(Key))元组。当你想放(K,V)的时候,先hash(K)找到存储位置。如果存储位置已经被占用,则使用二次探测方法来寻找下一个空空闲位置。对于图中的K6,在第三次搜索中找到K4后面的空空位置,放进去就行了。Get(K6)类似。三次找到K6,取出下一个V6,用第一个值做func,放回V6。

让我们来看看ExternalAppendOnlyMap结构。这就是如果所有的元素都放不进内存,就涉及到外部排序。

上图可以看到整个原理图,逻辑很简单。内存不足时,先溢出四次,输出到文件的元素都是有序的。在读取时,它们都是按顺序读取的,最后,它们与内存中剩余的数据进行全局合并。

合并过程是每个文件读取一部分数据(StreamBuffer)放入mergeHeap,当前内存中的PartitionedAppendOnlyMap也进行排序,形成sortedMap放入mergeHeap。这个堆是PriorityQueue,排序方式是自定义的,即取出堆元素StreamBuffer的头元素进行大小比较。

val heap = new mutable。PriorityQueue[Iter]()(新排序[Iter] { //使用comparator.compare的反转,因为PriorityQueue使最大覆盖定义比较(x: Iter,y:Iter):Int =-comparator . compare(x . head . _ 1,y.head._1) })

这样每次在堆顶弹出StreamBuffer的head元素就是世界上最小的元素(记住是按照(partitionId,hash(Key))排序的)。如果需要聚合,将这些具有相同键的元素逐个放入mergeBuffers中。放入mergeBuffers的第一个StreamBuffer称为minBuffer,所以minKey是minBuffer中第一条记录的键。当merge-combine时,与minKey具有相同键的记录被聚合,然后输出。

如果不需要聚合,很简单。只需返回从堆顶部的StreamBuffer中弹出的head元素。

在读取结束时,整个全局归并后从读取迭代器中读取的数据是按照partitionId从小到大排序的数据。读取时,按照分区进行分段,记录每个分区文件的初始写入位置,并将这些位置数据写入索引文件。

UnsafeShuffleWriter 实现细节

UnsafeShuffleWriter维护了一个ShuffleExternalSorter用于外部排序。外部排序是对数据进行部分排序并输出到磁盘,然后对数据进行全局排序。既然这也是外部排序,那它和SortShuffleWriter有什么区别呢?这里只根据记录的分区id,先在内存ShuffleInMemorySorter中对排序后的数据进行排序,将排序后的数据序列化、压缩输出到临时文件的一段,并记录每个分区段的寻道位置,这样以后就可以单独读取每个分区的数据,解压缩、反序列化后就可以正常读取读取流。

整个过程就是不断地在ShuffleInMemorySorter中插入数据,如果没有内存,就申请内存,如果没有内存,就溢出到文件中,最后合并成一个按照分区id全局排序的大文件。

SortShuffleWriter和UnsafeShuffleWriter的比较

区分

无保障作家

SortShuffleWriter

sortord

只是最后分区级别的排序。

分区先排序,相同的分区键排序。

聚合

没有反序列化,没有聚合

支持聚合

使用UnsafeShuffleWriter的条件不指定聚合或键排序。因为在排序指针中没有对键进行编码,所以只对分区级别的排序后的原始数据先进行序列化,不需要逆序。在其对应的元数据被排序后,序列化程序需要支持重定位,并在指定位置读取对应的数据。KryoSerializer和spark sql自定义序列化程序支持该特性。分区数必须小于1677216,因为分区号是用24位表示的。因为每个分区使用27位来表示记录偏移量,所以记录不能大于该值。内存分类并输出文件。

让我们看看对记录排序的例子。一个标准的排序步骤需要为记录存储一组指针,并使用quicksort交换指针,直到所有记录都排序完毕。基于顺序扫描的特点,排序通常可以获得较好的缓存命中率。但是,对一组指针进行排序的缓存命中率非常低,因为每次比较操作都需要解引用两个指针,这两个指针对应内存中两个随机位置的数据。

那么,如何在排序中提高缓存局部性呢?方法之一是通过指针顺序存储每条记录的排序键。我们用8个字节(分区id作为键,以及指向数据的实际指针)来表示一段数据,并把它放在一个排序数组中。每次比较排序的时候,我们只需要线性查找每一对pointer-key,这样就不会出现随机扫描的情况。这样,在对所有记录的部分进行排序时,直接对数据进行排序就好了,大大提高了性能。

当然,数据在这里排序。UnsafeShuffleWriter使用RadixSort。这个很简单,就不介绍了。如果不同,请参考http://bubkoo.com/2014/01/15/sort-algorithm/radix-sort/.的这份文件

以上是申请内存的流程。应用的内存作为一个页面记录在分配的页面中。溢出时,释放这些内存,就有一个当前使用的currentPage。如果还不够,继续申请。

可以看看上图。每次在页面中插入一条记录时,都要将Partionid+Page number+Page中的Offset作为一个元素插入LongArray中。最后读取数据时,先用RadixSort对LongArray进行排序,然后依次根据指针元素对原始数据进行索引,这样分区层次就有序了。

创建溢出文件时,UnsafeShuffleInMemorySorter生成一个数据迭代器,它将返回一个按分区id排序的迭代器。这个迭代器粒度的每个元素都是一个指针,对应于数据结构PackedRecordPointer。这个PackedRecordPointer定义的数据结构是[24位分区号][13位内存页号][27位页内偏移量]。然后根据这个指针就可以得到真正的记录,在进入UnsafeShuffleExternalSorter的开始就已经序列化了,所以在这里就变成了一个写字节数组。文件中不同分区的数据由文件段表示,相应的信息存储在SpillInfo数据结构中。

合并文件

每个溢出文件的分区索引保存在SpillInfo数据结构中。在任务结束之前,我们必须执行mergeSpills操作。如果fastMergeEnabled且压缩方法支持压缩数据的串联,则同一分区的压缩数据可以直接简单地连接在一起,无需解压缩和反序列化。使用高效的数据复制技术,比如NIO的transferTo,可以避免解压缩和缓冲区复制。

Shuffle文件寻址MapOutputTracker

MapOutputTracker是Spark架构中的一个模块,这是一个主从架构。管理小磁盘文件的地址。

MapOutputTrackerMaster是主对象,存在于驱动程序中。

MapOutputTrackerWorker是从属对象,存在于Excutor中。

BlockManager

BlockManager是Spark架构中的一个模块,是主从架构。

主对象BlockManagerMaster存在于驱动程序中。

当广播变量和缓存数据被使用或缓存数据在集群中被删除时,BlockManagerMaster将通知BlockManagerSlave转移或删除数据。

从对象BlockManagerWorker存在于Excutor中。

BlockManagerWorker将与BlockManagerWorker通信。

驱动程序端的块管理器或执行程序端的块管理器包含四个对象:

1.DiskStore:负责磁盘管理。

2.MemoryStore:负责内存管理。

3.ConnectionManager:负责连接其他BlockManagerWorker。

4.BlockTransferService:负责数据传输。

混洗文件寻址图

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。

作者:美站资讯,如若转载,请注明出处:https://www.meizw.com/n/253292.html

发表回复

登录后才能评论