分布式框架下的數據處理與模型推理實踐
概念
Ray 是一個開源的高性能分布式計算框架,旨在簡化大規模分布式應用的開發和運行。它提供了靈活的任務調度、資源管理以及并行計算能力,使開發者能夠輕松實現從單機到多節點的計算擴展。Ray 支持多種場景,包括分布式機器學習訓練、強化學習、超參數優化、大規模數據處理和實時模型部署等。Ray 的核心概念是通過統一的 API,使開發者專注于邏輯開發,而不必關心底層的分布式實現細節。
通過模塊化設計,Ray 集成了多個功能強大的庫,如Ray Data(數據處理)、 Ray Train(分布式訓練)、Ray Tune(超參數優化)、Ray Serve(模型部署)、RLlib(強化學習)等,為開發者提供了一站式的分布式計算解決方案。無論是構建 AI 應用還是解決復雜并行計算問題,Ray 都是一種高效且易用的選擇。Ray 的模塊化設計將復雜功能分解為獨立子系統,常用模塊包括:
- (1)Ray Core:基礎的分布式任務調度和資源管理。
- (2)Ray Data:高效的分布式數據處理模塊。
- (3)Ray Train:支持主流框架的分布式機器學習訓練工具。
- (4)Ray Tune:超參數優化庫,支持大規模調優任務。
- (5)Ray Serve:用于實時部署 AI 模型的高性能工具。
- (6)RLlib:分布式強化學習庫,適用于復雜環境中的智能決策任務。
數據處理與模型推理
Ray Data 的分布式數據處理
在大模型訓練過程中的數據處理階段,往往有著海量結構化與非結構化數據,這些數據難以快速、有效的處理,造成數據處理效率低下、數據質量不高等問題。智算數據平臺基于Ray分布式框架的計算能力,提供了大量常用的分布式數據處理算子,并且實現文本、圖片、音頻、視頻等多種類型數據的流水線處理,為模型訓練提供高質量預訓練數據。除了基本的數據處理算子外,數據平臺還將通過引入多模態大模型,補齊了處理多模態數據能力。
在大模型訓練中,數據預處理的效率直接影響訓練速度和模型性能。Ray Data 是一個強大的分布式數據處理模塊,該模塊支持多種文件類型的讀寫,如json、parquet等格式的文本、圖片、視頻等。Ray Data存儲數據的格式是基于Apache Arrow,一種高效的列式數據格式,相比Python數據處理常用的pandas,在讀取和寫入大型文件時會更加高效。集成了map,filter等分布式計算算子,提供了高效的數據處理,適用于海量數據的處理任務。此外,Ray Data還支持批處理功能,加速數據處理。在和Actor配合使用的場景下,可以減少請求次數,提高了處理效率。
功能亮點
支持多數據格式:可以處理文本、圖片、音頻、視頻等多模態數據。
分布式操作:通過內置的算子(如 map、filter、groupby 等),實現數據分片和高效并行計算。
與主流工具集成:兼容 Pandas、Spark,方便數據遷移和開發。
示例:分布式數據預處理代碼
import ray
from ray.data import read_parquet
ray.init() # 初始化 Ray
# 加載數據集
dataset = read_parquet("s3://bucket/data.parquet")
# 數據清洗和特征處理
processed_data = (
dataset.map(lambda x: {"text_length": len(x["text"])})
.filter(lambda x: x["text_length"] > 50)
)
# 分布式存儲
processed_data.write_parquet("s3://bucket/processed_data.parquet")
Ray Data 的設計理念是利用分布式集群的算力,在保留開發者熟悉的數據操作方式的同時,提升數據處理的速度和擴展性。
GPU 分配與分布式推理
在模型推理場景下,特別是針對大模型的批量推理,資源分配和高效利用至關重要。Ray 提供了靈活的 @ray.remote 裝飾器,支持任務的分布式調度以及 GPU 資源的高效管理。
示例:使用llava模型進行分布式推理代碼
from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration
from PIL import Image
import torch
import ray
import os
# 初始化 Ray
ray.init()
@ray.remote(num_gpus=1) # 請求 1 個 GPU
def run_model_on_gpu(picture):
processor = LlavaNextProcessor.from_pretrained("llava-hf/llama3-llava-next-8b-hf")
model = LlavaNextForConditionalGeneration.from_pretrained("llava-hf/llama3-llava-next-8b-hf", torch_dtype=torch.float16, device_map="auto")
# prepare image and text prompt, using the appropriate prompt template
image = Image.open(picture)
# Define a chat histiry and use `apply_chat_template` to get correctly formatted prompt
# Each value in "content" has to be a list of dicts with types ("text", "image")
conversation = [
{
"role": "user",
"content": [
{"type": "text", "text": "描述這張圖片的內容"},
{"type": "image"},
],
},
]
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
inputs = processor(images=image, text=prompt, return_tensors="pt").to(model.device)
# autoregressively complete prompt
output = model.generate(**inputs, max_new_tokens=1000)
return processor.decode(output[0], skip_special_tokens=True)
pic_list = ["/root/llava/img1.png","/root/llava/img2.png"]
results = ray.get([run_model_on_gpu.remote(pic) for pic in pic_list])
print(results)
# 關閉 Ray
ray.shutdown()
在運行時,我們使用nvidia-smi命令,可以查看到有兩個ray子進程在并發處理推理任務。
最后輸出結果如下,通過以下結果與原圖的對比,我們可以直觀地看到使用 Ray 進行分布式推理的流程和效果。
輸出結果
具體而言,Ray 能夠高效地將推理任務拆分到多個 GPU 上進行并行處理,每個 GPU 僅需處理分配的部分數據,這大幅提高了推理效率和整體吞吐量。
例如,在多張圖片的分布式推理任務中,每一張圖片被分發到不同的 GPU 節點上,分別執行推理操作。最終,結果被匯總生成完整的推理輸出。在這一過程中:
推理結果:圖片描述的準確性保持一致,基于分布式的推理不會影響結果的質量。
性能提升:推理時間縮短了一半以上,分布式計算的效率得到驗證。
可擴展性:只要增加更多的 GPU 資源,Ray 的調度器即可自動分配更多的計算任務,實現:超越單機多卡的算力水平
當然,以上只是Ray分布式的一些基礎應用,為了說明Ray在分布式推理中的可行性。我們也可以使用如Ray Data模塊的map_batch去做離線的批量分布式推理,提高數據的處理效率;再比如通過Ray Server模塊去部署大模型服務,做到支持實時推理,支持多用戶并發請求等更多功能。
隨著 NPU(如華為 Ascend 系列)等新型加速硬件的崛起,Ray 也擴展了對異構設備的支持。在集群中,Ray 也可以識別并調度到帶有 NPU 的節點運行任務,利用國產芯片的高效性能支持大規模 AI 應用。
總結
Ray 作為新一代分布式計算框架,通過模塊化設計和簡單易用的接口,極大地降低了開發分布式應用的門檻。在數據處理方面,Ray Data 提供了強大的分布式數據處理能力,可以高效地處理海量多模態數據。在模型推理方面,Ray Core的Remote函數和資源調度功能使開發者能夠充分利用集群中的 GPU,快速構建分布式推理服務。
隨著 AI 的快速發展,Ray 已成為開發者構建大規模分布式系統的核心工具。在未來的應用中,無論是處理復雜的數據管道還是優化模型推理性能,Ray 都將發揮越來越重要的作用。
本文轉載自 ??AI遇見云??,作者: 景泓斐
