Чтобы обеспечить некоторый контекст, я создал абстракцию над Consumer для обработки шаблонного кода, специально для настройки MDC и централизованной обработки ошибок.
Вот мой основной интерфейс:
Код: Выделить всё
public interface StreamConsumer extends Consumer {
void process(MessageHeaders headers, IN input) throws Exception;
default void setupLogging(MessageHeaders headers, IN input) {}
default void onError(MessageHeaders headers, IN input, Exception e) {}
}
Код: Выделить всё
@Slf4j
public abstract class AbstractConsumer implements StreamConsumer {
@Override
public void accept(Message inMessage) {
IN payload = inMessage.getPayload();
MessageHeaders headers = inMessage.getHeaders();
log.debug("Received message: {}", payload);
log.debug("Received headers: {}", headers);
try {
setupLogging(headers, payload);
process(headers, payload);
} catch (Exception e) {
log.error("Error processing message: {}", payload, e);
onError(headers, payload, e);
throw new RuntimeException(e);
} finally {
MDC.clear();
}
}
}
Код: Выделить всё
@Slf4j
@Component
public class LogTaskConsumer extends AbstractConsumer {
@Override
public void process(MessageHeaders headers, Task input) {
// ... business logic
}
@Override
public void setupLogging(MessageHeaders headers, Task input) {
//MDC setup example...
super.setupLogging(headers, input);
Object id = headers.get("id");
if (id != null) {
MDC.put("id", id.toString());
}
}
}
Код: Выделить всё
spring:
cloud:
function:
definition: logTaskConsumer;...others
stream:
default-binder: rabbit
binding-retry-interval: 3
default:
group: ${spring.application.name}
content-type: application/json
consumer:
concurrency: 5
bindings:
logTaskConsumer-in-0:
destination: xorch_task_log_2
consumer:
max-attempts: 5
Интеграционные тесты пройдены, сообщение доходит до потребителя и обрабатывается правильно. Функционально все работает как положено. Однако при запуске я вижу следующее предупреждение в журналах:
-2025-12-27 03:07:48.844 [] - WARN 26452 --- [ restartedMain] c.f.c.c.BeanFactoryAwareFunctionRegistry: Не удалось найти функцию
'logTaskConsumer' для функции определение 'logTaskConsumer'. Возвращает
null.
Я не получаю это предупреждение, когда реализую потребителю «традиционный» функциональный способ (с использованием стандартного @Bean).
Основываясь на моих исследованиях, это похоже на то, что прокси-серверы Spring не позволяют Spring Cloud Stream правильно определить тип компонента или найти его в реестре, хотя он работает во время выполнения из-за резервных механизмов.
Мой вопрос: как устранить это предупреждение? Мне бы очень хотелось сохранить эту абстракцию, поскольку моя команда считает ее удобной и интуитивно понятной. Однако, если вы считаете, что этот шаблон не рекомендуется для Spring Cloud Stream, какой альтернативный подход будет лучшим?
Подробнее здесь: https://stackoverflow.com/questions/798 ... stom-consu
Мобильная версия