Задача работает локально, но возникают ошибки в кластере Dask: «SystemError: ошибка возвращается без установленного исклPython

Программы на Python
Ответить
Anonymous
 Задача работает локально, но возникают ошибки в кластере Dask: «SystemError: ошибка возвращается без установленного искл

Сообщение Anonymous »

У меня есть следующие коды, которые передают массив задаче и отправляют его в кластер Dask. Кластер Dask работает в Docker с несколькими работниками Dask. Docker начинается с:

Код: Выделить всё

scheduler:
docker run -d \
--name dask-scheduler \
-p 8786:8786 \
-p 8787:8787 \
ghcr.io/dask/dask:2025.11.0-py3.12  \
dask-scheduler

workers:
docker run -d \
--name dask-worker-1 \
-p 8789:8789 \
-p 9000:9000 \
ghcr.io/dask/dask:2025.11.0-py3.12  \
dask-worker tcp://scheduler_ip:8786 --dashboard-address 8789 --contact-address tcp:/workder_ip1:9000  --listen-address tcp://0.0.0.0:9000
Клиентская среда:
  • numpy 2.35
  • python 3.11.8.final.0.
Среды планировщика и рабочих:
  • numpy 2.34
  • python 3.12.12.final.0.

Код: Выделить всё

from dask.distributed import Client, prog
client = Client("tcp://*******:8786")

n_total_number = 100
aj = []
bj=[]
cj=[]

for xi in range(n_total_number):
gnd = np.random.rand(3)
aj.append(gnd[0])
bj.append(gnd[1])
cj.append(gnd[2])

ajn = client.scatter(aj)
bjn = client.scatter(bj)
cjn = client.scatter(cj)

def process_task(inx, ajn, bjn, cjn):
m = ajn[inx]
n = bjn[inx]
q = cjn[inx]

return m*n*q

result_futures = []

for i in range(len(aj)):
future = client.submit(
process_task,
i,             # The index (passed by value)
ajn,
bjn,
cjn
)
result_futures.append(future)

final_results = client.gather(result_futures)
Это работает локально, но при запуске кодов появляется следующая ошибка:

Код: Выделить всё

---------------------------------------------------------------------------
SystemError                               Traceback (most recent call last)
Cell In[6], line 1
----> 1 final_results = client.gather(result_futures)
3 print("\n--- Gathered Results ---")
4 for result in final_results:

File d:\*****\venv\Lib\site-packages\distributed\client.py:2559, in Client.gather(self, futures, errors, direct, asynchronous)
2556     local_worker = None
2558 with shorten_traceback():
-> 2559     return self.sync(
2560         self._gather,
2561         futures,
2562         errors=errors,
2563         direct=direct,
2564         local_worker=local_worker,
2565         asynchronous=asynchronous,
2566     )

Cell In[4], line 20, in process_task()
19 def process_task(inx, ajn, bjn, cjn):
---> 20     m = ajn[inx]
21     n = bjn[inx]
22     q = cjn[inx]

SystemError: error return without exception set
Он не работает с массивом, но работает для скалярного десятичного типа, изменив функциюprocess_task следующим образом:

Код: Выделить всё

def process_task(ajn, bjn, cjn):
m = ajn
n = bjn
q = cjn

return m*n*q

result_futures = []

for i in range(len(aj)):
future = client.submit(
process_task,
i,             # The index (passed by value)
ajn[i],
bjn[i],
cjn[i]
)
result_futures.append(future)
Пожалуйста, помогите.

Спасибо.
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»