Теперь я пытаюсь перейти на новую модель функционального программирования потока Spring Cloud и из документации я пришел к выводу, что StreamBridge с внешними исходными данными — это подход, необходимый для моего случая. (https://docs.spring.io/spring-cloud-str ... en_sources). Однако я не понимал, как правильно настроить источник, имя привязки и тему назначения с точки зрения соглашения об именах, когда функция источника не определена. У меня есть следующая конфигурация, которая успешно выдает сообщения на «myFooTopic», но я заметил некоторые странные журналы при запуске приложения, поскольку привязка, похоже, выполнена неправильно:
application.properties:
Код: Выделить всё
spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry
spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1
spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
Код: Выделить всё
@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {
private final StreamBridge streamBridge;
private static final String USER = "user";
public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");
try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}
private Message buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();
return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
- spring-boot 2.6.2
- kafka-clients 2.8.1
- spring-cloud-stream 3.2.1
- spring-cloud-stream-binder-kafka 3.2.1
- spring-integration-kafka 5.5.8
- правильна ли привязка производителя к myUserTopic или мне нужно добавить свойство Spring.cloud.stream.source=user?
- "user" имя привязки правильное или оно должно соблюдать соглашение как "user-out-0" учитывая, что у меня не настроен поставщик компонентов?
- Когда после запуска приложения создается первое сообщение, я вижу следующие журналы:
Код: Выделить всё
Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)
Я не понимаю, почему имя канала — «unknown» вместо выходного имени привязки «user», указанного в конфигурации application.properties. Можете ли вы помочь мне понять, есть ли какие-либо неправильные настройки с моей стороны? Все примеры Spring-cloud-stream из документации и GitHub используют StreamBridge либо с динамическими пунктами назначения, либо с поставщикомConfiguration.
Изменить:
У меня есть еще один вопрос относительно тестирования вышеуказанной конфигурации. Я попытался написать тест для этого конкретного варианта использования StreamBridge, следуя примерам отсюда. ([https://github.com/spring-cloud/spring- ... 996a4/spri ng-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java#L716][1]) и адаптируйте его для моей конфигурации application.properties, указанной выше, и по какой-то причине сообщение, полученное в outputDestination, имеет значение null для этого теста:
Код: Выделить всё
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
- user.destination с размером 0 (где «user» = мое имя привязки)
- myUserTopic.destination с размер 1 и созданное сообщение helloEventAvroMessage
Код: Выделить всё
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
[1]: https://github.com/spring-cloud/spring- ... .java#L716)
Подробнее здесь: https://stackoverflow.com/questions/713 ... hich-are-d
Мобильная версия