Я использую Quarkus и реактивное программирование с Mutiny, чтобы выполнить все необходимые мне операции, прежде чем вставлять сообщение в таблицу.
Проблема заключается в том, что сообщение приходит в закодированном виде, когда я пытаюсь декодировать сообщение и сообщение имеет неверный формат. Я выдаю NonRetryableException.class. Когда это происходит, весь раздел кода останавливается. Например, если я читаю 10 сообщений из очереди и сообщение имеет неверный формат в списке сообщений номер 5, остальные 5 отсутствующих сообщений не будут обработаны, поскольку функция остановлена.
Я пытался изолировать метод декодирования сообщений, но это не сработало.
Вот как устроен мой метод.
Код: Выделить всё
private Uni processMessagesFromQueue(List messages) {
if (messages.isEmpty()) {
return Uni.createFrom().nullItem();
}
Log.infof("Number of messages to start to process: %s", messages.size());
Multi.createFrom().iterable(messages)
.onItem().transformToUniAndMerge(mapper::deserializeMessage) //HERE IS THE ERROR
.collect().asList()
.flatMap(service::filterDuplicates)
.map(mapper::transformDataToBeInsertedInDDB)
.flatMap(service::filterExistingIds)
.invoke(service::saveDataInBatch)
.onFailure(NonRetryableException.class).invoke(throwable -> Log.errorf("Error processing the messages %s: ", throwable.getMessage()))
.subscribe()
.with(item -> Log.info("The messages have been processed successfully"));
queueService.deleteMessagesBatch(queueUrl, messages).subscribe().with(
item -> Log.info("The messaages have been processed and deleted successfully"),
throwable -> Log.error("Error deleting the messages: ")
);
обрабатывается. В случае ошибки возвращается NonRetryableException.
filterDuulates: проверяет дубликаты в таблице dynamoDb и возвращает
Uni
transformDataToBeInsertedInDDB. : сопоставить Uni
Подробнее здесь: https://stackoverflow.com/questions/787 ... s-and-reac