Использование Axon Framework без Spring, с Kafka и PostgreSQL
Axon не может разрешить событие в событии. Структура следующая:
Бэкенд на Spring Boot, который без проблем запускает Axon, публикует события в источник Kafka и т. д.
Это реализация AxonConnector с использованием Kafka:
Код: Выделить всё
@Log4j2
public class KafkaPostgresAxonConnector implements AxonConnector {
// Kafka bootstrap server list; used as the producer's bootstrap.servers setting.
private final String kafkaBootstrapServers;
// PostgreSQL connection settings, stored as passed to the constructor.
// NOTE(review): presumably consumed by dataSource(), which is outside the visible part of the class — confirm.
private final String postgresUrl;
private final String postgresUser;
private final String postgresPassword;
// Name of this component; logged on start and used as the prefix of groupId.
private final String componentName;
// componentName followed by a random UUID, generated once per connector instance.
private final String groupId;
// Axon configurer that handlers and aggregates are registered on before start() builds the configuration.
private final Configurer configurer;
// Jackson serializer shared by the Axon configuration and the Kafka converter/publisher.
private final JacksonSerializer serializer;
// Closed in close() when non-null.
// NOTE(review): no assignment is visible in this part of the class — confirm where it is set.
private KafkaProducer producer;
// Built Axon configuration; null until start() has been called.
private Configuration axonConfig;
public KafkaPostgresAxonConnector(String kafkaBootstrapServers, String postgresUrl,
String postgresUser, String postgresPassword, String componentName) {
this.kafkaBootstrapServers = kafkaBootstrapServers;
this.postgresUrl = postgresUrl;
this.postgresUser = postgresUser;
this.postgresPassword = postgresPassword;
this.componentName = componentName;
this.groupId = componentName + UUID.randomUUID();
serializer = JacksonSerializer.builder()
.defaultTyping()
.lenientDeserialization()
.build();
configurer = DefaultConfigurer.defaultConfiguration()
.configureSerializer(c -> serializer)
.configureEventSerializer(c -> serializer)
.configureMessageSerializer(c -> serializer);
}
/**
 * Runs the PostgreSQL and Kafka initialization helpers, then builds and starts
 * the Axon configuration.
 * The helpers run before the configuration is built because they register
 * components on the configurer.
 */
@Override
public void start() {
    initializePostgres(dataSource());
    initializeKafkaConsumer();
    initializeKafkaProducer();

    log.info("Starting AxonFramework with component: {}", componentName);

    this.axonConfig = this.configurer.buildConfiguration();
    this.axonConfig.start();
}
/**
 * Registers an event handler instance on the Axon configurer.
 * NOTE(review): handlers registered after start() are presumably not picked up
 * by the already-built configuration — confirm.
 *
 * @param eventHandler the handler object to register
 */
@Override
public void registerEventHandler(Object eventHandler) {
    this.configurer.registerEventHandler(axonConfiguration -> eventHandler);
}
/**
 * Registers an aggregate type on the Axon configurer.
 * NOTE(review): aggregates registered after start() are presumably not part of
 * the already-built configuration — confirm.
 *
 * @param aggregateType the aggregate class to configure
 */
@Override
public void registerAggregate(Class aggregateType) {
    this.configurer.configureAggregate(aggregateType);
}
/**
 * Sends a command through the Axon command gateway and blocks until it has been handled.
 *
 * @param command the command to dispatch
 * @return the result returned by the command handler
 * @throws IllegalStateException if {@link #start()} has not been called yet
 */
@Override
public UUID sendCommand(Object command) {
    if (axonConfig == null) {
        throw new IllegalStateException("Axon Framework not initialized");
    }
    return axonConfig.commandGateway().sendAndWait(command);
}
/**
 * Sends a query through the Axon query gateway and blocks until the response arrives.
 * Unlike {@code sendCommand}, an unstarted connector is not an error here:
 * a warning is logged and {@code null} is returned.
 *
 * @param query        the query to dispatch
 * @param responseType the expected response type
 * @param <R>          the type of the query result
 * @return the query result, or {@code null} if the connector has not been started
 */
@Override
public <R> R sendQuery(Object query, ResponseType<R> responseType) {
    if (axonConfig == null) {
        log.warn("Cannot send query - Axon Framework not initialized");
        return null;
    }
    QueryGateway queryGateway = axonConfig.queryGateway();
    // join() blocks on the future returned by the gateway and rethrows failures unchecked.
    return queryGateway.query(query, responseType).join();
}
/**
 * Closes the Kafka producer (if any) and shuts down the Axon configuration (if started).
 * The Axon shutdown runs even when closing the producer throws; the exception
 * from the producer still propagates to the caller.
 */
@Override
public void close() {
    try {
        if (producer != null) {
            producer.close();
        }
    } finally {
        // Must not be skipped by a failing producer.close(), otherwise Axon keeps running.
        if (axonConfig != null) {
            axonConfig.shutdown();
        }
    }
}
// ---- KAFKA ----

/**
 * Builds a {@code DefaultKafkaMessageConverter} that uses the connector's serializer.
 *
 * @return a new converter instance
 */
private KafkaMessageConverter messageConverter() {
    KafkaMessageConverter converter = DefaultKafkaMessageConverter.builder()
            .serializer(serializer)
            .build();
    return converter;
}
// ---- KAFKA PRODUCER ----

/**
 * Creates the Kafka publisher chain (producer factory, message converter, publisher,
 * event publisher) and registers the event publisher with the configurer's
 * event processing configuration.
 */
private void initializeKafkaProducer() {
    KafkaPublisher publisher = kafkaPublisher(producerFactory(), messageConverter());
    KafkaEventPublisher eventPublisher = kafkaEventPublisher(publisher);
    registerPublisherToEventProcessor(configurer.eventProcessing(), eventPublisher);
}
/**
 * Registers the Kafka event publisher as an event handler, registers a subscribing
 * event processor named {@code KafkaEventPublisher.DEFAULT_PROCESSING_GROUP}, and
 * switches the configurer to subscribing event processors.
 * NOTE(review): the handler is not explicitly assigned to that processing group
 * here — confirm it ends up in the intended processor.
 *
 * @param eventProcessingConfigurer the event processing configurer to register on
 * @param kafkaEventPublisher       the publisher to register as an event handler
 */
public void registerPublisherToEventProcessor(
        EventProcessingConfigurer eventProcessingConfigurer,
        KafkaEventPublisher kafkaEventPublisher
) {
    EventProcessingConfigurer withPublisher =
            eventProcessingConfigurer.registerEventHandler(axonConfiguration -> kafkaEventPublisher);
    EventProcessingConfigurer withProcessor =
            withPublisher.registerSubscribingEventProcessor(KafkaEventPublisher.DEFAULT_PROCESSING_GROUP);
    withProcessor.usingSubscribingEventProcessors();
}
/**
 * Builds a {@code KafkaPublisher} from the given producer factory and message
 * converter, using the connector's serializer.
 *
 * @param producerFactory       factory for Kafka producers
 * @param kafkaMessageConverter converter passed to the publisher builder
 * @return a new publisher instance
 */
private KafkaPublisher kafkaPublisher(
        ProducerFactory producerFactory,
        KafkaMessageConverter kafkaMessageConverter
) {
    KafkaPublisher publisher = KafkaPublisher.builder()
            .producerFactory(producerFactory)
            .messageConverter(kafkaMessageConverter)
            .serializer(serializer)
            .build();
    return publisher;
}
/**
 * Wraps the given {@code KafkaPublisher} in a {@code KafkaEventPublisher}.
 *
 * @param kafkaPublisher the publisher to delegate to
 * @return a new event publisher instance
 */
public KafkaEventPublisher kafkaEventPublisher(KafkaPublisher kafkaPublisher) {
    KafkaEventPublisher eventPublisher = KafkaEventPublisher.builder()
            .kafkaPublisher(kafkaPublisher)
            .build();
    return eventPublisher;
}
/**
 * Builds the producer factory for the Kafka publisher.
 * Keys are serialized as strings and values as byte arrays; the confirmation mode
 * is {@code ConfirmationMode.NONE}.
 * No {@code group.id} is set: that is a consumer setting and has no meaning for a producer.
 *
 * @return a new producer factory configured with the connector's bootstrap servers
 */
public ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return DefaultProducerFactory.<String, byte[]>builder()
            .configuration(producerProps)
            .confirmationMode(ConfirmationMode.NONE)
            .build();
}
/**
* KAFKA CONSUMER
* */
private void initializeKafkaConsumer() {
ConsumerFactory consumerFactory = consumerFactory();
Fetcher> fetcher() {
return AsyncFetcher.
Подробнее здесь: [url]https://stackoverflow.com/questions/79582395/axon-nohandlerforcommandexception-when-using-axonframework-without-spring-boot[/url]