Привязка StreamBridge вместо аннотаций EnableBinding и Output, которые устарели с версии Spring Cloud Stream 3.1.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Привязка StreamBridge вместо аннотаций EnableBinding и Output, которые устарели с версии Spring Cloud Stream 3.1.

Сообщение Anonymous »

У меня есть несколько API, которые взаимодействуют друг с другом через Kafka (генерируют и потребляют сообщения). В одном из API я создаю сообщения на основе триггера HTTP-запроса (при вызове конечной точки создается сообщение и отправляется в kafka) с аннотациями @Output и @EnableBinding. Эти сообщения используются другими API, которые подписаны на эту тему.
Теперь я пытаюсь перейти на новую модель функционального программирования потока Spring Cloud и из документации я пришел к выводу, что StreamBridge с внешними исходными данными — это подход, необходимый для моего случая. (https://docs.spring.io/spring-cloud-str ... en_sources). Однако я не понимал, как правильно настроить источник, имя привязки и тему назначения с точки зрения соглашения об именах, когда функция источника не определена. У меня есть следующая конфигурация, которая успешно выдает сообщения на «myFooTopic», но я заметил некоторые странные журналы при запуске приложения, поскольку привязка, похоже, выполнена неправильно:
application.properties:

Код: Выделить всё

spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry

spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1

spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']

spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
Кроме того, код StreamBridge:

Код: Выделить всё

@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {

private final StreamBridge streamBridge;
private static final String USER = "user";

public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");

try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}

private Message buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();

return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
и среди использованных зависимостей:
  • spring-boot 2.6.2
  • kafka-clients 2.8.1
  • spring-cloud-stream 3.2.1
  • spring-cloud-stream-binder-kafka 3.2.1
  • spring-integration-kafka 5.5.8
Мои вопросы:
  • правильна ли привязка производителя к myUserTopic или мне нужно добавить свойство Spring.cloud.stream.source=user?
  • "user" имя привязки правильное или оно должно соблюдать соглашение как "user-out-0" учитывая, что у меня не настроен поставщик компонентов?
  • Когда после запуска приложения создается первое сообщение, я вижу следующие журналы:

Код: Выделить всё

    Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)
Для следующих сообщений журнал «unknown.channel.name» больше не отображается.
Я не понимаю, почему имя канала — «unknown» вместо выходного имени привязки «user», указанного в конфигурации application.properties. Можете ли вы помочь мне понять, есть ли какие-либо неправильные настройки с моей стороны? Все примеры Spring-cloud-stream из документации и GitHub используют StreamBridge либо с динамическими пунктами назначения, либо с поставщикомConfiguration.

Изменить:
У меня есть еще один вопрос относительно тестирования вышеуказанной конфигурации. Я попытался написать тест для этого конкретного варианта использования StreamBridge, следуя примерам отсюда. ([https://github.com/spring-cloud/spring- ... 996a4/spri ng-cloud-stream/src/test/java/org/springframework/cloud/stream/function/StreamBridgeTests.java#L716][1]) и адаптируйте его для моей конфигурации application.properties, указанной выше, и по какой-то причине сообщение, полученное в outputDestination, имеет значение null для этого теста:

Код: Выделить всё

public class StreamBridgeTests {

@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {

HelloEventAvro helloEventAvro = buildHelloEventAvro();

Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();

StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
Я углубился в отладку, и кажется, что выходное назначение создается с одним каналом (myUserTopic.destination) и двумя очередями сообщений вместо одной, упомянутой ниже:
  • user.destination с размером 0 (где «user» = мое имя привязки)
  • myUserTopic.destination с размер 1 и созданное сообщение helloEventAvroMessage
Если я изменю имя привязки с «user» на имя темы kafka «myUserTopic» в методе получения() OutputDestination, это будет работать так, как ожидалось: создается один канал (myUserTopic.destination) и 1 очередь сообщений (myUserTopic.destination):

Код: Выделить всё

public class StreamBridgeTests {

@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {

HelloEventAvro helloEventAvro = buildHelloEventAvro();

Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();

StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
Учитывая вышеизложенное, я до сих пор не понимаю, как связываниеName из метода send() StreamBridge работает в корреляции с методом получения() из OutputDestination (не должно быть одинаковым именем в обоих местах, если у меня только одна тема? ) и как оно разрешается в набор имен назначения темы Kafka.
[1]: https://github.com/spring-cloud/spring- ... .java#L716)

Подробнее здесь: https://stackoverflow.com/questions/713 ... hich-are-d
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»