Структура конвейера маршрутов
Мое приложение состоит из нескольких «типов» маршрутов, которые вызывают друг друга:
- technical-sender
отправитель
процесс
получатель
технический получатель
Краткое описание каждого типа:
- технический отправитель: точка входа, например. rest().post("/demo").to("direct:demo-sender") или rest().openApi("demo-v1.json")
- отправитель: демаршалинг, проверка авторизации, базовая проверка, вызовы("direct:demo-process"), маршалинг ответа
- процесс: отображает запрос, вызывает("direct:demo-receiver"), отображает ответ
- receiver: устанавливает заголовки, маршалирует, добавляет авторизацию, вызывает("direct:demo-technical-receiver"), обрабатывает повторные попытки
- технический получатель: производитель низкого уровня, например from("direct:demo-technical-receiver").to("http:...")
Как правило, существует множество маршрутов процесса (для каждого процесса/типа сообщения), в то время как маршруты отправителя/получателя являются общими (обычно по одному на внешнюю систему), а технические маршруты являются еще более общими (обычно по одному на каждый протокол, например REST/SOAP/FTP).
Проблема
Счастливый путь работает отлично, но я борюсь с обработкой ошибок.
На любом этапе любого маршрута (проверка, сопоставление, демаршалинг и т. д.) может возникнуть исключение. Внешние вызовы также могут завершиться неудачно.
Мне нужен общий механизм обработки ошибок, который:
- Перехватывает исключение на маршруте, где оно происходит
- Обертывает его в «контейнерное» исключение (например, IntegrationException), используя либо:
- оболочка по умолчанию (DefaultExceptionWrapper) или
- обертку для конкретного маршрута (ErrorWrapper)
- Распространяет обернутую ошибку обратно по конвейеру
- Разрешает маршрутам дополнительно дополнять ее (например, маршрут процесса)
- Наконец, отправитель сопоставляет ее с внешним контрактом API (код статуса HTTP + тело ответа)
Что я пробовал/почему это не удалось
Я потратил несколько дней, пробуя разные подходы (например, onException(Exception.class) и onCompletion().onFailureOnly().modeBeforeConsumer()), но меня блокируют следующие проблемы:
- onException выполняется только один раз для каждого Exchange. Даже если я установил handled(false) и обмен возвращается к вызывающему маршруту (например, получатель → процесс), onException(Exception.class) не запускается снова.
- Повторное создание исключения или установка Exchange.setException(...) в блоке onException всегда запускает FatalFallbackErrorHandler и завершает обработку маршрута.
- Если я устанавливаю errorHandler(noErrorHandler()), чтобы разрешить распространение ошибки на маршрут вызывающего абонента, он также отключает onException на этом маршруте, включая повторную доставку.
- onCompletion работает на стороне Exchange и не может использоваться для управления ответом клиенту.
Демо-проект
Я создал демонстрационный проект на GitHub для имитации этой структуры:
https://github.com/bvv-jmedved/error-hanling-demo
Чего хотелось бы добиться: если смоделированный HTTP 500 в техническом приемнике возвращается из внешней системы, клиент должен получить 502 Bad Gateway и это тело:
{
"errors": [
{ "code": "500", "message": "Something wrong occurred in external system" }
]
}
Код (для иллюстрации)
=== errorhanlingdemo/ErrorHanlingDemoApplication.java ===
@SpringBootApplication
public class ErrorHanlingDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ErrorHanlingDemoApplication.class, args);
}
}
=== errorhanlingdemo/builder/common/BaseProcessRouteBuilder.java ===
public abstract class BaseProcessRouteBuilder extends BaseRouteBuilder {
}
=== errorhanlingdemo/builder/common/BaseReceiverRouteBuilder.java ===
public abstract class BaseReceiverRouteBuilder extends BaseRouteBuilder {
}
=== errorhanlingdemo/builder/common/BaseRouteBuilder.java ===
public abstract class BaseRouteBuilder extends RouteBuilder {
@Override
public final void configure() {
onCompletion().onFailureOnly().modeBeforeConsumer()
.log("Handling onCompletion in route: ${routeId} with body: ${body}")
.process(getFailureProcessor());
errorHandler(defaultErrorHandler()
.logStackTrace(false)
.logExhausted(false));
config();
}
protected Processor getFailureProcessor() {
return exchange -> {};
}
protected abstract void config();
}
=== errorhanlingdemo/builder/common/BaseSenderRouteBuilder.java ===
public abstract class BaseSenderRouteBuilder extends BaseRouteBuilder {
}
=== errorhanlingdemo/builder/common/BaseTechnicalReceiverRouteBuilder.java ===
public abstract class BaseTechnicalReceiverRouteBuilder extends BaseRouteBuilder {
}
=== errorhanlingdemo/builder/common/BaseTechnicalSenderRouteBuilder.java ===
public abstract class BaseTechnicalSenderRouteBuilder extends BaseRouteBuilder {
}
=== errorhanlingdemo/builder/process/DemoProcessRouteBuilder.java ===
@Component
public class DemoProcessRouteBuilder extends BaseProcessRouteBuilder {
@Override
protected void config() {
from("seda:demo-process")
.routeId("demo-process")
.log("Processor. Processing request: ${body}")
.to("seda:demo-receiver?exchangePattern=InOut")
.log("Processor. Processing response: ${body}");
}
}
=== errorhanlingdemo/builder/receiver/DemoReceiverRouteBuilder.java ===
@Component
public class DemoReceiverRouteBuilder extends BaseReceiverRouteBuilder {
@Override
protected void config() {
onException(HttpOperationFailedException.class)
.maximumRedeliveries(2).redeliveryDelay(0)
.onRedelivery( exchange -> {
System.out.println("Receiver. Getting new token");
})
.handled(false);
;
from("seda:demo-receiver")
.routeId("demo-receiver")
.log("Receiver. Preparing call with request: ${body}")
.to("direct:technicalreceiver")
.log("Receiver. Handling response: ${body}")
;
}
}
=== errorhanlingdemo/builder/sender/DemoSenderRouteBuilder.java ===
@Component
public class DemoSenderRouteBuilder extends BaseSenderRouteBuilder {
@Override
public void config() {
from("seda:demo-sender")
.routeId("demo-sender")
.log("Sender. Validating authorizationSending message: ${body}")
.log("Sender. Unmarshalling request: ${body}")
.to("seda:demo-process?exchangePattern=InOut")
.log("Sender. Marshalling response: ${body}");
}
}
=== errorhanlingdemo/builder/sender/model/DemoError.java ===
public record DemoError(
List errors
) {
public record Error(
String code,
String message
) {
}
}
=== errorhanlingdemo/builder/technicalreciever/DemoTechnicalReceiverRouteBuilder.java ===
@Component
public class DemoTechnicalReceiverRouteBuilder extends BaseTechnicalReceiverRouteBuilder {
@Override
protected void config() {
errorHandler(noErrorHandler());
from("direct:technicalreceiver")
.routeId("technicalreceiver")
.log("Technical receiver. Calling target system with message: ${body}")
.throwException(new HttpOperationFailedException(
"http://fake-receiver",
500,
"Something wrong occurred in external system",
null,
null,
"{\"Status\":\"Server Failed\"}"
))
.log("Technical receiver. Cleaning headers")
;
}
}
=== errorhanlingdemo/builder/technicalsender/DemoTechnicalSender.java ===
@Component
public class DemoTechnicalSender extends BaseTechnicalSenderRouteBuilder {
@Override
protected void config() {
rest()
.post("/demo")
.to("seda:demo-sender");
}
}
=== errorhanlingdemo/Exception/IntegrationError.java ===
public record IntegrationError(
String code,
String message
) {
}
=== errorhanlingdemo/Exception/IntegrationException.java ===
@Getter
public class IntegrationException extends RuntimeException {
private final int statusCode;
private final List errors;
public IntegrationException(
int statusCode,
String errorCode,
String message,
Exception cause) {
super(message, cause);
List integrationErrors = List.of(
new IntegrationError(errorCode, message));
this.statusCode = statusCode;
this.errors = integrationErrors;
}
public IntegrationException(
int statusCode,
List errors,
String message,
Exception cause) {
super(message, cause);
this.statusCode = statusCode;
this.errors = List.copyOf(errors);
}
}
Подробнее здесь: https://stackoverflow.com/questions/798 ... ache-camel
Мобильная версия