Используя functionRouter, определенный в моем application-test.yml, я смог получить сообщение из темы и направить его через MessageRoutingCallBack к двум различным bean-компонентам, фильтруя по значению заголовка:
< Strong>application-test.yml
Код: Выделить всё
spring:
cloud:
function.definition: functionRouter
stream:
function.routing.enabled: true
bindings:
functionRouter-in-0:
content-type: application/json
destination: ${CLOUD_TOPIC:Example.Input}
group: ${CLOUD_STREAM_BINDINGS_INPUT_GROUPID:getExampleConsumerGroup}
Код: Выделить всё
@Configuration
@Slf4j
public class MessageRoutingConfig {
private static final Map routingFunctions =
Map.of( "CONS_A", "consumerA",
"CONS_B", "consumerB");
@Bean
public MessageRoutingCallback routingCallback() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message message) {
String messageType = (String) message.getHeaders().get(KAFKA_HEADER_MSG_TYPE);
return routingFunctions.get(messageType);
}
};
}
}
Ниже приведен мой текущий реализация взята в качестве примера из документации Spring и адаптирована.
application-test.yml
Код: Выделить всё
spring:
cloud:
function.definition: functionRouter;mySpecialRouter
stream:
function.routing.enabled: true
bindings:
functionRouter-in-0:
content-type: application/json
destination: ${CLOUD_TOPIC:Example.Input}
group: ${CLOUD_STREAM_BINDINGS_INPUT_GROUPID:getExampleConsumerGroup}
mySpecialRouter-in-0:
content-type: application/json
destination: ${CLOUD_TOPIC_NEW:Example.Input2}
group: ${CLOUD_STREAM_BINDINGS_INPUT_GROUPID:getExampleConsumerGroup}
Код: Выделить всё
@Configuration
public class MultipleRouterConfiguration2 {
@Bean
RoutingFunction mySpecialRouter(
FunctionCatalog functionCatalog,
BeanFactory beanFactory,
@Nullable MessageRoutingCallback routingCallback) {
Map propertiesMap = new HashMap();
// Implement correctly propertiesMap with definition or routing-expression
return new RoutingFunction(
functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... routingfun