背景簡介
Apache Spark(下文簡稱Spark)是一種開源集群計算引擎,支持批/流計算、SQL分析、機器學習、圖計算等計算范式,以其強大的容錯能力、可擴展性、函數式API、多語言支持(SQL、Python、Java、Scala、R)等特性在大數據計算領域被廣泛使用。其中,Spark SQL 是 Spark 生態系統中的一個重要組件,它允許用戶以結構化數據的方式進行數據處理,提供了強大的查詢和分析功能。
隨著SSD和萬兆網卡普及以及IO技術的提升,CPU計算逐漸成為Spark 作業的瓶頸,而IO瓶頸則逐漸消失。 有以下幾個原因,首先,因為 JVM 提供的 CPU 指令級的優化如 SIMD要遠遠少于其他 Native 語言(如C/C++,Rust)導致基于 JVM 進行 CPU 指令的優化比較困難。其次,NVMe SSD緩存技術和AQE帶來的自動優化shuffle極大的減輕了IO延遲。最后,Spark的謂詞下推優化跳過了不需要的數據,進一步減少了IO開銷。
基于此背景,Databricks(Spark背后的商業公司)在2022年SIGMOD會議上發表論文《Photon: A Fast Query Engine for Lakehouse Systems》,其核心思想是使用C++、向量化執行等技術來執行Spark物理計劃,在客戶工作負載上獲得了平均3倍、最大10倍的性能提升,這證明Spark向量化及本地化是后續值得優化的方向。 Spark3.0(2020年6月發布)開始支持了數據的列式處理,英偉達也提出了利用GPU加速Spark的方案,利用GPU的列式計算和并發能力加速Join、Sort、Aggregate等常見的ETL操作。
DPU(Data Processing Unit) 作為未來計算的三大支柱之一,其設計旨在提供強大的計算能力,以加速各種數據處理任務。DPU的硬件加速能力,尤其在數據計算、數據過濾等計算密集型任務上,為處理海量數據提供了新的可能。通過高度定制和優化的架構,DPU能夠在處理大規模數據時顯著提升性能,為數據中心提供更高效、快速的計算體驗,從而滿足現代數據處理需求的挑戰。但是目前DPU對Spark生態不能兼容,Spark計算框架無法利用DPU的計算優勢。
中科馭數HADOS 異構計算加速軟件平臺(下文簡稱HADOS)是一款敏捷異構軟件平臺,能夠為網絡、存儲、安全、大數據計算等場景進行提速。對于大數據計算場景,HADOS可以認為是一個異構執行庫,提供了數據類型、向量數據結構、表達式計算、IO和資源管理等功能。 為了發揮Spark與DPU各自的優勢,基于HADOS平臺,我們開發了RACE算子卸載引擎,既能夠發揮Spark優秀的分布式調度能力又可以發揮DPU的向量化執行能力。
我們通過實驗發現,將Spark SQL的計算任務通過RACE卸載到DPU上, 預期可以把原生SparkSQL的單表達式的執行效率提升至9.97倍,TPC-DS單Query提升最高4.56倍。本文將介紹如何基于 DPU和RACE來加速 Spark SQL的查詢速度,為大規模數據分析和處理提供更可靠的解決方案。
整體架構
整個解決方案可以參考下圖:
? 最底層硬件資源層是DPU硬件,是面向數據中心的專用處理器,其設計旨在提供強大的計算能力,以加速各種數據處理任務,尤其是優化Spark等大數據框架的執行效率。通過高度定制和優化的架構,DPU能夠在處理大規模數據時顯著提升性能,為數據中心提供更高效、快速的計算體驗。
? DPU加速層底層是HADOS異構計算加速軟件平臺,是中科馭數推出的專用計算敏捷異構軟件開發平臺。HADOS數據查詢加速庫通過提供基于列式數據的查詢接口,供數據查詢應用。支持Java、Scala、C和C++語言的函數調用,主要包括列數據管理、數據查詢運行時函數、任務調度引擎、函數運算代價評估、內存管理、存儲管理、硬件管理、DMA引擎、日志引擎等模塊,目前對外提供數據管理、查詢函數、硬件管理、文件存儲相關功能API。
? DPU加速層中的RACE層,其最核心的能力就是修改執行計劃樹,通過 Spark Plugin 的機制,將Spark 執行計劃攔截并下發給 DPU來執行,跳過原生 Spark 不高效的執行路徑。整體的執行框架仍沿用 Spark 既有實現,包括消費接口、資源和執行調度、查詢計劃優化、上下游集成等。
? 最上層是面向用戶的原生Spark,用戶可以直接使用已有的業務邏輯,無感享受DPU帶來的性能提升
目前支持的算子覆蓋Spark生產環境常用算子,包括Scan、Filter、Project、Union、Hash Aggregation、Sort、Join、Exchange等。表達式方面,我們開發了目前生產環境常用的布爾函數、Sum/Count/AVG/Max/Min等聚合函數。
其中RACE層的架構如下:
下面我們著重介紹RACE層的核心功能。
核心功能模塊
RACE與Spark的集成
RACE作為Spark的一個插件,實現了SparkPlugin接口,與Spark的集成分為Driver端和Executor端。
? 在Driver端, 通過Spark Catalyst擴展點插入自定義的規則,實現對查詢語句解析過程、優化過程以及物理計劃轉換過程的控制。
? 在Executor端, 插件在Executor的初始化過程中完成DPU設備的初始化工作。
Plan Conversion
Spark SQL在優化 Physical Plan時,會應用一批規則,RACE通過插入的自定義規則可以攔截到優化后的Physical Plan,如果發現當前算子上的所有表達式可以下推給DPU,那么替換Spark原生算子為相應的可以在DPU上執行的自定義算子,由HADOS將其下推給DPU 來執行并返回結果。
Fallback
Spark支持的Operator和Expression非常多,在RACE研發初期,無法 100% 覆蓋 Spark 查詢執行計劃中的算子和表達式,因此 RACE必須有Fallback機制,支持Spark 查詢執行計劃中部分算子不運行在DPU上。
對于DPU無法執行的算子,RACE安排 Fallback 回正常的 Spark 執行路徑進行計算。例如,下圖中展示了插件對原生計劃樹的修改情況,可以下推給DPU的算子都替換成了對應的"Dpu"開頭的算子,不能下推的算子仍然保留。除此之外,會自動插入行轉列算子或者列轉行算子來適配數據格式的變化。
當然了,不管是行轉列算子還是列轉行算子,都是開銷比較大的算子,隨著RACE支持的算子和表達式越來越多,Fallback的情況會逐漸減少。
Strategy
當查詢計劃中存在未卸載的算子時,因為這樣引入了行列轉換算子,由于其帶來了額外的開銷,導致即使對于卸載到DPU上的算子,其性能得到提升,而對于整個查詢來說,可能會出現比原生Spark更慢的情況。 針對這種情況,最穩妥的方式就是整個Query全部回退到CPU,這至少不會比原生Spark慢,這是很重要的。
由于Spark3.0加入了AQE的支持,規則通常攔截到的是一個個QueryStage,它是Physical Plan的一部分而非完整的 Physical Plan。 RACE的策略是獲取AQE規則介入之前的整個Query的 Physical Plan,然后分析該Physical Plan中的算子是否全部可卸載。如果全部可以卸載,則對QueryStage進行Plan Conversion, 如果不能全部卸載,則跳過Plan Conversion轉而直接交給Spark處理。
我們在實際測試過程中發現,一些算子例如Take操作,它需要處理的數據量非常小,那么即使發生Fallback,也不會有很大的行列轉換開銷,通過白名單機制忽略這種算子,防止全部回退到CPU,達到加速目的。
Metrics
RACE會收集DPU執行過程中的指標統計,然后上報給Spark的Metrics System做展示,以方便Debug和系統調優。
Native Read&Write
SparkSQL的Scan算子支持列式讀取,但是Spark的向量與DPU中定義的向量不兼容,需要在JVM中進行一次列轉行然后拷貝到DPU中,這會造成巨大的IO開銷。我們主要有以下優化:
1. 減少行列轉換:對于Parquet格式等列式存儲格式的文件讀取,SparkSQL采用的是按列讀取的方式,即Scan算子是列式算子,但是后續數據過濾等數據處理算子均是基于行的算子,SparkSQL必須把列式數據轉換為行式數據,這會導致額外的計算開銷。而本方案由于都是列式計算的算子,因此無需這種行列轉換。
2. 減少內存拷貝: RACE卸載Scan算子到HADOS平臺,HADOS平臺的DPUScan算子以Native庫的方式加載磁盤數據直接復制到DPU,省去了JVM到DPU的拷貝開銷
3. 謂詞下推支持:DPUScan也支持ColumnPruning規則,這個規則會確保只有真正使用到的字段才會從這個數據源中提取出來。支持兩種Filter:PartitionFilters和PushFilters。PartitionFilters可以過濾掉無用的分區, PushFilters把字段直接下推到Parquet文件中去
4. 同時,文件的寫出也進行了類似的優化
注意,這些優化仍然需要對數據進行一次復制,DPU直接讀取磁盤是一個后續的優化方向。
加速效果
TPC-DS 單Query加速
單機單線程local模式場景,在1T數據集下,TPC-DS語句中有5條語句E2E時間提升比例超過2倍,最高達到4.56倍:
運算符加速效果
運算符的性能提升,DPU運算符相比Spark原生的運算符的加速比最高達到9.97。
算子加速效果
TPC-DS的測試中,向對于原生Spark解決方案,本方案Filter算子性能最高提高到了43倍,哈希聚合算子提升了13倍。這主要是因為我們節省了列式數據轉換為行式數據的開銷以及DPU運算的加速。
CPU資源使用情況
CPU資源從平均60%下降到5%左右
原生Spark方案CPU使用情況:
基于RACE和DPU加速后,CPU使用情況:
總結與展望
通過把Spark的計算卸載到DPU加速器上,在用戶原有代碼無需變更的情況下,端到端的性能可以得到2-5倍的提升,某些算子能達到43倍性能提升,同時CPU資源使用率從60%左右下降到5%左右,顯著提升了原生SparkSQL的執行效率。DPU展現了強大的計算能力,對于端到端的分析,會有一些除去算子之外的因素影響整體運行時間,包括磁盤IO,網絡Shuffle以及調度的Overhead。這些影響因素將來可以逐步去做特定的優化,例如:
1. 算子的Pipeline執行
原生Spark的算子Pipeline執行以及CodeGen都是Spark性能提升的關鍵技術,當前,我們卸載到DPU中的計算還沒有支持Pipeline以及CodeGen。未來這兩個技術的加入,是繼續提升Spark的執行效率的一個方向。
2. 讀數據部分,通過DPU卡直讀磁盤數據來做優化
我們還可以通過DPU卡直接讀取硬盤數據,省去主機DDR到DPU卡DDR的數據傳輸時間,以達到性能提升的效果,可以參考英偉達的GPU對磁盤讀寫的優化,官方數據CSV格式的文件讀取可優化20倍左右。
3. RDMA技術繼續提升Shuffle性能
對于Shuffle占比很高的作業,可以通過內存Shuffle以及RDMA技術,來提升整個Shuffle的過程,目前已經實現內存Shuffle,未來我們還可以通過RDMA技術直讀遠端內存數據,從而完成整個Shuffle鏈路的優化。
審核編輯 黃宇
-
DPU
+關注
關注
0文章
365瀏覽量
24215 -
SPARK
+關注
關注
1文章
105瀏覽量
19928 -
RACE
+關注
關注
0文章
2瀏覽量
2354
發布評論請先 登錄
相關推薦
評論