Пропустить или обработать ошибку в Mutiny Multi.createFrom().iterable в Quarkus и реактивном программированииJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Пропустить или обработать ошибку в Mutiny Multi.createFrom().iterable в Quarkus и реактивном программировании

Сообщение Anonymous »

Я реализую функцию чтения сообщений из очереди в SQS в AWS, их обработки и вставки в таблицу в dynamoDB.
Я использую Quarkus и реактивное программирование с Mutiny, чтобы выполнить все необходимые мне операции, прежде чем вставлять сообщение в таблицу.
Проблема заключается в том, что сообщение приходит в закодированном виде, когда я пытаюсь декодировать сообщение и сообщение имеет неверный формат. Я выдаю NonRetryableException.class. Когда это происходит, весь раздел кода останавливается. Например, если я читаю 10 сообщений из очереди и сообщение имеет неверный формат в списке сообщений номер 5, остальные 5 отсутствующих сообщений не будут обработаны, поскольку функция остановлена.
Я пытался изолировать метод декодирования сообщений, но это не сработало.
Вот как устроен мой метод.

Код: Выделить всё

private Uni processMessagesFromQueue(List messages) {
if (messages.isEmpty()) {
return Uni.createFrom().nullItem();
}
Log.infof("Number of messages to start to process: %s", messages.size());
Multi.createFrom().iterable(messages)
.onItem().transformToUniAndMerge(mapper::deserializeMessage) //HERE IS THE ERROR
.collect().asList()
.flatMap(service::filterDuplicates)
.map(mapper::transformDataToBeInsertedInDDB)
.flatMap(service::filterExistingIds)
.invoke(service::saveDataInBatch)
.onFailure(NonRetryableException.class).invoke(throwable -> Log.errorf("Error processing the messages %s: ", throwable.getMessage()))
.subscribe()
.with(item -> Log.info("The messages have been processed successfully"));

queueService.deleteMessagesBatch(queueUrl, messages).subscribe().with(
item -> Log.info("The messaages have been processed and deleted successfully"),
throwable -> Log.error("Error deleting the messages: ")
);
deserializeMessage: этот метод возвращает Uni, если
обрабатывается. В случае ошибки возвращается NonRetryableException.
filterDuulates: проверяет дубликаты в таблице dynamoDb и возвращает
Uni
transformDataToBeInsertedInDDB. : сопоставить Uni

Подробнее здесь: https://stackoverflow.com/questions/787 ... s-and-reac
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Неожиданное поведение потока с subscribeOn(Schedulers.boundedElastic()) в реактивном программировании Spring Boot
    Anonymous » » в форуме JAVA
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Как преобразовать объект в Mono в реактивном программировании
    Anonymous » » в форуме JAVA
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Как преобразовать объект в Mono в реактивном программировании
    Anonymous » » в форуме JAVA
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous
  • Как преобразовать объект в Mono в реактивном программировании
    Anonymous » » в форуме JAVA
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Quarkus with Mutiny: поиск советов по запуску блокировки кода и контекста/управления потоками?
    Anonymous » » в форуме JAVA
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»