Я явно не определил errorChannel в своей конфигурации, поэтому полагаюсь на поведение по умолчанию. Для тестирования я намеренно создаю исключение, чтобы посмотреть, как оно обрабатывается. Вот упрощенная версия моего кода:
Журнал об ошибке создания канала
Код: Выделить всё
2024-11-10 00:16:05.036 INFO 18784 --- [ main] c.p.O.M.Application : No active profile set, falling back to default profiles: default 2024-11-10 00:16:05.046 INFO 18784 --- [ main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@525d79f0: startup date [Sun Nov 10 00:16:05 GMT 2024]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1fa121e2 2024-11-10 00:16:05.337 INFO 18784 --- [ main] o.s.i.config.IntegrationRegistrar : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2024-11-10 00:16:05.756 INFO 18784 --- [ main] o.s.cloud.context.scope.GenericScope : BeanFactory id=e62e62b0-2a73-3813-9f0c-5895d78d6bf5 2024-11-10 00:16:05.762 INFO 18784 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.Код: Выделить всё
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from("errorChannel")
.log(Level.TRACE, this.getClass().getName() + ".errorFlow")
// Filter multiple messages for the same input messages so that the error is only handled once
.filter(Message.class, message -> {
Message failedMessage = message;
if (failedMessage instanceof ErrorMessage &&
((ErrorMessage) failedMessage).getPayload() instanceof MessagingException) {
failedMessage = ((MessagingException) ((ErrorMessage) failedMessage).getPayload()).getFailedMessage();
}
AmqpUniqueId amqpUniqueId = new AmqpUniqueId(
(String) failedMessage.getHeaders().get("amqp_consumerTag"),
(Long) failedMessage.getHeaders().get("amqp_deliveryTag"));
return this.messageIdStore.putIfAbsent(amqpUniqueId, ZonedDateTime.now(ZoneId.of("Z"))) == null;
})
.handle(message -> {
Message failedMessage = message;
if (failedMessage instanceof ErrorMessage &&
((ErrorMessage) failedMessage).getPayload() instanceof MessagingException) {
failedMessage = ((MessagingException) ((ErrorMessage) failedMessage).getPayload()).getFailedMessage();
}
Channel channel = (Channel) failedMessage.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) failedMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
OrderStatus orderStatus = OrderStatus.FAILED;
if (channel != null) {
orderStatus = OrderStatus.RETRYING;
try {
channel.basicNack(deliveryTag, false, false);
} catch (IOException exception) {
log.error("channel operation failed", exception);
}
} else {
log.error("can't access channel: " + message.toString());
}
Long orderId = getOrderIdFromFailedMessage(failedMessage);
if (orderId != null) {
try {
this.database.setOrderStatus(orderId, orderStatus.getId(), null);
} catch (SQLException exception) {
log.error("An Error occurred while updating order status", exception);
}
} else {
log.error("unable to obtain order ID: " + message.toString());
}
})
.get();
}
Код: Выделить всё
@Bean
public IntegrationFlow orderRequestFlow() {
return IntegrationFlow.from("OrderRequestListener-out-0")
.log(Level.TRACE, this.getClass().getName() + ".orderRequestFlow")
.enrichHeaders(h -> h.header("source", "orderRequestFlow"))
.handle((message, responseHeaders) -> {
return jsonMessageConverter.convert(message, OrderMessage.class, responseHeaders);
})
.routeToRecipients(exhaustedRetriesRouter -> exhaustedRetriesRouter
.recipientMessageSelector(GenericFlow.EXHAUSTED_RETRIES_CHANNEL,
message -> isExhaustedMessage(message)
).defaultSubFlowMapping(orderAttemptSubFlow -> orderAttemptSubFlow
.transform(orderMessage -> {
try {
// Testing Exception Handling
if(orderMessage != null)
throw new SQLException("OrderMessage is not null");
return this.orderRequestFactory.getOrderDetails(orderMessage);
} catch (final SQLException exception) {
throw new MessagingException("Transform failed", exception);
}
})
.routeToRecipients(unableToOrderRouter -> unableToOrderRouter
[*]Изменилось ли поведение или реализация errorChannel в Spring Integration 6.x?
[*]Нужно ли мне явно определять или настраивать errorChannel в последней версии?
[*]Нужны ли какие-либо дополнительные настройки, чтобы errorChannel продолжал обрабатывать исключения, не прекращая обработку сообщений?
< /ol>
Мы будем очень признательны за любые рекомендации по обработке ошибок и обеспечению согласованного поведения обработки сообщений с помощью Spring Integration 6.x.
Подробнее здесь: https://stackoverflow.com/questions/791 ... g-messages
Мобильная версия