Каков наилучший способ для нижестоящего компонента в IntegrationFlow, когда он получает агрегированные сообщения, узнать, сообщения представляют собой полную группу или часть?
Вот упрощенный пример:
Код: Выделить всё
@Autowired
PetGroupHandler petGroupHandler;
@Bean
public JdbcMessageStore jdbcMessageStore(
@Qualifier("mydbDataSource") DataSource dataSource) {
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
messageStore.setRegion("petstore-pubsub");
return messageStore;
}
@Bean
IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
.filter(petOfInterestFilter, "shouldProcess")
.aggregate(aggregatorSpec -> aggregatorSpec
.messageStore(jdbcMessageStore)
.outputProcessor(petOutputProcessor)
.expireGroupsUponCompletion(true)
.groupTimeout(300 * 1000) // 5 minutes
.sendPartialResultOnExpiry(true) // send partial group
.correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
.releaseStrategy(group -> group.size()>=2))
.handle(petGroupHandler, "handle")
.get();
Мы попытались реализовать OutputProcessor, который вводит логический заголовок, указывающий, является ли группа полной. Но это не работает, поскольку похоже, что группа не настроена на завершение в AbstractCorrelatingMessageHandler.java, сразу после вызова стратегии выпуска и перед вызовом OutputProcessor в CompleteGroup() метод.
Код: Выделить всё
if (this.releaseStrategy.canRelease(messageGroup)) {
Collection
Подробнее здесь: [url]https://stackoverflow.com/questions/79344474/spring-integration-aggregator-pattern-a-way-for-downstream-component-to-know[/url]