Интеграция Django + Dask: использование и прогресс?Python

Программы на Python
Ответить
Anonymous
 Интеграция Django + Dask: использование и прогресс?

Сообщение Anonymous »

О производительности и передовом опыте

Обратите внимание: весь код для вопроса ниже доступен на Github.
Не стесняйтесь ознакомьтесь с проектом! https://github.com/b-long/moose-dj-uv/pull/3

Я пытаюсь потренироваться простая интеграция Django + Dask, где одно представление запускает длительный процесс, а другое представление может проверять статус этой работы. Позже я мог бы улучшить это таким образом, чтобы get_task_status (или какая-либо другая функция просмотра Django) могла возвращать выходные данные работы.
Я использую time .sleep(2), чтобы намеренно имитировать длительную работу. Кроме того, важно видеть общий статус работы как «работает». С этой целью я также использую time.sleep() в своем тесте, что выглядит очень глупо.
Вот код представления:

Код: Выделить всё

from uuid import uuid4
from django.http import JsonResponse
from dask.distributed import Client
import time

# Initialize Dask client
client = Client(n_workers=8, threads_per_worker=2)

NUM_FAKE_TASKS = 25

# Dictionary to store futures with task_id as key
task_futures = {}

def long_running_process(work_list):
def task_function(task):
time.sleep(2)
return task

futures = [client.submit(task_function, task) for task in work_list]
return futures

async def start_task(request):
work_list = []

for t in range(NUM_FAKE_TASKS):
task_id = str(uuid4())  # Generate a unique ID for the task
work_list.append(
{"address": f"foo--{t}@example.com", "message": f"Mail task: {task_id}"}
)

futures = long_running_process(work_list)
dask_task_id = futures[0].key  # Use the key of the first future as the task ID

# Store the futures in the dictionary with task_id as key
task_futures[dask_task_id] = futures

return JsonResponse({"task_id": dask_task_id})

async def get_task_status(request, task_id):
futures = task_futures.get(task_id)

if futures:
if not all(future.done() for future in futures):
progress = 0
return JsonResponse({"status": "running", "progress": progress})
else:
results = client.gather(futures, asynchronous=False)

# Calculate progress, based on futures that are 'done'
progress = int((sum(future.done() for future in futures) / len(futures)) * 100)

return JsonResponse(
{
"task_id": task_id,
"status": "completed",
"progress": progress,
"results": results,
}
)
else:
return JsonResponse({"status": "error", "message": "Task not found"})

Я написал тест, который выполняется примерно за 5,5 секунд:

Код: Выделить всё

from django.test import Client
from django.urls import reverse
import time

def test_immediate_response_with_dask():
client = Client()
response = client.post(reverse("start_task_dask"), data={"data": "foo"})
assert response.status_code == 200
assert "task_id" in response.json()

task_id = response.json()["task_id"]
response2 = client.get(reverse("get_task_status_dask", kwargs={"task_id": task_id}))
assert response2.status_code == 200
r2_status = response2.json()["status"]
assert r2_status == "running"

attempts = 0
max_attempts = 8

while attempts <  max_attempts:
time.sleep(1)
try:
response3 = client.get(
reverse("get_task_status_dask", kwargs={"task_id": task_id})
)
assert response3.status_code == 200

r3_status = response3.json()["status"]
r3_progress = response3.json()["progress"]

assert r3_progress >= 99
assert r3_status == "completed"
break  # Exit the loop if successful
except Exception:
attempts += 1
if attempts == max_attempts:
raise  # Raise the last exception if all attempts failed

Мой вопрос: существует ли более эффективный способ реализации того же API? Что делать, если NUM_FAKE_TASKS = 10000?
Я трачу циклы зря?
Изменить: как просмотреть процент прогресса?Спасибо @GuillaumeEB за подсказку.
Итак, мы знаем, что блокируется следующее:

Код: Выделить всё

client.gather(futures, asynchronous=False)
Но, похоже, это тоже ведет себя не так, как ожидалось:

Код: Выделить всё

client.gather(futures, asynchronous=True)
Есть ли способ использовать client.persist() или client.compute(), чтобы увидеть постепенный прогресс?
Я знаю, что не могу сохранить список , и использование client.compute(futures) также кажется вести себя неправильно (перескакивание прогресса с 0 на 100).

Подробнее здесь: https://stackoverflow.com/questions/791 ... d-progress
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»