Shuffle原理及对应的Consolidation优化机制是怎样的

技术Shuffle原理及对应的Consolidation优化机制是怎样的这篇文章给大家介绍Shuffle原理及对应的Consolidation优化机制是怎样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所

本文介绍了Shuffle原理和相应的合并优化机制。内容非常详细。有兴趣的朋友可以参考一下。希望对你有帮助。

一、什么是Shuffle?

Shuffle是MapTask和ReduceTask之间的桥梁。MapTask的输出必须经过Shuffle过程,然后才能成为ReduceTask的输入。在分布式集群中,ReduceTask需要跨节点拉取MapTask的输出,涉及数据的网络传输和磁盘IO。因此,Shuffle的质量将直接影响整个应用程序的性能。通常,我们将shuffle过程分为两部分:MapTask端的输出变成shuffle write,而数据ShuffleWrite,ReduceTask端称为Shuffle。

Spark的Shuffle过程基本类似于MapReduce的Shuffle过程原理,有些概念可以直接应用。例如,在Shuffle过程中,提供数据的一端称为Map端,在Map端生成数据的每个任务称为Mapper,相应地,接收数据的一端称为Reduce端。在Reduce端拉取数据的每一个任务都被称为Reducer,Shuffle过程本质上就是用分区设备将Map端获得的数据进行划分,并将数据发送到相应Reducer的过程。

在前一篇文章中,我们已经说过,Spark任务中Stage的划分依据是RDD的宽度依赖;在狭义的从属关系中,父RDD和子RDD分区之间的关系是一对一的。或者当父RDD的分区仅对应于子RDD的分区时,父RDD和子RDD分区之间的关系是多对一。不会有洗牌。父母RDD的一部分归属于孩子RDD的一部分。在广泛的依赖性中,父RDD和子RDD分区之间的关系是一对多的。会有洗牌。父RDD的一个分区的数据转到子RDD的不同分区。

在现实场景中,90%的调音都发生在洗牌阶段,所以这种调音非常重要。

二、普通的Spark HashShuffle原理

先来看一张图,普通的Shuffle原理是怎么工作的,下面我就用这张图给大家讲解一下基本的Shuffle原理:

Shuffle原理及对应的Consolidation优化机制是怎样的

普通Shuffle执行过程:

1..上面有ResultTask、ShuffleMapTask和两个结果任务。Shuflemaptask会根据结果任务数创建对应的桶,桶数为33。

2.其次,ShuffleMapTask生成的结果将按照设定的划分算法填充到每个桶中。这里的分区算法可以自定义。当然默认算法是根据密钥哈希到不同的桶,最后会是ShuffleBlockFIle。

3.ShuffleBlockFIle位置信息由3输出。ShuffleMapTask作为映射状态被发送到DAGScheduler中的映射输出跟踪器的主机。

3.当Shuffleptask启动时,它会根据自身任务的id及其依赖的Shuffleptask的id读取MapOutputTracker中ShuffleBlockFIle的位置信息,最后从远程或本地块管理器中获取对应的ShuffleBlockFIle作为ResultTask的输入进行处理。

如果,的ShuffleMapTask和ResultTask太多,就会生成N*N的小文件,这会导致ShuffleWrite在磁盘文件的创建和磁盘的IO上花费大量的性能,给系统带来很大的压力。我上面画的画不是很好。在这里,我将用文字表达出来:

案例A:如果有4个shufflemptask和4个ResultTask,我的机器只有2个cpu内核,默认每个任务都需要一个cpu运行,所以我的4个shufflemptask会分两批运行,同时只运行两个任务。第一批任务将生成2*4个ShuffleBlockFIle文件,而第二批任务仍将运行。

会生成2*4的ShuffleBlockFIle文件,这样会产生16个小文件。

   另一种情形B:我还是有4个ShuffleMapTask和4个ResultTask,我的机器只有4个cpu或者更多的cpu核数,我的4个ShuffleMapTask就会在同一个批次运行,还是会产生4*4=16个小文件。

存在的问题: 

1.Shuffle前在磁盘上会产生海量的小文件,分布式模式ResultTask去拉取数据时,会产生大量会有过多的小文件创建和磁盘IO操作。

2.可能导致OOM,大量耗时低效的 IO 操作 ,导致写磁盘时的对象过多,读磁盘时候的对象也过多,这些对象存储在堆内存中,会导致堆内存不足,相应会导致频繁的GC,GC会导致OOM。由于内存中需要保存海量文件操作句柄和临时信息,如果数据处理的规模比较庞大的话,内存不可承受,会出现 OOM 等问题。

二、开启Consolidation机制的Spark  HashShuffle原理

    鉴于上面基本Shuffle存在的不足, 在后面的Spark0.81版本开始就引入了Consolidation机制,由参数spark.shuffle.consolidateFiles控制。将其设置为true即可开启优化机制,下面我们就看下优化后的Shuffle是如何处理的:

Shuffle原理及对应的Consolidation优化机制是怎样的

优化的Shuffle原理:

   相当于对于上面的“情形B”做了优化,把在同一core上运行的多个ShuffleMapTask输出的合并到同一个文件,这样文件数目就变成了 cores*ResultTask个ShuffleBlockFile文件了,这里一定要注意同一个批次运行的ShuffleMapTask一定是写的不同的文件,只有不同批次的ShuffleMapTask才会写相同的文件,当第一批ShuffleMapTask运行完成后,后面在同一个cpu core上运行的TShuffleMapTask才会去写上一个在这个cpu core上运行ShuffleMapTask写的那个ShuffleBlockFile文件。

    至此Spark HashShuffle原理及其Consolidation机制讲解完毕,但是如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。

    上面的原理都是基于的HashShuffleManager。而Spark1.2.x以后,HashShuffleManager不再是Spark默认的Shuffle Manager,Spark1.2.x以后,Spark默认的Shuffle Manager是SortShuffleManager。在Spark2.0以后 HashShuffleManager已经被弃用。

关于Shuffle原理及对应的Consolidation优化机制是怎样的就分享到这里了,希望

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/132966.html

(0)

相关推荐

  • Composer执行降级操作的方法是什么

    技术Composer执行降级操作的方法是什么这篇文章主要介绍“Composer执行降级操作的方法是什么”,在日常操作中,相信很多人在Composer执行降级操作的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好

    攻略 2021年10月22日
  • 怎么用OBS和WebSockets播放视频流

    技术怎么用OBS和WebSockets播放视频流这篇文章主要为大家展示了“怎么用OBS和WebSockets播放视频流”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“怎么用OB

    攻略 2021年10月26日
  • CVE20178464 link代码执行漏洞是如何被攻击的?

    技术CVE–2017–8464 LNK 代码执行漏洞是怎么攻击的CVE–2017–8464 LNK 代码执行漏洞是怎么攻击的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可

    攻略 2021年12月20日
  • C++怎么将帮助函数和它们支持的类放在同一命名空间

    技术C++怎么将帮助函数和它们支持的类放在同一命名空间这篇文章主要讲解了“C++怎么将帮助函数和它们支持的类放在同一命名空间”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“

    攻略 2021年11月25日
  • 使用Redis之前5个必须了解的事情有哪些

    技术使用Redis之前5个必须了解的事情有哪些这篇文章给大家介绍使用Redis之前5个必须了解的事情有哪些,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。使用Redis开发应用程序是一个很愉快的过程,

    攻略 2021年11月10日
  • IE7.JS怎样解决IE兼容性问题

    技术IE7.JS怎样解决IE兼容性问题这篇文章给大家介绍 IE7.JS怎样解决IE兼容性问题,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。和大家重点讨论一下如何使用IE7.JS解决IE兼容性问题,I

    攻略 2021年11月25日