- Укажите подробную информацию о своей цели.
Во избежание путаницы, речь идет о SpringWolf AsyncAPI, а не SpringDoc и Swagger.
Речь идет о Reactor Kafka, а не Apache Kafka, а не Spring Kafka
Соответствующая документация показана здесь:

- Опишите, что вы пробовали
- Покажите код
Код: Выделить всё
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
org.springframework.boot
spring-boot-starter-parent
3.4.0-M3
org.example
springwolfreactorkafka
1.1
23
org.springframework.boot
spring-boot-starter-web
io.projectreactor.kafka
reactor-kafka
io.github.springwolf
springwolf-core
1.6.0
io.github.springwolf
springwolf-kafka
1.6.0
io.github.springwolf
springwolf-asyncapi
1.6.0
org.springframework.boot
spring-boot-maven-plugin
org.graalvm.buildtools
native-maven-plugin
confluent
https://packages.confluent.io/maven/
spring-milestones
Spring Milestones
https://repo.spring.io/milestone
false
spring-milestones
Spring Milestones
https://repo.spring.io/milestone
false
Код: Выделить всё
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringWolfReactorKafkaApplication {
public static void main(final String[] args) {
SpringApplication.run(SpringWolfReactorKafkaApplication.class);
}
}
Код: Выделить всё
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class SpringWolfReactorKafkaConsumerConfiguration {
@Bean
public KafkaReceiver kafkaReceiver() {
final Map properties = new HashMap();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "locahost:9092");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "someidentifier");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "someidentifier");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final ReceiverOptions receiverOptions = ReceiverOptions.create(properties);
return KafkaReceiver.create(receiverOptions.subscription(Collections.singleton("the_topic")));
}
}
Код: Выделить всё
package org.example;
import io.github.springwolf.core.asyncapi.annotations.AsyncListener;
import io.github.springwolf.core.asyncapi.annotations.AsyncOperation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
@Service
public final class SpringWolfReactorKafkaService implements CommandLineRunner {
private final KafkaReceiver kafkaReceiver;
public SpringWolfReactorKafkaService(final KafkaReceiver receiver) {
this.kafkaReceiver = receiver;
}
@Override
@AsyncListener(operation = @AsyncOperation(channelName = "the_topic"))
public void run(final String... args) {
kafkaReceiver.receiveAutoAck().concatMap(message -> message).flatMap(this::processMessages).subscribe();
}
private Mono processMessages(final ConsumerRecord consumerRecord) {
return Mono.just(consumerRecord).map(oneMessage -> oneMessage.value().toUpperCase());
}
}
Код: Выделить всё
springwolf.payload.extractable-classes.reactor.kafka.receiver.ReceiverRecord=1
springwolf.docket.base-package=org.example
springwolf.docket.info.title=${spring.application.name}
springwolf.docket.info.version=1.6.0
springwolf.docket.servers.kafka-server.protocol=kafka
springwolf.docket.servers.kafka-server.host=localhost:9092
- Опишите ожидаемое
- Опишите фактические результаты
- Вопрос
Подробнее здесь: https://stackoverflow.com/questions/790 ... ation-page