Обработка ошибок в конвейере прямых маршрутов/seda в Apache CamelJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Обработка ошибок в конвейере прямых маршрутов/seda в Apache Camel

Сообщение Anonymous »

Я пытаюсь структурировать свое приложение Camel как набор многократно используемых маршрутов, соответствующих принципу единой ответственности. Моя цель — сохранить конкретные маршруты чистыми и понятными, описывая только счастливый путь, одновременно извлекая обработку общих ошибок в абстрактный BaseRouteBuilder.
Структура конвейера маршрутов
Мое приложение состоит из нескольких «типов» маршрутов, которые вызывают друг друга:
  • technical-sender

    отправитель

    процесс

    получатель

    технический получатель







Краткое описание каждого типа:
  • технический отправитель: точка входа, например. rest().post("/demo").to("direct:demo-sender") или rest().openApi("demo-v1.json")
  • отправитель: демаршалинг, проверка авторизации, базовая проверка, вызовы("direct:demo-process"), маршалинг ответа
  • процесс: отображает запрос, вызывает("direct:demo-receiver"), отображает ответ
  • receiver: устанавливает заголовки, маршалирует, добавляет авторизацию, вызывает("direct:demo-technical-receiver"), обрабатывает повторные попытки
  • технический получатель: производитель низкого уровня, например from("direct:demo-technical-receiver").to("http:...")
Каждый тип имеет свой собственный абстрактный RouteBuilder. Конкретные RouteBuilder определяют только счастливый путь (без обработки ошибок, специфичных для маршрута, если это не требуется).
Как правило, существует множество маршрутов процесса (для каждого процесса/типа сообщения), в то время как маршруты отправителя/получателя являются общими (обычно по одному на внешнюю систему), а технические маршруты являются еще более общими (обычно по одному на каждый протокол, например REST/SOAP/FTP).
Проблема
Счастливый путь работает отлично, но я борюсь с обработкой ошибок.
На любом этапе любого маршрута (проверка, сопоставление, демаршалинг и т. д.) может возникнуть исключение. Внешние вызовы также могут завершиться неудачно.
Мне нужен общий механизм обработки ошибок, который:
  • Перехватывает исключение на маршруте, где оно происходит
  • Обертывает его в «контейнерное» исключение (например, IntegrationException), используя либо:
    • оболочка по умолчанию (DefaultExceptionWrapper) или
    • обертку для конкретного маршрута (ErrorWrapper)
  • Распространяет обернутую ошибку обратно по конвейеру
  • Разрешает маршрутам дополнительно дополнять ее (например, маршрут процесса)
  • Наконец, отправитель сопоставляет ее с внешним контрактом API (код статуса HTTP + тело ответа)
Если не требуется никакой специальной обработки ошибок, я хочу, чтобы все оставалось в абстрактных базовых сборщиках, не загромождая счастливые пути блоками doTry() или choice().
Что я пробовал/почему это не удалось
Я потратил несколько дней, пробуя разные подходы (например, onException(Exception.class) и onCompletion().onFailureOnly().modeBeforeConsumer()), но меня блокируют следующие проблемы:
  • onException выполняется только один раз для каждого Exchange. Даже если я установил handled(false) и обмен возвращается к вызывающему маршруту (например, получатель → процесс), onException(Exception.class) не запускается снова.
  • Повторное создание исключения или установка Exchange.setException(...) в блоке onException всегда запускает FatalFallbackErrorHandler и завершает обработку маршрута.
  • Если я устанавливаю errorHandler(noErrorHandler()), чтобы разрешить распространение ошибки на маршрут вызывающего абонента, он также отключает onException на этом маршруте, включая повторную доставку.
  • onCompletion работает на стороне Exchange и не может использоваться для управления ответом клиенту.
Кто-нибудь может помочь?
Демо-проект
Я создал демонстрационный проект на GitHub для имитации этой структуры:

https://github.com/bvv-jmedved/error-hanling-demo
Чего хотелось бы добиться: если смоделированный HTTP 500 в техническом приемнике возвращается из внешней системы, клиент должен получить 502 Bad Gateway и это тело:
{
"errors": [
{ "code": "500", "message": "Something wrong occurred in external system" }
]
}

Код (для иллюстрации)
=== errorhanlingdemo/ErrorHanlingDemoApplication.java ===
@SpringBootApplication
public class ErrorHanlingDemoApplication {

public static void main(String[] args) {
SpringApplication.run(ErrorHanlingDemoApplication.class, args);
}

}


=== errorhanlingdemo/builder/common/BaseProcessRouteBuilder.java ===
public abstract class BaseProcessRouteBuilder extends BaseRouteBuilder {
}


=== errorhanlingdemo/builder/common/BaseReceiverRouteBuilder.java ===
public abstract class BaseReceiverRouteBuilder extends BaseRouteBuilder {
}


=== errorhanlingdemo/builder/common/BaseRouteBuilder.java ===
public abstract class BaseRouteBuilder extends RouteBuilder {

@Override
public final void configure() {

onCompletion().onFailureOnly().modeBeforeConsumer()
.log("Handling onCompletion in route: ${routeId} with body: ${body}")
.process(getFailureProcessor());

errorHandler(defaultErrorHandler()
.logStackTrace(false)
.logExhausted(false));

config();
}

protected Processor getFailureProcessor() {
return exchange -> {};
}

protected abstract void config();
}


=== errorhanlingdemo/builder/common/BaseSenderRouteBuilder.java ===
public abstract class BaseSenderRouteBuilder extends BaseRouteBuilder {
}


=== errorhanlingdemo/builder/common/BaseTechnicalReceiverRouteBuilder.java ===
public abstract class BaseTechnicalReceiverRouteBuilder extends BaseRouteBuilder {
}


=== errorhanlingdemo/builder/common/BaseTechnicalSenderRouteBuilder.java ===
public abstract class BaseTechnicalSenderRouteBuilder extends BaseRouteBuilder {
}


=== errorhanlingdemo/builder/process/DemoProcessRouteBuilder.java ===
@Component
public class DemoProcessRouteBuilder extends BaseProcessRouteBuilder {
@Override
protected void config() {

from("seda:demo-process")
.routeId("demo-process")
.log("Processor. Processing request: ${body}")
.to("seda:demo-receiver?exchangePattern=InOut")
.log("Processor. Processing response: ${body}");
}
}


=== errorhanlingdemo/builder/receiver/DemoReceiverRouteBuilder.java ===
@Component
public class DemoReceiverRouteBuilder extends BaseReceiverRouteBuilder {
@Override
protected void config() {
onException(HttpOperationFailedException.class)
.maximumRedeliveries(2).redeliveryDelay(0)
.onRedelivery( exchange -> {
System.out.println("Receiver. Getting new token");
})
.handled(false);
;

from("seda:demo-receiver")
.routeId("demo-receiver")
.log("Receiver. Preparing call with request: ${body}")
.to("direct:technicalreceiver")
.log("Receiver. Handling response: ${body}")
;
}
}


=== errorhanlingdemo/builder/sender/DemoSenderRouteBuilder.java ===
@Component
public class DemoSenderRouteBuilder extends BaseSenderRouteBuilder {

@Override
public void config() {

from("seda:demo-sender")
.routeId("demo-sender")
.log("Sender. Validating authorizationSending message: ${body}")
.log("Sender. Unmarshalling request: ${body}")
.to("seda:demo-process?exchangePattern=InOut")
.log("Sender. Marshalling response: ${body}");
}

}


=== errorhanlingdemo/builder/sender/model/DemoError.java ===
public record DemoError(
List errors
) {
public record Error(
String code,
String message
) {
}
}


=== errorhanlingdemo/builder/technicalreciever/DemoTechnicalReceiverRouteBuilder.java ===
@Component
public class DemoTechnicalReceiverRouteBuilder extends BaseTechnicalReceiverRouteBuilder {
@Override
protected void config() {
errorHandler(noErrorHandler());

from("direct:technicalreceiver")
.routeId("technicalreceiver")
.log("Technical receiver. Calling target system with message: ${body}")
.throwException(new HttpOperationFailedException(
"http://fake-receiver",
500,
"Something wrong occurred in external system",
null,
null,
"{\"Status\":\"Server Failed\"}"
))
.log("Technical receiver. Cleaning headers")
;

}
}


=== errorhanlingdemo/builder/technicalsender/DemoTechnicalSender.java ===
@Component
public class DemoTechnicalSender extends BaseTechnicalSenderRouteBuilder {
@Override
protected void config() {
rest()
.post("/demo")
.to("seda:demo-sender");
}
}


=== errorhanlingdemo/Exception/IntegrationError.java ===
public record IntegrationError(
String code,
String message
) {
}


=== errorhanlingdemo/Exception/IntegrationException.java ===
@Getter
public class IntegrationException extends RuntimeException {
private final int statusCode;
private final List errors;

public IntegrationException(
int statusCode,
String errorCode,
String message,
Exception cause) {
super(message, cause);
List integrationErrors = List.of(
new IntegrationError(errorCode, message));
this.statusCode = statusCode;
this.errors = integrationErrors;
}

public IntegrationException(
int statusCode,
List errors,
String message,
Exception cause) {
super(message, cause);
this.statusCode = statusCode;
this.errors = List.copyOf(errors);
}
}


Подробнее здесь: https://stackoverflow.com/questions/798 ... ache-camel
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»