Как я могу занять сотрудников Dask при обработке больших наборов данных, чтобы у них не заканчивались задачи?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу занять сотрудников Dask при обработке больших наборов данных, чтобы у них не заканчивались задачи?

Сообщение Anonymous »

Я пытаюсь обработать большой набор данных (около 1 миллиона задач) с помощью распределенных вычислений Dask на Python. (Я получаю данные из базы данных для их обработки и получаю около 1 миллиона строк). Здесь я только что сделал более простую версию своего кода:
Каждая задача имитирует некоторые вычисления, и я хочу эффективно распределить эти задачи между несколькими работниками, чтобы максимизировать использование ресурсов.Вот минимальный воспроизводимый пример моего кода:

Код: Выделить всё

from dask.distributed import Client, as_completed
from tqdm import tqdm
import time
import random

# Dummy computational function
def compute_task(data):
# Simulate some computation
time.sleep(random.uniform(0.01, 0.02))  # Simulate computation time
return data * data

# Function to process a chunk of data
def process_chunk(chunk):
results = []
for item in chunk:
result = compute_task(item)
results.append(result)
return results

def main(scheduler_address, num_tasks=1000000, chunk_size=100, max_concurrent_tasks=1000):
client = Client(scheduler_address)
print(f"Connected to Dask scheduler at {scheduler_address}")

try:
# Generate dummy data
data = list(range(num_tasks))
total_chunks = (num_tasks + chunk_size - 1) // chunk_size

# Create a generator for chunks
def chunk_generator():
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]

chunks = chunk_generator()
active_futures = []

# Initial submission of tasks
for _ in range(min(max_concurrent_tasks, total_chunks)):
try:
chunk = next(chunks)
future = client.submit(process_chunk, chunk)
active_futures.append(future)
except StopIteration:
break

completed_chunks = 0
with tqdm(total=total_chunks, desc="Processing data") as pbar:
for completed_future in as_completed(active_futures):
results = completed_future.result()
# Here we could do something with the results
pbar.update(1)
completed_chunks += 1

# Submit new tasks to keep the pipeline full
try:
chunk = next(chunks)
future = client.submit(process_chunk, chunk)
active_futures.append(future)
except StopIteration:
pass

# Remove completed future from the list
active_futures.remove(completed_future)

print("Processing complete.")

finally:
client.close()
print("Client closed.")

if __name__ == "__main__":
main(scheduler_address='tcp://localhost:8786')
Объяснение:
  • compute_task: фиктивная функция, имитирующая вычислительную работу
    переход в режим ожидания на короткое случайное время и возврат квадрата
    входных данных.
  • process_chunk: применяет Compute_task к каждому элементу в фрагменте.
  • Основная функция:

    Создает список чисел в виде фиктивных данных.
  • Разбивает данные на фрагменты. li>
  • Отправляет задачи работникам, стремясь сохранить определенное количество задач
    (tasks_per_worker) в очереди на каждого работника.
  • Обрабатывает результаты по мере выполнения задач и пытается пополнить
    очереди рабочих.

Проблема:
Несмотря на такую ​​настройку, у работников быстро заканчиваются задачи и они простаивают. И рабочий пул лишается задач. Кажется, что моя логика отправки и пополнения задач не обеспечивает достаточной занятости работников, что приводит к неэффективному использованию ресурсов. Работники обрабатывают задачи быстрее, чем отправляются новые задачи, в результате чего они простаивают в ожидании новых задач.
Мои вопросы:
  • Как я могу улучшить логику отправки задач, чтобы работники Dask
    оставались занятыми до тех пор, пока все данные не будут обработаны?
  • Есть ли более эффективный способ распределения задач среди работников, чтобы
    максимизировать пропускную способность и использование ресурсов?
Я подозреваю, что накладные расходы в моей логике отправки задач и управления вызывают задержки. Управление очередями для каждого работника и указание исполнителей в client.submit может привести к ненужной сложности и задержкам. Подумываю о том, чтобы позволить Dask обрабатывать назначение работника, удалив параметр Workers, но я не знаю, как соответствующим образом скорректировать свой код.
Любые указания или предложения будут с благодарностью приняты!

Подробнее здесь: https://stackoverflow.com/questions/789 ... event-them
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»