Обрабатывать сообщения в очереди после нескольких других очередейJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Обрабатывать сообщения в очереди после нескольких других очередей

Сообщение Anonymous »

У меня есть несколько очередей и потребителей RabbitMQ в монорепозитории. Я пытаюсь добавить новую очередь/потребителя, которая должна работать после двух других потребителей. Каждый Потребитель отвечает за обработку сообщения, которое содержит набор идентификаторов коллекций MongoDB. Каждый экземпляр handleMessage() перебирает эти идентификаторы, извлекает запись из MongoDB, вызывает внешнюю службу и записывает значение обратно в коллекцию MongoDB. Предыдущий процесс отвечает за отправку этих сообщений в каждую очередь, поэтому каждый потребитель будет обрабатывать одни и те же идентификаторы одновременно.

Код: Выделить всё

ConsumerA.groovy (queue: consumer.a
):

Код: Выделить всё

void handleMessage(List mongoIds) {
for (MongoId mongoId : mongoIds) {
MongoRecord mongoRecord = mongoRepository.findById(mongoId)

mongoRecord.propertyA = client.callExternalService()

mongoRepository.save(mongoRecord)
}
}

Код: Выделить всё

ConsumerB.groovy (queue: consumer.b
):

Код: Выделить всё

void handleMessage(List mongoIds) {
for (MongoId mongoId : mongoIds) {
MongoRecord mongoRecord = mongoRepository.findById(mongoId)

mongoRecord.propertyB = client.callExternalService()

mongoRepository.save(mongoRecord)
}
}
* В основном это псевдокод; имеются нулевая проверка, пакетная обработка, массовые обновления и т. д.; неэффективность/ошибки в этом коде отсутствуют в моем реальном коде.
Третий потребитель запрашивает эту коллекцию Mongo и использует значения из mongoRecord.propertyA и mongoRecord.propertyB для вычисления свойстваC:

Код: Выделить всё

ConsumerC.groovy (queue:consumer.c
):

Код: Выделить всё

void handleMessage(List mongoIds) {
for (MongoId mongoId : mongoIds) {
MongoRecord mongoRecord = mongoRepository.findById(mongoId)

mongoRecord.propertyC = client.callExternalService(mongoRecord.propertyA, mongoRecord.propertyB)

mongoRepository.save(mongoRecord)
}
}
Как отправить то же сообщение, которое было отправлено на Consumer.a и Consumer.b, только на Consumer.c, только после того, как Consumer.a и Consumer.b обработали сообщение?
Раньше это была простая цепочка; Consumer.a обработает сообщение и свяжет его с Consumer.b через ConvertAndSend('consumer.c', mongoIds) (или аналогичный), но было обнаружено дополнительное бизнес-требование, которое требуется, чтобы два потребителя обработали перед третьим, и я действительно не знаю, как с этим справиться.

Подробнее здесь: https://stackoverflow.com/questions/793 ... her-queues
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»