Обновление Spring Integration до 6.3.4 - errorChannel больше не обрабатывает сообщения после исключенияJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Обновление Spring Integration до 6.3.4 - errorChannel больше не обрабатывает сообщения после исключения

Сообщение Anonymous »

Недавно я обновился с версии Spring-integration (1.5.4) и Spring-cloud-stream(1.3.3.RELEASE) до Spring-integration-core 6.3.4. &
spring-cloud-stream(4.1.1) После обновления я заметил, что мой errorChannel перестает обрабатывать сообщения, когда в потоке возникает исключение. Ранее, в версии 1.5.4, такое поведение работало должным образом — ошибки обрабатывались без прерывания обработки сообщений.
Я явно не определил errorChannel в своей конфигурации, поэтому полагаюсь на поведение по умолчанию. Для тестирования я намеренно создаю исключение, чтобы посмотреть, как оно обрабатывается. Вот упрощенная версия моего кода и репозитория для демонстрационного проекта:
Журнал создания errorChannel

Код: Выделить всё

2024-11-10 00:16:05.036  INFO 18784 --- [           main] c.p.O.M.Application                      : No active profile set, falling back to default profiles: default 2024-11-10 00:16:05.046  INFO 18784 --- [           main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@525d79f0: startup date [Sun Nov 10 00:16:05 GMT 2024]; parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@1fa121e2 2024-11-10 00:16:05.337  INFO 18784 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2024-11-10 00:16:05.756  INFO 18784 --- [           main] o.s.cloud.context.scope.GenericScope     : BeanFactory id=e62e62b0-2a73-3813-9f0c-5895d78d6bf5 2024-11-10 00:16:05.762  INFO 18784 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined.  Therefore, a default PublishSubscribeChannel will be created.
Последовательность действий в канале ошибок

Код: Выделить всё

@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from("errorChannel")
.log(Level.TRACE, this.getClass().getName() + ".errorFlow")
// Filter multiple messages for the same input messages so that the error is only handled once
.filter(Message.class, message -> {
Message failedMessage = message;

if (failedMessage instanceof ErrorMessage &&
((ErrorMessage) failedMessage).getPayload() instanceof MessagingException) {
failedMessage = ((MessagingException) ((ErrorMessage) failedMessage).getPayload()).getFailedMessage();
}

AmqpUniqueId amqpUniqueId = new AmqpUniqueId(
(String) failedMessage.getHeaders().get("amqp_consumerTag"),
(Long) failedMessage.getHeaders().get("amqp_deliveryTag"));

return this.messageIdStore.putIfAbsent(amqpUniqueId, ZonedDateTime.now(ZoneId.of("Z"))) == null;
})
.handle(message -> {
Message failedMessage = message;

if (failedMessage instanceof ErrorMessage &&
((ErrorMessage) failedMessage).getPayload() instanceof MessagingException) {
failedMessage = ((MessagingException) ((ErrorMessage) failedMessage).getPayload()).getFailedMessage();
}
Channel channel = (Channel) failedMessage.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) failedMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
OrderStatus orderStatus = OrderStatus.FAILED;

if (channel != null) {
orderStatus = OrderStatus.RETRYING;

try {
channel.basicNack(deliveryTag, false, false);
} catch (IOException exception) {
log.error("channel operation failed", exception);
}
} else {
log.error("can't access channel: " + message.toString());
}

Long orderId = getOrderIdFromFailedMessage(failedMessage);
if (orderId != null) {
try {
this.database.setOrderStatus(orderId, orderStatus.getId(), null);
} catch (SQLException exception) {
log.error("An Error occurred while updating order status", exception);
}
} else {
log.error("unable to obtain order ID: " + message.toString());
}
})
.get();
}
Мой поток

Код: Выделить всё

@Bean
public IntegrationFlow orderRequestFlow() {
return IntegrationFlow.from("OrderRequestListener-out-0")
.log(Level.TRACE, this.getClass().getName() + ".orderRequestFlow")
.enrichHeaders(h -> h.header("source", "orderRequestFlow"))
.handle((message, responseHeaders) -> {
return jsonMessageConverter.convert(message, OrderMessage.class, responseHeaders);
})
.routeToRecipients(exhaustedRetriesRouter -> exhaustedRetriesRouter
.recipientMessageSelector(GenericFlow.EXHAUSTED_RETRIES_CHANNEL,
message -> isExhaustedMessage(message)
).defaultSubFlowMapping(orderAttemptSubFlow -> orderAttemptSubFlow
.transform(orderMessage ->  {
try {
// Testing Exception Handling
if(orderMessage != null)
throw new SQLException("OrderMessage is not null");

return this.orderRequestFactory.getOrderDetails(orderMessage);
} catch (final SQLException exception) {
throw new MessagingException("Transform failed", exception);
}
})
.routeToRecipients(unableToOrderRouter -> unableToOrderRouter
Вопросы:

[*]Изменилось ли поведение или реализация errorChannel в Spring Integration 6.x?

[*]Нужно ли мне явно определять или настраивать errorChannel в последней версии?

[*]Нужны ли какие-либо дополнительные настройки, чтобы errorChannel продолжал обрабатывать исключения, не прекращая обработку сообщений?

< /ol>
Мы будем очень признательны за любые рекомендации по обработке ошибок и обеспечению согласованного поведения обработки сообщений с помощью Spring Integration 6.x.

Подробнее здесь: https://stackoverflow.com/questions/791 ... g-messages
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»