Apache Spark是一個業界領先的平臺,用于大規模數據的分布式提取、轉換和加載( ETL )工作負載。隨著深度學習( DL )的發展,許多 Spark 從業者試圖將 DL 模型添加到他們的數據處理管道中,以涵蓋各種用例,如銷售預測、內容推薦、情緒分析和欺詐檢測。
然而,結合 DL 培訓和推理,從歷史上看,大規模數據一直是 Spark 用戶面臨的挑戰。大多數 DL 框架都是為單節點環境設計的,它們的分布式訓練和推理 API 通常是經過深思熟慮后添加的。
為了解決單節點 DL 環境和大規模分布式環境之間的脫節,有多種第三方解決方案,如 Horovod-on-Spark、TensorFlowOnSpark 和 SparkTorch,但由于這些解決方案不是在 Spark 中本地構建的,因此用戶必須根據自己的需求評估每個平臺。
隨著 Spark 3.4 的發布,用戶現在可以訪問內置的 API,用于分布式模型訓練和大規模模型推理,如下所述。
分布式培訓
對于分布式培訓,有一個新的 TorchDistributor PyTorch 的 API,它遵循 spark-tensorflow-distributorTensorFlow 的 API。這些 API 通過利用 Spark 的屏障執行模式,在 Spark executors 上生成分布式 DL 集群節點,從而簡化了將分布式 DL 模型訓練代碼遷移到 Spark 的過程。
一旦 Spark 啟動了 DL 集群,控制權就基本上通過main_fn傳遞給TorchDistributorAPI
如下面的代碼所示,使用這個新的 API 在 Spark 上運行標準的分布式 DL 培訓只需要進行最小的代碼更改。
from pyspark.ml.torch.distributor import TorchDistributor def main_fn(checkpoint_dir): # standard distributed PyTorch code ... # Set num_processes = NUM_WORKERS * NUM_GPUS_PER_WORKER output_dist = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True).run(main_fn, checkpoint_dir)
一旦啟動,運行在執行器上的流程就依賴于其各自 DL 框架的內置分布式訓練 API 。將現有的分布式訓練代碼移植到 Spark 應該很少或不需要修改。然后,這些進程可以在訓練期間相互通信,還可以直接訪問與 Spark 集群相關的分布式文件系統(圖 1 )。
圖 1 。分布式培訓使用TorchDistributorAPI
然而,這種遷移的方便性也意味著這些 API 不使用 Spark RDD 或 DataFrames 進行數據傳輸。雖然這消除了在 Spark 和 DL 框架之間轉換或序列化數據的任何需要,但它也要求在啟動訓練作業之前完成任何 Spark 預處理并持久化到存儲中。主要訓練功能可能還需要適于從分布式文件系統而不是本地存儲讀取。
分布式推理
對于分布式推理,有一個新的predict_batch_udfAPI ,它建立在Spark Pandas UDF以便為 DL 模型推斷提供更簡單的接口。 pandas 與基于行的 UDF 相比, UDF 提供了一些優勢,包括通過Apache Arrow以及通過Pandas。有關詳細信息,請參閱Introducing Pandas UDF for PySpark.
然而,盡管 pandas UDF API 可能是 ETL 用例的一個很好的解決方案,但它仍然不適合 DL 推理用例。首先, pandas UDF API 將數據表示為 pandas 系列或數據幀,這同樣適用于執行 ETL 操作,如選擇、排序、數學轉換和聚合。
然而,大多數 DL 框架都期望NumPy數組或標準 Python 數組作為輸入,這些數組通常由自定義張量變量包裝。因此,pandas UDF 實現至少需要將傳入的 pandas 數據轉換為 NumPy 數組。不過,根據用例和數據集的不同,準確的轉換可能會有很大的差異。
其次, pandas UDF API 通常在數據分區上運行,數據分區的大小由數據集的原始寫入者或分布式文件系統決定。因此,很難對傳入的數據進行適當的批處理以進行優化計算。
最后,仍然存在在 Spark 執行器和任務之間加載 DL 模型的問題。在正常的 Spark ETL 工作中,工作負載遵循函數編程范式,其中可以對數據應用無狀態函數。然而,對于 DL 推理,預測函數通常需要從磁盤加載其 DL 模型權重。
Spark 具有通過任務序列化和廣播變量將變量從驅動程序序列化到執行器的能力。然而,這兩者都依賴于 Python pickle 序列化,這可能不適用于所有 DL 模型。此外,如果操作不當,加載和序列化非常大的模型可能會帶來極高的性能成本。
解決當前限制
為了解決這些問題predict_batch_udf引入了以下方面的標準化代碼:
將 Spark 數據幀轉換為 NumPy 數組,因此最終用戶 DL 推理代碼不需要從 pandas 數據幀進行轉換。
為 DL 框架批處理傳入的 NumPy 數組。
在執行器上加載模型,避免了任何模型序列化問題,同時利用 Sparkspark.python.worker.reuse配置以在 Spark 執行器中緩存模型。
下面的代碼演示了這個新的 API 如何隱藏將 DL 推理代碼轉換為 Spark 的復雜性。用戶只需定義make_predict_fn函數,使用標準的 DL API 加載模型并返回predict作用然后predict_batch_udf函數生成一個標準PandasUDF,負責處理幕后的其他一切。
from pyspark.ml.functions import predict_batch_udf def make_predict_fn(): # load model from checkpoint import torch device = torch.device("cuda") model = Net().to(device) checkpoint = load_checkpoint(checkpoint_dir) model.load_state_dict(checkpoint['model']) # define predict function in terms of numpy arrays def predict(inputs: np.ndarray) -> np.ndarray: torch_inputs = torch.from_numpy(inputs).to(device) outputs = model(torch_inputs) return outputs.cpu().detach().numpy() return predict # create standard PandasUDF from predict function mnist = predict_batch_udf(make_predict_fn, input_tensor_shapes=[[1,28,28]], return_type=ArrayType(FloatType()), batch_size=1000) df = spark.read.parquet("/path/to/test/data") preds = df.withColumn("preds", mnist('data')).collect()
請注意,此 API 使用標準 Spark DataFrame 進行推斷,因此執行器將從分布式文件系統讀取數據并將該數據傳遞給predict函數(圖 2 )。這也意味著,根據需要,數據的任何處理都可以與模型預測一起進行。
還要注意,這是一個data-parallel體系結構,其中每個執行器加載模型并對數據集的各自部分進行預測,因此模型必須適合執行器內存。
圖 2 :分布式推理使用predict_batch_udfAPI
Spark 深度學習的端到端示例
-
NVIDIA
+關注
關注
14文章
5075瀏覽量
103530 -
人工智能
+關注
關注
1794文章
47642瀏覽量
239646 -
深度學習
+關注
關注
73文章
5512瀏覽量
121410
發布評論請先 登錄
相關推薦
評論