在 MapReduce 框架中, Shuffle 階段是連接 Map 與 Reduce 之間的橋梁, Map 階段通過 Shuffle 過程將數據輸出到 Reduce 階段中。由于 Shuffle 涉及磁盤的讀寫和網絡 I/O,因此 Shuffle 性能的高低直接影響整個程序的性能。Spark 也有 Map 階段和 Reduce 階段,因此也會出現 Shuffle 。
Spark Shuffle
Spark Shuffle 分為兩種:一種是基于 Hash 的 Shuffle;另一種是基于 Sort 的 Shuffle。先介紹下它們的發展歷程,有助于我們更好的理解 Shuffle:
在 Spark 1.1 之前, Spark 中只實現了一種 Shuffle 方式,即基于 Hash 的 Shuffle 。在 Spark 1.1 版本中引入了基于 Sort 的 Shuffle 實現方式,并且 Spark 1.2 版本之后,默認的實現方式從基于 Hash 的 Shuffle 修改為基于 Sort 的 Shuffle 實現方式,即使用的 ShuffleManager 從默認的 hash 修改為 sort。在 Spark 2.0 版本中, Hash Shuffle 方式己經不再使用。
Spark 之所以一開始就提供基于 Hash 的 Shuffle 實現機制,其主要目的之一就是為了避免不需要的排序,大家想下 Hadoop 中的 MapReduce,是將 sort 作為固定步驟,有許多并不需要排序的任務,MapReduce 也會對其進行排序,造成了許多不必要的開銷。
在基于 Hash 的 Shuffle 實現方式中,每個 Mapper 階段的 Task 會為每個 Reduce 階段的 Task生成一個文件,通常會產生大量的文件(即對應為 M*R 個中間文件,其中, M 表示 Mapper階段的 Task 個數, R 表示 Reduce 階段的 Task 個數) 伴隨大量的隨機磁盤 I/O 操作與大量的內存開銷。
為了緩解上述問題,在 Spark 0.8.1 版本中為基于 Hash 的 Shuffle 實現引入了 ShuffleConsolidate 機制(即文件合并機制),將 Mapper 端生成的中間文件進行合并的處理機制。通過配置屬性 spark.shuffie.consolidateFiles=true,減少中間生成的文件數量。通過文件合并,可以將中間文件的生成方式修改為每個執行單位為每個 Reduce階段的 Task 生成一個文件。
執行單位對應為:每個 Mapper 端的 Cores 數/每個 Task分配的 Cores 數(默認為 1) 。最終可以將文件個數從 MR 修改為 EC/T*R,其中,E 表示 Executors 個數, C 表示可用 Cores 個數, T 表示 Task 分配的 Cores 數。
Spark1.1 版本引入了 Sort Shuffle:
基于 Hash 的 Shuffle 的實現方式中,生成的中間結果文件的個數都會依賴于 Reduce 階段的 Task 個數,即 Reduce 端的并行度,因此文件數仍然不可控,無法真正解決問題。為了更好地解決問題,在 Spark1.1 版本引入了基于 Sort 的 Shuffle 實現方式,并且在 Spark 1.2 版本之后,默認的實現方式也從基于 Hash 的 Shuffle,修改為基于 Sort 的 Shuffle 實現方式,即使用的 ShuffleManager 從默認的 hash 修改為 sort。
在基于 Sort 的 Shuffle 中,每個 Mapper 階段的 Task 不會為每 Reduce 階段的 Task 生成一個單獨的文件,而是全部寫到一個數據(Data)文件中,同時生成一個索引(Index)文件, Reduce 階段的各個 Task 可以通過該索引文件獲取相關的數據。
避免產生大量文件的直接收益就是降低隨機磁盤 I/0 與內存的開銷。最終生成的文件個數減少到 2M ,其中 M 表示 Mapper 階段的 Task 個數,每個 Mapper 階段的 Task 分別生成兩個文件(1 個數據文件、 1 個索引文件),最終的文件個數為 M 個數據文件與 M 個索引文件。因此,最終文件個數是 2M 個。
從 Spark 1.4 版本開始,在 Shuffle 過程中也引入了基于 Tungsten-Sort 的 Shuffie 實現方式,通 Tungsten 項目所做的優化,可以極大提高 Spark 在數據處理上的性能。(Tungsten 翻譯為中文是鎢絲)
注:在一些特定的應用場景下,采用基于 Hash 實現 Shuffle 機制的性能會超過基于 Sort 的 Shuffle 實現機制。
一張圖了解下 Spark Shuffle 的迭代歷史:
Spark Shuffle 迭代歷史
為什么 Spark 最終還是放棄了 HashShuffle ,使用了 Sorted-Based Shuffle?
我們可以從 Spark 最根本要優化和迫切要解決的問題中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 時產生大量的文件。當數據量越來越多時,產生的文件量是不可控的,這嚴重制約了 Spark 的性能及擴展能力,所以 Spark 必須要解決這個問題,減少 Mapper 端 ShuffleWriter 產生的文件數量,這樣便可以讓 Spark 從幾百臺集群的規模瞬間變成可以支持幾千臺,甚至幾萬臺集群的規模。
但使用 Sorted-Based Shuffle 就完美了嗎,答案是否定的,Sorted-Based Shuffle 也有缺點,其缺點反而是它排序的特性,它強制要求數據在 Mapper 端必須先進行排序,所以導致它排序的速度有點慢。好在出現了 Tungsten-Sort Shuffle ,它對排序算法進行了改進,優化了排序的速度。Tungsten-SortShuffle 已經并入了 Sorted-Based Shuffle,Spark 的引擎會自動識別程序需要的是 Sorted-BasedShuffle,還是 Tungsten-Sort Shuffle。
下面詳細剖析每個 Shuffle 的底層執行原理:
一、Hash Shuffle 解析
以下的討論都假設每個 Executor 有 1 個 cpu core。
1. HashShuffleManager
shuffle write 階段,主要就是在一個 stage 結束計算之后,為了下一個 stage 可以執行 shuffle 類的算子(比如 reduceByKey),而將每個 task 處理的數據按 key 進行“劃分”。所謂“劃分”,就是對相同的 key 執行 hash 算法,從而將相同 key 都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游 stage 的一個 task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。
下一個 stage 的 task 有多少個,當前 stage 的每個 task 就要創建多少份磁盤文件。比如下一個 stage 總共有 100 個 task,那么當前 stage 的每個 task 都要創建 100 份磁盤文件。如果當前 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,那么每個 Executor 上總共就要創建 500 個磁盤文件,所有 Executor 上會創建 5000 個磁盤文件。由此可見,未經優化的 shuffle write 操作所產生的磁盤文件的數量是極其驚人的。
shuffle read 階段,通常就是一個 stage 剛開始時要做的事情。此時該 stage 的每一個 task 就需要將上一個 stage 的計算結果中的所有相同 key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行 key 的聚合或連接等操作。
由于 shuffle write 的過程中,map task 給下游 stage 的每個 reduce task 都創建了一個磁盤文件,因此 shuffle read 的過程中,每個 reduce task 只要從上游 stage 的所有 map task 所在節點上,拉取屬于自己的那一個磁盤文件即可。
shuffle read 的拉取過程是一邊拉取一邊進行聚合的。每個 shuffle read task 都會有一個自己的 buffer 緩沖,每次都只能拉取與 buffer 緩沖相同大小的數據,然后通過內存中的一個 Map 進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并放到 buffer 緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,并得到最終的結果。
HashShuffleManager 工作原理如下圖所示:
未優化的HashShuffleManager工作原理
2. 優化的 HashShuffleManager
為了優化 HashShuffleManager 我們可以設置一個參數:spark.shuffle.consolidateFiles,該參數默認值為 false,將其設置為 true 即可開啟優化機制,通常來說,如果我們使用 HashShuffleManager,那么都建議開啟這個選項。
開啟 consolidate 機制之后,在 shuffle write 過程中,task 就不是為下游 stage 的每個 task 創建一個磁盤文件了,此時會出現shuffleFileGroup的概念,每個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量與下游 stage 的 task 數量是相同的。
一個 Executor 上有多少個 cpu core,就可以并行執行多少個 task。而第一批并行執行的每個 task 都會創建一個 shuffleFileGroup,并將數據寫入對應的磁盤文件內。
當 Executor 的 cpu core 執行完一批 task,接著執行下一批 task 時,下一批 task 就會復用之前已有的 shuffleFileGroup,包括其中的磁盤文件,也就是說,此時 task 會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。
因此,consolidate 機制允許不同的 task 復用同一批磁盤文件,這樣就可以有效將多個 task 的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數量,進而提升 shuffle write 的性能。
假設第二個 stage 有 100 個 task,第一個 stage 有 50 個 task,總共還是有 10 個 Executor(Executor CPU 個數為 1),每個 Executor 執行 5 個 task。那么原本使用未經優化的 HashShuffleManager 時,每個 Executor 會產生 500 個磁盤文件,所有 Executor 會產生 5000 個磁盤文件的。但是此時經過優化之后,每個 Executor 創建的磁盤文件的數量的計算公式為:cpu core的數量 * 下一個stage的task數量,也就是說,每個 Executor 此時只會創建 100 個磁盤文件,所有 Executor 只會創建 1000 個磁盤文件。
這個功能優點明顯,但為什么 Spark 一直沒有在基于 Hash Shuffle 的實現中將功能設置為默認選項呢,官方給出的說法是這個功能還欠穩定。
優化后的 HashShuffleManager 工作原理如下圖所示:
優化后的HashShuffleManager工作原理
基于 Hash 的 Shuffle 機制的優缺點
優點:
可以省略不必要的排序開銷。
避免了排序所需的內存開銷。
缺點:
生產的文件過多,會對文件系統造成壓力。
大量小文件的隨機讀寫帶來一定的磁盤開銷。
數據塊寫入時所需的緩存空間也會隨之增加,對內存造成壓力。
二、SortShuffle 解析
SortShuffleManager 的運行機制主要分成三種:
普通運行機制;
bypass 運行機制,當 shuffle read task 的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為 200),就會啟用 bypass 機制;
Tungsten Sort 運行機制,開啟此運行機制需設置配置項 spark.shuffle.manager=tungsten-sort。開啟此項配置也不能保證就一定采用此運行機制(后面會解釋)。
1. 普通運行機制
在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的 shuffle 算子,可能選用不同的數據結構。如果是 reduceByKey 這種聚合類的 shuffle 算子,那么會選用 Map 數據結構,一邊通過 Map 進行聚合,一邊寫入內存;
如果是 join 這種普通的 shuffle 算子,那么會選用 Array 數據結構,直接寫入內存。接著,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
在溢寫到磁盤文件之前,會先根據 key 對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的 batch 數量是 10000 條,也就是說,排序好的數據,會以每批 1 萬條數據的形式分批寫入磁盤文件。
寫入磁盤文件是通過 Java 的 BufferedOutputStream 實現的。BufferedOutputStream 是 Java 的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數,提升性能。
一個 task 將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是merge 過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。
此外,由于一個 task 就只對應一個磁盤文件,也就意味著該 task 為下游 stage 的 task 準備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個 task 的數據在文件中的 start offset 與 end offset。
SortShuffleManager 由于有一個磁盤文件 merge 的過程,因此大大減少了文件數量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,而第二個 stage 有 100 個 task。由于每個 task 最終只有一個磁盤文件,因此此時每個 Executor 上只有 5 個磁盤文件,所有 Executor 只有 50 個磁盤文件。
普通運行機制的 SortShuffleManager 工作原理如下圖所示:
普通運行機制的SortShuffleManager工作原理
2. bypass 運行機制
Reducer 端任務數比較少的情況下,基于 Hash Shuffle 實現機制明顯比基于 Sort Shuffle 實現機制要快,因此基于 Sort Shuffle 實現機制提供了一個帶 Hash 風格的回退方案,就是 bypass 運行機制。對于 Reducer 端任務數少于配置屬性spark.shuffle.sort.bypassMergeThreshold設置的個數時,使用帶 Hash 風格的回退計劃。
bypass 運行機制的觸發條件如下:
shuffle map task 數量小于spark.shuffle.sort.bypassMergeThreshold=200參數的值。
不是聚合類的 shuffle 算子。
此時,每個 task 會為每個下游 task 都創建一個臨時磁盤文件,并將數據按 key 進行 hash 然后根據 key 的 hash 值,將 key 寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的 HashShuffleManager 來說,shuffle read 的性能會更好。
而該機制與普通 SortShuffleManager 運行機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write 過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。
bypass 運行機制的 SortShuffleManager 工作原理如下圖所示:
bypass運行機制的SortShuffleManager工作原理
3. Tungsten Sort Shuffle 運行機制
Tungsten Sort 是對普通 Sort 的一種優化,Tungsten Sort 會進行排序,但排序的不是內容本身,而是內容序列化后字節數組的指針(元數據),把數據的排序轉變為了指針數組的排序,實現了直接對序列化后的二進制數據進行排序。由于直接基于二進制數據進行操作,所以在這里面沒有序列化和反序列化的過程。內存的消耗大大降低,相應的,會極大的減少的 GC 的開銷。
Spark 提供了配置屬性,用于選擇具體的 Shuffle 實現機制,但需要說明的是,雖然默認情況下 Spark 默認開啟的是基于 SortShuffle 實現機制,但實際上,參考 Shuffle 的框架內核部分可知基于 SortShuffle 的實現機制與基于 Tungsten Sort Shuffle 實現機制都是使用 SortShuffleManager,而內部使用的具體的實現機制,是通過提供的兩個方法進行判斷的:
對應非基于 Tungsten Sort 時,通過 SortShuffleWriter.shouldBypassMergeSort 方法判斷是否需要回退到 Hash 風格的 Shuffle 實現機制,當該方法返回的條件不滿足時,則通過 SortShuffleManager.canUseSerializedShuffle 方法判斷是否需要采用基于 Tungsten Sort Shuffle 實現機制,而當這兩個方法返回都為 false,即都不滿足對應的條件時,會自動采用普通運行機制。
因此,當設置了 spark.shuffle.manager=tungsten-sort 時,也不能保證就一定采用基于 Tungsten Sort 的 Shuffle 實現機制。
要實現 Tungsten Sort Shuffle 機制需要滿足以下條件:
Shuffle 依賴中不帶聚合操作或沒有對輸出進行排序的要求。
Shuffle 的序列化器支持序列化值的重定位(當前僅支持 KryoSerializer Spark SQL 框架自定義的序列化器)。
Shuffle 過程中的輸出分區個數少于 16777216 個。
實際上,使用過程中還有其他一些限制,如引入 Page 形式的內存管理模型后,內部單條記錄的長度不能超過 128 MB (具體內存模型可以參考 PackedRecordPointer 類)。另外,分區個數的限制也是該內存模型導致的。
所以,目前使用基于 Tungsten Sort Shuffle 實現機制條件還是比較苛刻的。
編輯:jq
-
cpu
+關注
關注
68文章
10879瀏覽量
212194 -
數據
+關注
關注
8文章
7081瀏覽量
89181 -
SPARK
+關注
關注
1文章
105瀏覽量
19927 -
Shuffle
+關注
關注
0文章
5瀏覽量
1703
原文標題:Spark 的兩種核心 Shuffle 詳解(建議收藏)
文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論