Лучшие практики обработки ошибок в синхронных потоках интеграции SpringJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Лучшие практики обработки ошибок в синхронных потоках интеграции Spring

Сообщение Anonymous »

Мы используем Spring Integration для обработки сообщений Kafka, их обработки, сохранения в базе данных и публикации в другой Kafka. Для этого мы используем набор потоков интеграции Spring, большинство из которых являются синхронными.
Основная структура: Теперь ezbPipelineFlow выглядит примерно так:

Код: Выделить всё

    @Bean
public IntegrationFlow ezbPipelineFlow() {
return IntegrationFlow
.from(Kafka
.messageDrivenChannelAdapter(ezbConnection.getConsumerFactory(), EZB_TOPIC)
.errorChannel("errorChannel"))
.transformWith(spec -> spec
.transformer(ezbMessageTransformer::transformToEzbMessage)
.requiresReply(false))
// other synchronous filters / handlers / enrichers
.channel(veibMessageChannel)
.nullChannel();
}
И мой трансформер Pojo:

Код: Выделить всё

  public EzbMessage transformToEzbMessage(String ezbMessage) {
try {
return jsonMapper.readValue(ezbMessage, EzbMessage.class);
} catch (Exception ex) {
torLoggingCatalog.signalEzbMessageParsingError(ezbMessage).withException(ex).log();
return null;
}
}
До обновления до Spring 4, с его зависимостью от Spring Integration 7.0 (из 6.5.5), этот код работал нормально. Если в одном из моих преобразователей возникало исключение, оно немедленно перехватывалось и обрабатывалось путем регистрации соответствующего исключения. Затем, вернув значение null и установив для requireReply значение false на преобразователе, мы смогли корректно завершить поток. Мне понравился этот подход, поскольку он позволил мне выполнить действие по обработке исключения, максимально приближенное к реальному исключению.
К сожалению, после обновления это больше не работает. Поэтому я думал о том, как вместо этого обработать это исключение. Один из способов — просто обернуть исключение и затем зарегистрировать его с помощью какого-то глобального обработчика исключений, аналогичного @ExceptionHandler для RestControllers. По сути, это определяется каналом ошибок интеграции Spring и могло бы работать так, что решило бы мою проблему.

Код: Выделить всё

  @Bean
IntegrationFlow errorFlow(DirectChannel errorChannel) {
return IntegrationFlow.from(errorChannel)
.handle() // do logging based on exception type
.get();
}

Код: Выделить всё

  public EzbMessage transformToEzbMessage(String ezbMessage) {
try {
return objectMapper.readValue(ezbMessage, EzbMessage.class);
} catch (Exception ex) {
throw new EzbMessageParsingException(ezbMessage, "Failed to transform from string to EzbMessage", ex);
}
}
Проблема согласно https://docs.spring.io/spring-integrati ... page-title, это работает только для асинхронных потоков (например, мой канал публикации-подписки?). Когда в моем синхронном потоке возникает исключение, оно просто передается вызывающей стороне, которая, как я предполагаю, является моим KafkaChannelAdapter.
Мой вопрос: Как мне обрабатывать эти исключения корректно и в одном централизованном месте как для моих синхронизирующих, так и для асинхронных потоков? Могу ли я написать какой-нибудь глобальный обработчик исключений? Могу ли я отправлять исключения синхронизации в один и тот же поток ошибок?
Я, конечно, мог бы сделать свои потоки асинхронными, но это имеет множество других недостатков, например: что усложняет тестирование.
И второй вопрос: изменилось ли что-то в настройке requireReply()? Я использую это неправильно? Я не нашел явного упоминания в журнале изменений.

Подробнее здесь: https://stackoverflow.com/questions/798 ... tion-flows
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»