Разница в измерении задержки (torch.)Multiprocessing.QueuePython

Программы на Python
Ответить
Anonymous
 Разница в измерении задержки (torch.)Multiprocessing.Queue

Сообщение Anonymous »

Я пишу программу, которая требует конвейерной передачи тензоров графического процессора между процессами.
Я знал, что использование torch.Multiprocessing автоматически получит для меня CudaIpcMemHandle и отправит его через канал под капотом Multiprocessing.Queue.
Я хотел измерить задержку каждого элемента, проходящего через очередь.
Я реализовал этот код main_high.py :
Этот метод заключается в том, что когда производитель помещает элемент в очередь, он также добавляет временную метку time.perf_counter_ns() в кортеж, отправляемый через очередь.

Код: Выделить всё

import csv
import os
import time
from typing import Dict, List, Tuple

import torch
import torch.multiprocessing as mp

def producer(queue: mp.Queue, num_items: int) -> None:
# float32 uses 4 bytes per element -> 5 MiB tensor.
num_elements = (5 * 1024 * 1024) // 4
tensor_bytes = num_elements * torch.tensor([], dtype=torch.float32).element_size()
print(f"[producer] creating a new tensor per item, size={tensor_bytes} bytes")

for item_index in range(num_items):
tensor = torch.rand(num_elements, dtype=torch.float32, device="cuda")
# Ensure tensor materialization is complete before starting transfer timing.
torch.cuda.synchronize()

ts_start = time.perf_counter_ns()
print(f"start time, item {item_index}: {ts_start} ns")
queue.put(("data", item_index, tensor, ts_start))

del tensor

queue.put(("done",))

def consumer(in_queue: mp.Queue) -> None:
item_count = 0
producer_done = 0
latencies_ms = {}
while True:
item = in_queue.get()
ts_end = time.perf_counter_ns()
if item[0] == "done":
break

_, item_index, tensor, ts_start = item

latency_ms = (ts_end - ts_start) / 1e6
latencies_ms[item_index] = latency_ms

item_count += 1
del tensor

with open("latencies_high.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["item_index", "latency_ms"])
for item_index, latency_ms in sorted(latencies_ms.items()):
writer.writerow([item_index, latency_ms])

def main() -> None:
mp.set_start_method("spawn", force=True)

transfer_queue = mp.Queue()

prod = mp.Process(
target=producer,
args=(transfer_queue, 50),
)
cons = mp.Process(target=consumer, args=(transfer_queue, ))
prod.start()
cons.start()

prod.join()
cons.join()

if __name__ == "__main__":
main()
Проблема в том, что измеренная задержка каждого элемента очень велика:

Код: Выделить всё

item_index,latency_ms
0,130.301841
1,128.7352
2,128.617645
3,128.545633
.
.
.
48,127.844094
49,127.786926
Обычно IPC с Pipes не должен занимать сотни миллисекунд. Итак, я нашел еще одну тему переполнения стека, которая также измеряет этот компонент.
Вот воспроизводимый фрагмент main_low.py

Код: Выделить всё

import csv
import os
import time
from typing import Dict, List, Tuple

import torch
import torch.multiprocessing as mp

def producer(queue: mp.Queue, time_end_queue: mp.Queue, num_items: int) -> None:
# float32 uses 4 bytes per element -> 5 MiB tensor.
num_elements = (5 * 1024 * 1024) // 4
tensor_bytes = num_elements * torch.tensor([], dtype=torch.float32).element_size()

latencies_ms = {}
start_times = {}
for item_index in range(num_items):
tensor = torch.rand(num_elements, dtype=torch.float32, device="cuda")
# Ensure tensor materialization is complete before starting transfer timing.
torch.cuda.synchronize()

ts_start = time.perf_counter_ns()
start_times[item_index] = ts_start
queue.put(("data", item_index, tensor, ))
print(f"added item {item_index} to queue, queue size: {queue.qsize()}")

index, end_ts = time_end_queue.get()
latency_ms = (end_ts - start_times[index]) / 1e6
# print(f"[producer {producer_index}] item {item_index} latency: {latency_ms:.3f} ms")
latencies_ms[item_index] = latency_ms
del tensor

queue.put(("done",))

with open("latencies_low.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["item_index", "latency_ms"])
for item_index, latency_ms in sorted(latencies_ms.items()):
writer.writerow([item_index, latency_ms])

def consumer(in_queue: mp.Queue, time_end_queue: mp.Queue) -> None:
item_count = 0
producer_done = 0
while True:
item = in_queue.get()
kind = item[0]
if kind == "done":
break

_, item_index, tensor = item
item_count += 1
end_ts = time.perf_counter_ns()
time_end_queue.put((item_index, end_ts))
del tensor

def main() -> None:
mp.set_start_method("spawn", force=True)

transfer_queue, time_end_queue = mp.Queue(), mp.Queue()

prod = mp.Process(
target=producer,
args=(transfer_queue, time_end_queue, 50),
)
cons = mp.Process(target=consumer, args=(transfer_queue, time_end_queue))

prod.start()
cons.start()

prod.join()
cons.join()

if __name__ == "__main__":
main()
И результаты оказались более точными:

Код: Выделить всё

item_index,latency_ms
0,106.522214
1,0.569158
2,0.391128
3,0.333409
.
.
.
47,0.310437
48,0.306123
49,0.307088
Вот мои проблемы с этой разницей:
  • Я не могу придумать какой-либо конкретной причины, почему должна быть такая огромная разница в измерениях.
  • В любом случае должна быть временная метка, отправляемая через mp.Queue. Таким образом, проблема не должна заключаться в выборе целого числа временной метки.
  • Еще одно сомнение заключается в том, что часы между процессами не синхронизированы. Но я проверил, что time.perf_counter_ns() использует CLOCK_MONOTONIC. Так что точность не должна быть такой высокой.


Подробнее здесь: https://stackoverflow.com/questions/799 ... sing-queue
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»