Каждая задача имитирует некоторые вычисления, и я хочу эффективно распределить эти задачи между несколькими работниками, чтобы максимизировать использование ресурсов.Вот минимальный воспроизводимый пример моего кода:
Код: Выделить всё
from dask.distributed import Client, as_completed
from tqdm import tqdm
import time
import random
# Dummy computational function
def compute_task(data):
# Simulate some computation
time.sleep(random.uniform(0.01, 0.02)) # Simulate computation time
return data * data
# Function to process a chunk of data
def process_chunk(chunk):
results = []
for item in chunk:
result = compute_task(item)
results.append(result)
return results
def main(scheduler_address, num_tasks=1000000, chunk_size=100, max_concurrent_tasks=1000):
client = Client(scheduler_address)
print(f"Connected to Dask scheduler at {scheduler_address}")
try:
# Generate dummy data
data = list(range(num_tasks))
total_chunks = (num_tasks + chunk_size - 1) // chunk_size
# Create a generator for chunks
def chunk_generator():
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
chunks = chunk_generator()
active_futures = []
# Initial submission of tasks
for _ in range(min(max_concurrent_tasks, total_chunks)):
try:
chunk = next(chunks)
future = client.submit(process_chunk, chunk)
active_futures.append(future)
except StopIteration:
break
completed_chunks = 0
with tqdm(total=total_chunks, desc="Processing data") as pbar:
for completed_future in as_completed(active_futures):
results = completed_future.result()
# Here we could do something with the results
pbar.update(1)
completed_chunks += 1
# Submit new tasks to keep the pipeline full
try:
chunk = next(chunks)
future = client.submit(process_chunk, chunk)
active_futures.append(future)
except StopIteration:
pass
# Remove completed future from the list
active_futures.remove(completed_future)
print("Processing complete.")
finally:
client.close()
print("Client closed.")
if __name__ == "__main__":
main(scheduler_address='tcp://localhost:8786')
- compute_task: фиктивная функция, имитирующая вычислительную работу
переход в режим ожидания на короткое случайное время и возврат квадрата
входных данных. - process_chunk: применяет Compute_task к каждому элементу в фрагменте.
- Основная функция:
Создает список чисел в виде фиктивных данных. - Разбивает данные на фрагменты. li>
- Отправляет задачи работникам, стремясь сохранить определенное количество задач
(tasks_per_worker) в очереди на каждого работника. - Обрабатывает результаты по мере выполнения задач и пытается пополнить
очереди рабочих.
Проблема:
Несмотря на такую настройку, у работников быстро заканчиваются задачи и они простаивают. И рабочий пул лишается задач. Кажется, что моя логика отправки и пополнения задач не обеспечивает достаточной занятости работников, что приводит к неэффективному использованию ресурсов. Работники обрабатывают задачи быстрее, чем отправляются новые задачи, в результате чего они простаивают в ожидании новых задач.
Мои вопросы:
- Как я могу улучшить логику отправки задач, чтобы работники Dask
оставались занятыми до тех пор, пока все данные не будут обработаны? - Есть ли более эффективный способ распределения задач среди работников, чтобы
максимизировать пропускную способность и использование ресурсов?
Любые указания или предложения будут с благодарностью приняты!
Подробнее здесь: https://stackoverflow.com/questions/789 ... event-them