使用Ray創建高效的深度學習數據管道
用于訓練深度學習模型的GPU功能強大但價格昂貴。為了有效利用GPU,開發者需要一個高效的數據管道,以便在GPU準備好計算下一個訓練步驟時盡快將數據傳輸到GPU。使用Ray可以大大提高數據管道的效率。
1、訓練數據管道的結構
首先考慮下面的模型訓練偽代碼。
for step in range(num_steps):
sample, target = next(dataset) # 步驟1
train_step(sample, target) # 步驟2
在步驟1中,獲取下一個小批量的樣本和標簽。在步驟2中,它們被傳遞給train_step函數,該函數會將它們復制到GPU上,執行前向傳遞和反向傳遞以計算損失和梯度,并更新優化器的權重。
接下來詳細了解一下步驟1。當數據集太大無法放入內存時,步驟1將從磁盤或網絡中獲取下一個小批量數據。此外,步驟1還涉及一定量的預處理——輸入數據必須轉換為數字張量或張量集合,然后再饋送給模型。在某些情況下,在將它們傳遞給模型之前,張量上還會應用其他轉換(例如歸一化、繞軸旋轉、隨機打亂等)。
如果工作流程是嚴格按順序執行的,即先執行步驟1,然后再執行步驟2,那么模型將始終需要等待下一批數據的輸入、輸出和預處理操作。GPU將無法得到有效利用,它將在加載下一個小批量數據時處于空閑狀態。
為了解決這個問題,可以將數據管道視為生產者——消費者的問題。數據管道生成小批量數據并寫入有界緩沖區。模型/GPU從緩沖區中消費小批量數據,執行前向/反向計算并更新模型權重。如果數據管道能夠以模型/GPU消費的速度快速生成小批量數據,那么訓練過程將會非常高效。
圖片
2、Tensorflow tf.data API
Tensorflow tf.data API提供了一組豐富的功能,可用于高效創建數據管道,使用后臺線程獲取小批量數據,使模型無需等待。僅僅預先獲取數據還不夠,如果生成小批量數據的速度比GPU消費數據的速度慢,那么就需要使用并行化來加快數據的讀取和轉換。為此,Tensorflow提供了交錯功能以利用多個線程并行讀取數據,以及并行映射功能使用多個線程對小批量數據進行轉換。
由于這些API基于多線程,因此可能會受到Python全局解釋器鎖(GIL)的限制。Python GIL限制了Python解釋器一次只能運行單個線程的字節碼。如果在管道中使用純TensorFlow代碼,通常不會受到這種限制,因為TensorFlow核心執行引擎在GIL的范圍之外工作。但是,如果使用的第三方庫沒有發布GIL或者使用Python進行大量計算,那么依賴多線程來并行化管道就不可行。
3、使用多進程并行化數據管道
考慮以下生成器函數,該函數模擬加載和執行一些計算以生成小批量數據樣本和標簽。
def data_generator():
for _ in range(10):
# 模擬獲取
# 從磁盤/網絡
time.sleep(0.5)
# 模擬計算
for _ in range(10000):
pass
yield (
np.random.random((4, 1000000, 3)).astype(np.float32),
np.random.random((4, 1)).astype(np.float32)
)
接下來,在虛擬的訓練管道中使用該生成器,并測量生成小批量數據所花費的平均時間。
generator_dataset = tf.data.Dataset.from_generator(
data_generator,
output_types=(tf.float64, tf.float64),
output_shapes=((4, 1000000, 3), (4, 1))
).prefetch(tf.data.experimental.AUTOTUNE)
st = time.perf_counter()
times = []
for _ in generator_dataset:
en = time.perf_counter()
times.append(en - st)
# 模擬訓練步驟
time.sleep(0.1)
st = time.perf_counter()
print(np.mean(times))
據觀察,平均耗時約為0.57秒(在配備Intel Core i7處理器的Mac筆記本電腦上測量)。如果這是一個真實的訓練循環,GPU的利用率將相當低,它只需花費0.1秒進行計算,然后閑置0.57秒等待下一個批次數據。
為了加快數據加載速度,可以使用多進程生成器。
from multiprocessing import Queue, cpu_count, Process
def mp_data_generator():
def producer(q):
for _ in range(10):
# 模擬獲取
# 從磁盤/網絡
time.sleep(0.5)
# 模擬計算
for _ in range(10000000):
pass
q.put((
np.random.random((4, 1000000, 3)).astype(np.float32),
np.random.random((4, 1)).astype(np.float32)
))
q.put("DONE")
queue = Queue(cpu_count()*2)
num_parallel_processes = cpu_count()
producers = []
for _ in range(num_parallel_processes):
p = Process(target=producer, args=(queue,))
p.start()
producers.append(p)
done_counts = 0
while done_counts < num_parallel_processes:
msg = queue.get()
if msg == "DONE":
done_counts += 1
else:
yield msg
queue.join()
現在,如果測量等待下一個小批次數據所花費的時間,得到的平均時間為0.08秒。速度提高了近7倍,但理想情況下,希望這個時間接近0。
如果進行分析,可以發現相當多的時間都花在了準備數據的反序列化上。在多進程生成器中,生產者進程會返回大型NumPy數組,這些數組需要進行準備,然后在主進程中進行反序列化。能否在進程之間傳遞大型數組時提高效率?
4、使用Ray并行化數據管道
這就是Ray發揮作用的地方。Ray是一個用于在Python中運行分布式計算的框架。它帶有一個共享內存對象存儲區,可在不同進程間高效地傳輸對象。特別的是,在不進行任何序列化和反序列化的情況下,對象存儲區中的Numpy數組可在同一節點上的worker之間共享。Ray還可以輕松實現數據加載在多臺機器上的擴展,并使用Apache Arrow高效地序列化和反序列化大型數組。
Ray帶有一個實用函數from_iterators,可以創建并行迭代器,開發者可以用它包裝data_generator生成器函數。
import ray
def ray_generator():
num_parallel_processes = cpu_count()
return ray.util.iter.from_iterators(
[data_generator]*num_parallel_processes
).gather_async()
使用ray_generator,測量等待下一個小批量數據所花費的時間為0.02秒,比使用多進程處理的速度提高了4倍。