Axon nohandlerforCommandException при использовании AxonFramework без пружинной загрузкиJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Axon nohandlerforCommandException при использовании AxonFramework без пружинной загрузки

Сообщение Anonymous »

Настроение на моих 2 предыдущих вопросах: < /p>

Использование Axonframework без пружины и с kafka + postgresql < /p>
< /li>
axon не может разрешить событие в событии. Следующая структура: < /p>

Бэкэнд с пружинной загрузкой, который запускает аксон без проблем, публикует события для источника кафки и т. Д. /> < /ul>
Это реализация Axonconnector с использованием kafka: < /p>

Код: Выделить всё

@Log4j2
public class KafkaPostgresAxonConnector implements AxonConnector {

private final String kafkaBootstrapServers;
private final String postgresUrl;
private final String postgresUser;
private final String postgresPassword;
private final String componentName;
private final String groupId;
private final Configurer configurer;
private final JacksonSerializer serializer;

private KafkaProducer producer;
private Configuration axonConfig;

public KafkaPostgresAxonConnector(String kafkaBootstrapServers, String postgresUrl,
String postgresUser, String postgresPassword, String componentName) {
this.kafkaBootstrapServers = kafkaBootstrapServers;
this.postgresUrl = postgresUrl;
this.postgresUser = postgresUser;
this.postgresPassword = postgresPassword;
this.componentName = componentName;
this.groupId = componentName + UUID.randomUUID();

serializer = JacksonSerializer.builder()
.defaultTyping()
.lenientDeserialization()
.build();

configurer = DefaultConfigurer.defaultConfiguration()
.configureSerializer(c -> serializer)
.configureEventSerializer(c -> serializer)
.configureMessageSerializer(c -> serializer);
}

@Override
public void start() {
DataSource dataSource = dataSource();

initializePostgres(dataSource);
initializeKafkaConsumer();
initializeKafkaProducer();

log.info("Starting AxonFramework with component: {}", componentName);

axonConfig = configurer.buildConfiguration();
axonConfig.start();
}

@Override
public void registerEventHandler(Object eventHandler) {
configurer.registerEventHandler(conf -> eventHandler);
}

@Override
public  void registerAggregate(Class aggregateType) {
configurer.configureAggregate(aggregateType);
}

@Override
public UUID sendCommand(Object command) {
if (axonConfig == null) {
throw new IllegalStateException("Axon Framework not initialized");
}

CommandGateway commandGateway = axonConfig.commandGateway();
return commandGateway.sendAndWait(command);
}

@Override
public  R sendQuery(Object query, ResponseType responseType) {
if (axonConfig == null) {
log.warn("Cannot send query - Axon Framework not initialized");
return null;
}

QueryGateway queryGateway = axonConfig.queryGateway();
return queryGateway.query(query, responseType).join();
}

@Override
public void close() {
if (producer != null) {
producer.close();
}
if (axonConfig != null) {
axonConfig.shutdown();
}
}

/**
* KAFKA
* */

private KafkaMessageConverter messageConverter() {
return DefaultKafkaMessageConverter.builder()
.serializer(serializer)
.build();
}

/**
* KAFKA PRODUCER
* */

private void initializeKafkaProducer() {
ProducerFactory producerFactory = producerFactory();
KafkaMessageConverter messageConverter = messageConverter();
KafkaPublisher kafkaPublisher = kafkaPublisher(
producerFactory,
messageConverter
);
KafkaEventPublisher kafkaEventPublisher = kafkaEventPublisher(kafkaPublisher);
registerPublisherToEventProcessor(configurer.eventProcessing(), kafkaEventPublisher);
}

public void registerPublisherToEventProcessor(
EventProcessingConfigurer eventProcessingConfigurer,
KafkaEventPublisher kafkaEventPublisher
) {
String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;

eventProcessingConfigurer
.registerEventHandler(configuration -> kafkaEventPublisher)
.registerSubscribingEventProcessor(processingGroup)
.usingSubscribingEventProcessors();
}

private KafkaPublisher kafkaPublisher(
ProducerFactory producerFactory,
KafkaMessageConverter  kafkaMessageConverter
) {
return KafkaPublisher.builder()
.producerFactory(producerFactory)
.messageConverter(kafkaMessageConverter)
.serializer(serializer)
.build();
}

public KafkaEventPublisher kafkaEventPublisher(
KafkaPublisher kafkaPublisher
) {
return KafkaEventPublisher.builder()
.kafkaPublisher(kafkaPublisher)
.build();
}

public ProducerFactory producerFactory() {
Map producerProps = new HashMap();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
producerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);

return DefaultProducerFactory.builder()
.configuration(producerProps)
.confirmationMode(ConfirmationMode.NONE)
.build();
}

/**
* KAFKA CONSUMER
* */

private void initializeKafkaConsumer() {
ConsumerFactory consumerFactory = consumerFactory();
Fetcher> fetcher() {
return AsyncFetcher.

Подробнее здесь: [url]https://stackoverflow.com/questions/79582395/axon-nohandlerforcommandexception-when-using-axonframework-without-spring-boot[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»