在 LinkedIn,我們非常依賴離線數據分析來進行數據驅動的決策。多年來,Apache Spark 已經成為 LinkedIn 的主要計算引擎,以滿足這些數據需求。憑借其獨特的功能,Spark 為 LinkedIn 的許多關鍵業務提供支持,包括數據倉庫、數據科學、AI/ML、A/B 測試和指標報告。需要大規模數據分析的用例數量也在快速增長。從 2017 年到現在,LinkedIn 的 Spark 使用量同比增長了大約 3 倍。因此,LinkedIn 的 Spark 引擎現在運行在一個龐大的基礎設施之上。我們的生產集群有 10,000 多個節點,Spark 作業現在占集群計算資源使用量的 70% 以上,每日處理的數據量達數十 PB。解決擴展性挑戰以確保我們的 Spark 計算基礎設施可持續發展是 LinkedIn Spark 團隊工作的核心。
盡管 Apache Spark 有很多好處,這使得它在 LinkedIn 和整個行業中都很受歡迎,但在我們的數據規模下使用 Spark 時,我們仍然遇到了一些挑戰。正如我們在 Spark + AI Summit 2020 演講中所述,這些挑戰涉及多個層面,包括計算資源管理、計算引擎可擴展性和用戶生產率。我們將在這篇博文中關注 Spark shuffle 可擴展性挑戰,并介紹 Magnet,一種新穎的基于推送模式的 shuffle 服務(push-based shuffle service)。
Shuffle 基礎知識
數據 shuffle 是 MapReduce 計算范式中的一項重要操作,為 Apache Spark 和許多其他現代大數據計算引擎提供動力。shuffle 操作基本上是通過相應階段的 map 和 reduce 任務之間的 all-to-all 連接來傳輸中間數據。通過 shuffle,數據將根據每個記錄的分區鍵中的值在所有的 shuffle 分區上進行適當的分區。如圖 1 所示:
雖然 shuffle 操作的基本概念很簡單,但不同的計算引擎采用了不同的方法來實現它。在 LinkedIn,我們在 Apache YARN 之上運行 Spark,并利用 Spark 的External Shuffle Service (ESS) 來進行 shuffle 操作。如圖 2 所示:
通過這種設置,計算集群中的每個節點都部署了一個 Spark ESS 實例。當 Spark executors 運行 shuffle 的 mapper 任務時,這些任務會在本地磁盤上生成 shuffle 文件。每個 shuffle 文件由多個 shuffle 塊組成,每個塊代表屬于相應 shuffle 分區的數據。生成這些 shuffle 文件后,本地 Spark ESS 實例知道在哪里可以找到這些 shuffle 文件以及不同 mapper 任務生成的各個 shuffle 塊。當 Spark 執行器開始運行同一個 shuffle 的 reducer 任務時,這些任務將從 Spark driver 獲取有關在哪里可以找到 shuffle 塊的信息作為它們的任務輸入。然后這些 reducer 會向遠程 Spark ESS 實例發送請求,嘗試獲取它們對應的 shuffle 塊。Spark ESS 收到此類請求后,將從本地磁盤讀取相應的 shuffle 塊并將數據發送回 reducer。
挑戰
Spark 現有的 shuffle 機制在性能和容錯要求之間取得了很好的平衡。然而,當在我們的規模下運行 Spark shuffle 時,我們經歷了多個挑戰,這使得 shuffle 操作成為我們基礎設施中的擴展和性能瓶頸。
可靠性問題
第一個挑戰是可靠性問題。在我們的生產集群中,由于計算節點數據量大和 shuffle 工作負載的規模,我們注意到集群高峰時段的 shuffle 服務可用性問題。這可能會導致 shuffle fetch 失敗,從而導致昂貴的 stage 重試,這可能非常具有破壞性,因為它們會導致工作流 SLA 違規和作業失敗。圖 3 進一步說明了這一點,其中顯示了由于 shuffle 導致的每日 Spark stage 失敗的數量。到 2019 年底,這一趨勢越來越嚴重,每天有數百到一千多個 stage 因為這個失敗。
效率問題
第二個挑戰是效率問題。在 LinkedIn,我們將 shuffle 文件存儲在 HDD 上。由于 reducer 的 shuffle fetch 請求是隨機到達的,因此 shuffle 服務也會隨機訪問 shuffle 文件中的數據。如果單個 shuffle 塊大小較小,則 shuffle 服務產生的小隨機讀取會嚴重影響磁盤吞吐量,從而延長 shuffle fetch 等待時間。這也在下圖中進行了說明。正如下圖所示,在 2020 年 3 月至 8 月之間,我們生產集群的 10-20% 的計算資源浪費在 shuffle 上,在等待遠程 shuffle 數據時處于空閑狀態。
擴展性問題
第三個挑戰是擴展問題。由于 external shuffle service 是我們基礎架構中的共享服務,因此一些對 shuffle services 錯誤調優的作業也會影響其他作業。當一個作業錯誤地配置導致產生許多小的 shuffle blocks 將會給 shuffle 服務帶來壓力時,它不僅會給自身帶來性能下降,還會使共享相同 shuffle 服務的所有相鄰作業的性能下降。這可能會導致原本正常運行的作業出現不可預測的運行時延遲,尤其是在集群高峰時段。
Magnet shuffle service
為了解決這些問題,我們設計并實現了 Magnet,這是一種新穎的基于推送(push-based)的 shuffle 服務。Magnet 項目在今年早些時候作為 VLDB 2020 上發表的工業跟蹤論文首次向社區亮相,您可以在此處閱讀我們的 VLDB 論文:Magnet: Push-based Shuffle Service for Large-scale Data Processing。最近,我們正在將 Magnet 回饋給 Apache Spark 社區。這篇博文的其余部分將介紹 Magnet 背后的高級設計及其在生產中的性能,但感興趣的讀者可以到 SPARK-30602 中獲取有關該工作的更新以及 SPIP 文檔以獲取實現級別的詳細信息。
Push-based shuffle
Magnet shuffle 服務背后的核心思想是基于推送的 shuffle 概念,其中 mapper 生成的 shuffle 塊也被推送到遠程 shuffle 服務,以按每個 shuffle 分區進行合并。push-based shuffle 的 shuffle 寫入路徑如下圖所示:
在 map 任務生成其 shuffle 文件后,它準備將 shuffle 塊推送到遠程 ESS。它將 shuffle 文件中的連續塊組裝成 MB 大小的塊,并將該組數據推到相應的 ESS。大于特定大小的 Shuffle 塊將被跳過,因此我們不會推送可能來自大型傾斜分區的塊。map 任務以一致的方式確定這個分組和相應的 ESS 目的地,從而將屬于同一個 shuffle 分區的不同 mappers 的塊推到同一 ESS。分組完成后,這些塊的傳輸將移交給專用線程池,然后 map 任務就完成了。通過這種方式,我們將任務執行線程與塊傳輸線程解耦,在 I/O 密集型數據傳輸和 CPU 密集型任務執行之間實現了更好的并行性。ESS 接受遠程推送的 shuffle 塊,并將相同分區的 shuffle 數據合并到相應的 shuffle 文件中。這是以盡力而為的方式完成的,這并不能保證所有塊都被合并。但是,ESS 確實保證在合并期間不會發生數據重復或損壞。
在基于推送的 shuffle 數據讀取路徑上,reduce 任務可以從合并的 shuffle 文件和 map 任務生成的原始 shuffle 文件中獲取其任務輸入(如上圖)。ESS 在從合并的 shuffle 文件中讀取時可以執行大的順序 I/O 而不是小的隨機 I/O,從而顯著提高 I/O 效率。利用這一點,reduce 任務更愿意從合并的 shuffle 文件中獲取它們的輸入。由于塊推送/合并過程是盡力而為的,reduce 任務可以使用未合并的塊來填充合并的 shuffle 文件中的任何漏洞。如果合并的 shuffle 文件變得不可用,他們甚至可以完全回退到獲取未合并的塊。Magnet 的高效磁盤 I/O 模式進一步為構建高性能 Spark 集群提供了更大的靈活性,因為它更少依賴 SSD 來實現良好的 shuffle 性能。
Spark driver 負責協調 map 和 reduce 任務中基于推送的 shuffle。在 shuffle 寫入路徑上,Spark driver 為給定 shuffle 的 map 任務確定要使用的 ESS 列表。這個 ESS 列表作為任務上下文的一部分發送到 Spark executors,這使 map 任務能夠在塊組和遠程 ESS 目的地之間提出上述一致的映射。Spark 驅動程序進一步協調 map 和 reduce 階段之間的轉換。一旦所有的 map 任務都完成了,Spark 驅動程序會等待一段可配置的時間,然后通知所有選擇的 ESS 進行這次 shuffle 以完成合并操作。當 ESS 收到終結請求時,它停止接受來自給定 shuffle 的任何新塊。它還向驅動程序響應每個最終 shuffle 分區的元數據列表,其中包括有關合并的 shuffle 文件的位置和大小信息以及指示哪些塊已合并的位圖。一旦 Spark 驅動程序從所有 ESS 接收到此類元數據,它就會啟動 reduce 階段。此時,Spark 驅動程序擁有 shuffle 數據位置的完整視圖,現在在合并的 shuffle 文件和原始 shuffle 文件之間進行了 2 次復制。Spark 驅動程序利用此信息來協調 reduce 任務的輸入位置。此外,合并的 shuffle 文件的位置為 reduce 任務創建了自然的位置偏好。Spark 驅動程序利用該信息可以在存儲了 shuffle 信息的機器上啟動 reduce 任務,如下圖所示:
push-based shuffle 的優勢
Push-based shuffle 為 Spark shuffle 帶來了幾個關鍵好處。
提高磁盤 I/O 效率
使用 push-based shuffle,shuffle 服務在訪問 shuffle 文件中的 shuffle 數據時,從小的隨機讀取切換到大的順序讀取,顯著提高了磁盤 I/O 效率,特別是對于基于 HDD 的 shuffle 存儲。在 shuffle 寫路徑上,即使對小塊進行兩次 shuffle 數據寫入,整體的 I/O 效率還是有提升的。這是因為小的隨機寫入可以從多個級別的緩存中受益,例如操作系統頁面緩存和磁盤緩沖區。因此,小隨機寫入可以實現比小隨機讀取高得多的吞吐量。改進的磁盤 I/O 效率的效果反映在本博文后面顯示的性能數據中。關于 I/O 效率提升的更詳細分析包含在我們的 VLDB 論文中。
緩解 shuffle 的可靠性/可擴展性問題
Spark 原生 shuffle 操作要成功,需要每個 reduce 任務成功地從所有 map 任務中獲取每個相應的 shuffle 塊,這在擁有數千個節點的繁忙集群中通常無法滿足。Magnet 通過多種方式實現了更好的 shuffle 可靠性:
?Magnet 采用盡力而為的方法來合并塊。塊推送/合并過程中的失敗不會影響 shuffle 的過程。
?通過基于推送的 shuffle,Magnet 有效地生成了 shuffle 中間數據的第二個副本。只有在無法從原始 shuffle 文件或合并的 shuffle 文件中獲取 shuffle 塊時,才會發生 shuffle fetch 失敗。
通過 reduce 任務的位置感知調度,它們通常在合并后的 shuffle 文件所在的機器上啟動,這允許它們繞過 ESS 讀取 shuffle 數據。這使得 reduce 任務對 ESS 可用性或性能問題更具彈性,從而緩解前面提到的可擴展性問題。
在塊推送過程中處理 stragglers
在 Spark 的普通 shuffle 操作中,由于多個任務通常是并發運行的,任務中的掉隊者(即一些任務運行速度明顯慢于其他任務)的影響可以被其他任務隱藏。使用基于推送的 shuffle,如果在塊推送操作中有任何落后者,它可能會暫停執行很長時間的作業。這是因為塊推送操作介于 shuffle map 和 reduce 階段之間。當存在落后者時,可能根本沒有任務在運行。然而,通過提前終止技術,Magnet 可以在塊推送過程中有效地處理掉隊者。Magnet 不是等待 push 過程完全完成,而是限制它在 shuffle map 和 reduce 階段之間等待的時間。Magnet 的盡力而為的特性使其能夠容忍由于提前終止而未合并的塊。如下圖所示:
與 Spark 原生集成
Magnet 與 Spark 原生集成,這帶來了多種好處:
?Magnet 不依賴于其他外部系統。這有助于簡化 Magnet shuffle 服務的部署、監控和生產。
?通過與 Spark 的 shuffle 系統的原生集成,Magnet shuffle 服務中的元數據可以暴露給 Spark 驅動程序。這使 Spark 驅動程序能夠實現更好的性能(通過任務的位置感知調度)和更好的容錯(通過回退到原始 shuffle 塊)。
?Magnet 與現有的 Spark 功能(例如自適應查詢執行)配合得很好。Spark AQE 的承諾之一是能夠動態優化 skew join,這也需要對 shuffle 進行特殊處理。Spark AQE 會在多個 reducer 任務之間劃分一個傾斜的 shuffle 分區,每個任務只從 mapper 任務的一個子范圍中獲取 shuffle 塊。由于合并后的 shuffle 文件不再保持每個單獨的 shuffle 塊的原始邊界,因此無法按照 Spark AQE 要求的方式劃分合并后的 shuffle 文件。由于 Magnet 保留原始 shuffle 文件和合并后的 shuffle 文件,因此它可以委托 AQE 處理偏斜分區,同時優化非偏斜分區的 shuffle 操作。
性能對比
我們在 LinkedIn 上使用真實的生產作業評估了 Magnet 的性能,我們看到了非常不錯的結果。在下表中,我們展示了運行一個 ML 特征生成作業的性能結果,該作業具有數十個 shuffle 階段和接近 2 TB 的 shuffle 數據。與 Spark 中的原生 shuffle 相比,Magnet 取得了非常好的性能結果。請注意,Magnet 將 shuffle fetch 等待時間減少了 98%。這可以通過 Magnet 的高效隨機磁盤 I/O 和減少任務的位置感知調度來實現。此外,這個作業并不完全使用 Spark SQL,因為它混合使用了 Spark SQL 和非 SQL 代碼組成其計算邏輯。在優化 shuffle 操作時,Magnet 只承擔很少的工作本身。
Total shuffle fetch wait time (min)Total executor task runtime (min)End-to-end job runtime (min)
Vanilla Spark shuffle206365077142
Magnet shuffle445 (-98%)29928 (-41%)31 (-26%)
我們還在 LinkedIn 引入了含有大量 shuffle 的作業。我們的一個生產集群中估計有 15% 的 shuffle 工作負載已遷移到 Magnet。在這些作業中,我們看到 shuffle fetch 等待時間、任務總運行時間和作業端到端運行時間也相應的減少。如下圖所示,啟用 Magnet 的 Spark 作業平均減少了 3-4 倍的隨機提取等待時間。此外,我們已經看到本地訪問的 shuffle 數據量增加了大約 10 倍,這表明基于推送的 shuffle 大大改善了數據局部性。最后,我們已經看到作業運行時間在集群高峰時段變得更加穩定。隨著我們加入更多的作業,Magnet 將更多的 shuffle 工作負載轉換為優化路徑,從而減輕了 shuffle 服務的壓力,并為集群帶來更多好處。另一方面,Magnet 可能會將 shuffle 臨時存儲需求加倍。我們正在通過為 shuffle 文件切換到 zstd 壓縮編解碼器來緩解這種情況,與默認壓縮編解碼器相比,這有可能將 shuffle 文件大小減少 50%。
結論和未來工作
在這篇博文中,我們介紹了 Magnet shuffle 服務,這是 Apache Spark 的下一代 shuffle 架構。Magnet 提高了 Spark 中 shuffle 操作的整體效率、可靠性和可擴展性。最近,我們也看到了業界針對 shuffle 過程提出的其他解決方案,比如 Cosco、Riffle、Zeus 和 Sailfish。我們在 VLDB 論文中對 Magnet 和其他這些解決方案進行了比較,尤其是 Cosco、Riffle 和 Sailfish。
未來,我們還將考慮在其他部署環境和計算引擎中提供基于 Magnet 推送的 shuffle。我們當前的集群部署模式為存儲和計算是在一起的。隨著 LinkedIn 正在向 Azure 遷移,我們也在評估計算和存儲分離的集群中基于推送的 shuffle 的方法。此外,我們目前基于推送的 shuffle 的設計主要針對批處理引擎,我們也在考慮它對流引擎的適用性。
感謝
需要一個專門的團隊才能將 Magnet 規模的項目帶來曙光。除了 Min Shen、Ye Zhou 和 Chandni Singh 的努力外,該項目還得到了 Venkata Krishnan Sowrirajan 和 Mridul Muralidharan 的重大貢獻。Erik Krogen、Ron Hu、Minchu Yang 和 Zoe Lin 為 Magnet 的生產部署和可觀察性改進做出了貢獻。特別感謝 Yuval Degani 構建的 GridBench——這個工具使得理解各種因素對作業運行時的影響變得非常容易。特別感謝我們的合作伙伴團隊,尤其是 Jan Bob 和 Qun Li 的團隊,他們是 Magnet 的早期使用者。
像 Magnet 這樣的大型基礎設施工作需要管理層做出重大而持續的支持。Sunitha Beeram、Zhe Zhang、Vasanth Rajamani、Eric Baldeschwieler、Kapil Surlakar 和 Igor Perisic:感謝您的堅定支持和指導。Magnet 的設計也得益于與 Sriram Rao 和 Shirshanka Das 的評論和深入討論。
Magnet 得到 Apache Spark 社區的大力支持。感謝與 Databricks 的合作以及來自眾多社區成員的評論。
責任編輯:haq
-
數據
+關注
關注
8文章
7134瀏覽量
89387 -
Shuffle
+關注
關注
0文章
5瀏覽量
1705
原文標題:Magnet:即將隨 Apache Spark 3.2 發布的高性能外部 Shuffle 服務
文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論