Основная структура:
Код: Выделить всё
ezbKafka -> ezbPipelineFlow -> veibPublishSubscribeChannel -> VeibFlow -> KafkaКод: Выделить всё
dloxKafka -> dloxPipelineFlow -> veibPublishSubscribeChannel -> VeibFlow -> Kafka
Код: Выделить всё
@Bean
public IntegrationFlow ezbPipelineFlow() {
return IntegrationFlow
.from(Kafka
.messageDrivenChannelAdapter(ezbConnection.getConsumerFactory(), EZB_TOPIC)
.errorChannel("errorChannel"))
.transformWith(spec -> spec
.transformer(ezbMessageTransformer::transformToEzbMessage)
.requiresReply(false))
// other synchronous filters / handlers / enrichers
.channel(veibMessageChannel)
.nullChannel();
}
Код: Выделить всё
public EzbMessage transformToEzbMessage(String ezbMessage) {
try {
return jsonMapper.readValue(ezbMessage, EzbMessage.class);
} catch (Exception ex) {
torLoggingCatalog.signalEzbMessageParsingError(ezbMessage).withException(ex).log();
return null;
}
}
К сожалению, после обновления это больше не работает. Поэтому я думал о том, как вместо этого обработать это исключение. Один из способов — просто обернуть исключение и затем зарегистрировать его с помощью какого-то глобального обработчика исключений, аналогичного @ExceptionHandler для RestControllers. По сути, это определяется каналом ошибок интеграции Spring и могло бы работать так, что решило бы мою проблему.
Код: Выделить всё
@Bean
IntegrationFlow errorFlow(DirectChannel errorChannel) {
return IntegrationFlow.from(errorChannel)
.handle() // do logging based on exception type
.get();
}
Код: Выделить всё
public EzbMessage transformToEzbMessage(String ezbMessage) {
try {
return objectMapper.readValue(ezbMessage, EzbMessage.class);
} catch (Exception ex) {
throw new EzbMessageParsingException(ezbMessage, "Failed to transform from string to EzbMessage", ex);
}
}
Мой вопрос: Как мне обрабатывать эти исключения корректно и в одном централизованном месте как для моих синхронизирующих, так и для асинхронных потоков? Могу ли я написать какой-нибудь глобальный обработчик исключений? Могу ли я отправлять исключения синхронизации в один и тот же поток ошибок?
Я, конечно, мог бы сделать свои потоки асинхронными, но это имеет множество других недостатков, например: что усложняет тестирование.
И второй вопрос: изменилось ли что-то в настройке requireReply()? Я использую это неправильно? Я не нашел явного упоминания в журнале изменений.
Подробнее здесь: https://stackoverflow.com/questions/798 ... tion-flows
Мобильная версия