Как связать задачу Celery, возвращающую список, в группу?
за исключением того, что мне нужно, чтобы это произошло в середине цепочки, а принятое решение работает только в том случае, если промежуточная задача является последним «звеном» в цепочке.
Вот тот же слегка измененный пример, воспроизводящий проблему:
Код: Выделить всё
from random import random
from celery import
@app.task
def get_list(amount):
return [i for i in range(amount)]
@app.task
def process_item(item):
return [f'id-{item}', random() > .5]
@app.task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
@app.task
def handle_results(results):
for result in results:
if result[1] == None:
continue
return result[1] # return the first True value
def foo():
return chain(
get_list.s(10),
dmap.s(process_item.s()),
handle_results.s() # >> lst = [i for i in range(amount)]
>>> chain(group(process_item.s(i) for i in lst), handle_results.s())
Код: Выделить всё
>>> from app.manager_tasks import process_item
>>> group(process_item.s(e) for e in [1, 2, 3, 4])
group([app.manager_tasks.process_item(1), process_item(2), process_item(3), process_item(4)])
>>> group(process_item.s(e) for e in [1, 2, 3, 4]).delay()
Если я вызываю результат, как это сделано в других примерах stackoverflow (та же ссылка, что и в первом примере), у меня остается GroupResult, который успешен только в том случае, если это последний член цепочки (
Код: Выделить всё
()
Я довольно озадачен этим… но я также новичок в сельдерее. Очень признателен за любые советы о том, как я мог/должен решить эту проблему!
Еще немного предыстории: я собираюсь использовать эту цепочку повторно как часть другой цепочки, которая находится в начале верхний уровень, определяющий этапы конвейера.
Подробнее здесь: https://stackoverflow.com/questions/609 ... e-of-chain