Обратный вызов аккорда сельдерея получает сигнатуры задач вместо результатов – как правильно создать динамический заголоPython

Программы на Python
Ответить
Anonymous
 Обратный вызов аккорда сельдерея получает сигнатуры задач вместо результатов – как правильно создать динамический заголо

Сообщение Anonymous »

У меня есть рабочий процесс Celery, в котором я в конечном итоге хочу запустить задачу с ограниченной скоростью для каждого элемента в результате предыдущей задачи (список), а затем агрегировать их результаты с помощью аккорда. Однако мой заголовок аккорда представляет собой одну задачу, которая возвращает список подписей, поэтому Celery вообще не планирует их. Таким образом, моя задача агрегатора (обратный вызов аккорда) также видит только сигнатуры задач, а не их возвращаемые значения.
Например, у меня может быть что-то вроде:

Код: Выделить всё

@app.task()
def create_subtasks(items):
subtasks = []
for item in items:
subtasks.append(mul.s(item, 2))
subtasks.append(mul.s(item, 4))
return subtasks

@app.task
def yield_list():
# in reality this depends on previous tasks
return [1, 10, 100, 1000]

@app.task(rate_limit='5/m')
def mul(x, y):
return x * y

@app.task
def aggregate(elements):
def flatten(lst):
for item in lst:
if isinstance(item, list):
yield from flatten(item)
else:
yield item

elements = list(flatten(elements))
return elements

# ...

# the actual workflow is more complicated, yield_list.s() is not the first task in the actual workflow, and more tasks are following after the chord
result = (yield_list.s() | chord(
create_subtasks.s(), # this returns all 8 serialized signatures for the 'mul' tasks, but does not run them
aggregate.s() # so the aggregator of course only sees the serialized signatures as well
)).apply_async()
Результат агрегата.s() выглядит следующим образом:

Код: Выделить всё

[
{'task': 'worker.tasks.mul', 'args': [
1,
2
], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False
},
{'task': 'worker.tasks.mul', 'args': [
1,
4
], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False
},
...
]
Вместо этого мне хотелось бы:

Код: Выделить всё

[2, 4, 20, 40, 200, 400, 2000, 4000]
Я понимаю, что заголовок аккорда — это всего лишь единичная задача (

Код: Выделить всё

create_subtasks
) и что Celery не автоматически рассматривает возвращенный список подписей как задачи, которые необходимо создать. Поэтому мне ясно, что заголовок аккорда должен представлять собой список (или группу) в время создания аккорда, а не после того, как одна задача вернет их.
Я думал или безуспешно пытался:
  • Сделать create_subtasks обычной функцией Python: не успел сделать эту работу, так как для этого нужен результат yield_list, который является задачей Celery, поэтому я не могу запустить create_subtasks до фактического рабочего процесса

    Код: Выделить всё

    list_of_mul_tasks = create_subtasks(items)
    
    result = yield_list.s() | chord(
    group(list_of_mul_tasks), # in theory this would now schedule all 8 tasks
    # but since create_subtasks depends on yield_list, it is not possible to actually create the subtasks in advance
    aggregate.s()
    ).apply_async()
    
  • Сделайте create_subtasks задачей Celery, которая напрямую планирует аккорд: конечно, я мог бы вызвать .apply_async() из create_subtasks нравится

    Код: Выделить всё

    def create_subtasks(items):
    subtasks = []
    for item in items:
    subtasks.append(mul.s(item, 2))
    subtasks.append(mul.s(item, 4))
    
    # construct chord inside create_subtasks
    subtasks_result = chord(group(tasks), aggregate.s()).apply_async()
    
    # could return the GroupResult's id or something
    return subtasks_result.id
    
    Проблема, которую я вижу в этом решении, заключается в том, что основной рабочий процесс уже будет продвигаться вперед, не дожидаясь завершения аккорда (и я понимаю, что вызывать .get() изнутри — плохая практика). последующая задача, например, агрегатор - и я не уверен, что опрос для group.ready() также считается хорошей практикой), и я не могу поместить остальную часть своего рабочего процесса в create_subtasks (аналогично этот ответ), так как это уже довольно сложно, и позже будет несколько точек синхронизации с некоторыми параллельными задачами.
tldr:
  • Признак: обратный вызов аккорда получает (сериализованные) сигнатуры вместо фактических результатов.
  • Причина: заголовок аккорда всего один задача, возвращающая эти подписи, и Celery никогда не планирует их как параллельные задачи.
  • Решение: Каким-то образом убедитесь, что заголовок аккорда представляет собой собственные задачи, а не отдельную задачу, возврат которой code> — список задач.
Среда:
  • redis-сервер: 7.4.2
  • сельдерей: 5.5.0rc4
  • python: 3.12.8
Поскольку я относительно новичок в Celery, возможно, мне не хватает очевидного решения или подхода к этой проблеме. Я также не смог найти другие связанные вопросы по Stackoverflow или полезные темы в обсуждениях GitHub Celery. Я начинаю сомневаться, возможно ли это вообще... Любые подсказки о том, как лучше всего действовать в таком случае и как с этим бороться, очень приветствуются.
Заранее спасибо!< /п>

Подробнее здесь: https://stackoverflow.com/questions/793 ... w-to-prope
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»