adayan0 2018-07-20
ShuffleManager的主要职责是shuffle过程的执行、计算和处理。包括HashShuffleManager和SortShuffleManager。1.2版本以前的Spark使用HashShuffleManager,1.2版本以后使用SortShuffleManager。
1.未经优化的HashShuffleManager
在shuffle write阶段,也就是一个stage结束之后,每个task对自己处理的数据进行哈希,根据哈希结果,将相同key的数据写入同一个磁盘文件,每个磁盘文件属于下一个stage的一个task。
写入数据时,首先将数据写入内存缓冲中,缓冲填满后,溢写到磁盘中。
假设下一个stage的task的数量为100,当前stage的每个task都将创建100个磁盘文件,假设当前stage的task数量为50,则将创建5000个磁盘文件。这样数量巨大的磁盘文件会导致大量IO影响性能。
接下来介绍shuffle read的过程,当一个stage开始时,这个stage的每个task会去上游stage的所有task所在节点通过网络拉取属于自己的磁盘文件,然后进行聚合或连接等操作。
shuffle read是一边拉取一边聚合的,每个shuffle read task有自己的buffer,每次拉取和buffer同样大数据,在内存中使用Map进行聚合。聚合完一批数据后再拉取下一批数据。
2.优化的HashShuffleManager
通过设置spark.shuffle.consolidateFiles参数HashShuffleManager的优化。
在shuffle write阶段,第一批并行执行的task会创建shuffleFileGroup,并将数据写入。当这批rask执行完后,下一批task复用这些shuffleFileGroup。
也就是说,假设每个Executor只有一个CPU core,每个Executor分配5个task,但由于CPU core的数量每次只能并行执行一个task,假设有10个Executor,并发执行的task数量为10,创建10个shuffleFileGroup,下一批task复用这些shuffleFileGroup。假设下游stage有100个task,则创建磁盘文件数量为1000个。
3.SortShuffleManager(普通运行机制)
在shuffle write阶段,首先将数据写入一个内存数据结构,如果达到某个临界阈值,就溢写到磁盘中。
溢写到磁盘之前,先对key进行排序。排序过后,分批将数据写到磁盘。这样可以减少磁盘IO数量,提高性能。
上游的每个task再将数据写入内存数据结构时会发生多次磁盘溢写操作,所以会产生多个临时磁盘文件。接下来进行merge,对所有磁盘文件合并。最终,每个task对应一个磁盘文件。还需要写一份索引文件,标识下游各个task的数据在文件中的start offset 和end offset。
假设上游stage有50个task,不管下游有多少task,磁盘文件数量为50。
4.SortShuffleManager(bypass运行机制)
bypass机制出发条件:
(1)shuffle write task数量小于spark.shuffle.sort.bypassMergeThreshold。
(2)不是聚合类shuffle算子。
上游task为每个下游task创建一个临时磁盘文件,然后将所有磁盘文件合并,并创建索引文件。