Опросчик для отправки сообщений в привязку КафкиJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Опросчик для отправки сообщений в привязку Кафки

Сообщение Anonymous »

Приведенный ниже код с использованием потока Spring Cloud Kafka работает нормально, но все должно быть в этом одном методе.
@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
}
@Bean
Supplier outbound() {
return () -> {
return LocalTime.now().toString();
};
}
}

Как мне написать IntegrationFlow, чтобы он делал то же самое, использовал исходящую связку и позволял добавлять преобразователи и т. д.? Код ниже выдает ошибку: MessageDispatchingException: у диспетчера нет подписчиков
Класс приложения:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import java.time.LocalTime;

@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
}
@Bean
IntegrationFlow myFlow() {
return IntegrationFlow.fromSupplier(this::myPoller, p -> p.poller(Pollers.fixedDelay(5000)))
.transform(m -> {
System.out.println("my transformer");
return m;
})
.channel("outbound")
.get();
}

String myPoller() {
return LocalTime.now() + " value";
}
}

Конфигурация Gradle:
build.gradle:
plugins {
id 'java'
id 'org.springframework.boot' version '3.2.9'
id 'io.spring.dependency-management' version '1.1.6'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}

repositories {
mavenCentral()
}

ext {
set('springCloudVersion', "2023.0.3")
}

dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}

tasks.named('test') {
useJUnitPlatform()
}


Подробнее здесь: https://stackoverflow.com/questions/790 ... ka-binding
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»