Связать задачу Celery, которая возвращает список в группу в середине цепочкиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Связать задачу Celery, которая возвращает список в группу в середине цепочки

Сообщение Anonymous »

Этот вопрос такой же, как и этот:
Как связать задачу Celery, возвращающую список, в группу?
за исключением того, что мне нужно, чтобы это произошло в середине цепочки, а принятое решение работает только в том случае, если промежуточная задача является последним «звеном» в цепочке.

Вот тот же слегка измененный пример, воспроизводящий проблему:

Код: Выделить всё

from random import random

from celery import

@app.task
def get_list(amount):
return [i for i in range(amount)]

@app.task
def process_item(item):
return [f'id-{item}', random() > .5]

@app.task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()

@app.task
def handle_results(results):
for result in results:
if result[1] == None:
continue

return result[1] # return the first True value

def foo():
return chain(
get_list.s(10),
dmap.s(process_item.s()),
handle_results.s() # >> lst = [i for i in range(amount)]
>>> chain(group(process_item.s(i) for i in lst), handle_results.s())
тогда это сработает. Я не понимаю, что на самом деле нужно передать от одного члена цепочки к другому... поскольку результат group(...):

Код: Выделить всё

>>> from app.manager_tasks import process_item
>>> group(process_item.s(e) for e in [1, 2, 3, 4])
group([app.manager_tasks.process_item(1), process_item(2), process_item(3), process_item(4)])
>>> group(process_item.s(e) for e in [1, 2, 3, 4]).delay()

который сам по себе является GroupResult (с вызовом задержки), в противном случае это просто группа. Поскольку dmap сам по себе является подписью, я предполагаю, что именно поэтому внутри него нужно вызывать метод Delay() для цепочки.. 🤔
Если я вызываю результат, как это сделано в других примерах stackoverflow (та же ссылка, что и в первом примере), у меня остается GroupResult, который успешен только в том случае, если это последний член цепочки (, .delay(), .apply_async()). Если я вызываю .get() для GroupResult, чтобы получить что-то сериализуемое, я получаю следующую ошибку: RuntimeError: Никогда не вызывайте result.get() внутри задачи! Что и представляет мне с загадкой; как мне это сделать?

Я довольно озадачен этим… но я также новичок в сельдерее. Очень признателен за любые советы о том, как я мог/должен решить эту проблему!

Еще немного предыстории: я собираюсь использовать эту цепочку повторно как часть другой цепочки, которая находится в начале верхний уровень, определяющий этапы конвейера.

Подробнее здесь: https://stackoverflow.com/questions/609 ... e-of-chain
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»