Причины запуска первого задания: у диспетчера нет подписчиков для исключения канала при объявлении нескольких DirectChanJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Причины запуска первого задания: у диспетчера нет подписчиков для исключения канала при объявлении нескольких DirectChan

Сообщение Anonymous »

У меня есть приложение Spring Batch, которое успешно выполняет весенние пакетные задания, но у меня возникает исключение при объявлении нескольких DirectChannels.
Исключение возникает, когда я запускаю «firstJob». Вот исключение:

Код: Выделить всё

`Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'IFRSGoodBookService-1.secondReplies'.
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)
...  14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
...  50 common frames omitted`
Это мой класс FlowConfig, который объявляет каналы:

Код: Выделить всё

@Configuration
@RequiredArgsConstructor
class FlowConfig {

private final QueueConfig queueConfig;

@Bean
public DirectChannel firstRequests() {
return new DirectChannel();
}

@Bean
public DirectChannel firstReplies() {
return new DirectChannel();
}

@Bean
public DirectChannel secondRequests() {
return new DirectChannel();
}

@Bean
public DirectChannel secondReplies() {
return new DirectChannel();
}

@Bean("secondManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow secondManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, secondReplies());
}

@Bean("secondWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow secondInBoundFlow() {
return queueConfig.getInboundAdapter(false, secondRequests());
}

@Bean("secondManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow secondManagerOutboundFlow() {
return queueConfig.getOutboundAdapter(true, secondRequests());
}

@Bean("secondWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow secondWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, secondReplies());
}

@Bean("firstManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow firstManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, firstReplies());
}

@Bean("firstWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow firstWorkerInBoundFlow() {
return queueConfig.getInboundAdapter(false, firstRequests());
}

@Bean("firstManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow firstManagerOutboundFlow() {
return queueConfig.getOutboundAdapter(true, firstRequests());
}

@Bean("firstWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow firstWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, firstReplies());
}
}
Это реализация входящего и исходящего адаптеров:

Код: Выделить всё

@Configuration
@ConditionalOnProperty("spring.rabbitmq.enabled")
@RequiredArgsConstructor
public class RabbitMqQueueConfig implements QueueConfig {

private final ConnectionFactory connectionFactory;
private final RabbitTemplate defaultRabbitTemplate;
private final QueueConstants queueConstants;

@Override
public IntegrationFlow getInboundAdapter(boolean isManager, DirectChannel channel) {
String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE)
: queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE);
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
}

@Override
public IntegrationFlow getOutboundAdapter(boolean isManager, DirectChannel channel) {
String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE)
:  queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE);
AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
}
}
Это JobManagerConfiguration

Код: Выделить всё

@Configuration
@Profile("manager")
@EnableBatchIntegration
@AllArgsConstructor
public class JobManagerPartitionConfiguration {

private final JobRepository jobRepository;
private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
private PlatformTransactionManager transactionManager;
private DeleteDataTasklet deleteDataTasklet;
private InstDataLoader instDataLoader;
private final ApplicationProperties appProperties;
private final DirectChannel firstRequests;
private final DirectChannel firstReplies;
private final DirectChannel secondRequests;
private final DirectChannel secondReplies;
private final ManagerJobListener managerJobListener;
private final IdBoundaryPartitioner idBoundaryPartitioner;
private final ContextService contextService;

@Bean
public Step firstJobManagerStep() {
return managerStepBuilderFactory.get("firstJobManagerStep")
.partitioner("remotefirstJobAsStep", idBoundaryPartitioner)
.gridSize(appProperties.getJobParameters().getGridSize())
.outputChannel(firstRequests)
.inputChannel(firstReplies)
.listener(new SyncStepContextWithJob())
.build();
}

@Bean
public Step secondJobManagerStep() {
return managerStepBuilderFactory.get("secondJobManagerStep")
.partitioner("remoteSecondJobAsStep", idBoundaryPartitioner)
.gridSize(appProperties.getJobParameters().getGridSize())
.outputChannel(secondRequests)
.inputChannel(secondReplies)
.listener(new SyncStepContextWithJob())
.build();
}

@Bean
public Job secondJob(Step secondJobManagerStep) {
return new JobBuilder("secondJob", jobRepository).incrementer(new RunIdIncrementer())
.start(instDataLoaderStep())
.next(deleteTable())
.next(secodJobManagerStep)
.listener(contextService)
.listener(managerJobListener)
.build();
}

@Bean
public Job firstJob(Step firstJobManagerStep) {
return new JobBuilder("firstJob", jobRepository).incrementer(new RunIdIncrementer())
.start(instDataLoaderStep())
.next(deleteTable())
.next(firstJobManagerStep)
.listener(contextService)
.listener(managerJobListener)
.build();
}
Это моя конфигурация JobWorker:

Код: Выделить всё

@Configuration
@Profile("worker")
@EnableBatchIntegration
@AllArgsConstructor
@Slf4j
public class JobWorkerPartitionConfiguration {

private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
private JobRepository jobRepository;
private JobCache cache;
private CacheJobListener  jobListener;
private WorkerJobListener workerJobListener;
private StepMonitoringListener monitoringListener;
private ApplicationProperties appProperties;
private InstDataLoader instDataLoader;
private ContractLoader contractLoader;
private CacheItemWriter cacheItemWriter;
private PlatformTransactionManager transactionManager;
private InstDataJpaCache instDataJpaCache;
private ContextService contextService;
private MonitorService monitorService;

@Bean
@StepScope
Job remoteFirstJob(NamedParameterJdbcTemplate jdbcTemplate) {
return new JobBuilder("remoteFirstJob", jobRepository).start(instDataLoaderStep())
.next(contractLoaderTaskletStep())
.next(calculateStep())
.listener(contextService)
.listener(jobListener)
.listener(workerJobListener)
.listener(monitoringListener)
// .listener(new SyncStepContextWithJob(this.monitorService))
.build();
}

@Bean
public Step remoteFirstJobAsStep(
DirectChannel firstRequests,
DirectChannel firstReplies

) {
return workerStepBuilderFactory.get("remoteFirstJobAsStep")
.inputChannel(firstRequests)
.outputChannel(firstReplies)
.parametersExtractor(remoteJobParametersExtractor())
.listener(new SyncStepContextWithJob())
.build();
}

Дело в том, почему возникает такое исключение, когда я запускаю первое задание. Он не должен заботиться о вторых ответах, поскольку мои примеры в классе JobManagerPartitionConfiguration определены для firstJob inputChannel = "firstReplies" и outputChannel = "firstRequests", так что это означает, что он должен использовать эти каналы, а не конфигурации второго канала.

Подробнее здесь: https://stackoverflow.com/questions/782 ... -exception
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • У диспетчера нет подписчиков для канала 'inknown.channel.name
    Anonymous » » в форуме JAVA
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • У диспетчера нет подписчиков для канала 'inknown.channel.name
    Anonymous » » в форуме JAVA
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • У диспетчера нет подписчиков на канал unknown.channel.name.
    Anonymous » » в форуме JAVA
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Pandas представляет терминаторы строк через to_csv без причины или причины
    Anonymous » » в форуме Python
    0 Ответы
    34 Просмотры
    Последнее сообщение Anonymous
  • Pandas представляет терминаторы строк через to_csv без причины или причины
    Anonymous » » в форуме Linux
    0 Ответы
    28 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»