Переплет Streambridge вместо включения и выходных аннотаций, которые устарели с 3,1 версии Spring Cloud StreamJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Переплет Streambridge вместо включения и выходных аннотаций, которые устарели с 3,1 версии Spring Cloud Stream

Сообщение Anonymous »

У меня есть несколько API, которые разговаривают друг с другом через Kafka (производите и потребляют сообщения). В одном из API -интерфейсов я создаю сообщения на основе триггера HTTP -запроса (когда называется конечная точка, Meesage производится и отправляется в Kafka) с @Output и @Enable -связывающими аннотациями. Эти меза используются другими API, которые подписываются на эту тему. (https://docs.spring.io/spring-cloud-str ... en_sourary). Тем не менее, я не понимал, как следует правильно настроить тему источника, привязки и назначения с точки зрения соглашения об именах, когда не определена нет функции источника. У меня есть следующая конфигурация, которая успешно создает сообщения на «MyFootopic», но я заметил некоторые странные журналы, когда начинается приложение, поскольку привязка, кажется, не выполнена должным образом: < /p>
Application.properties:
>

Код: Выделить всё

spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry

spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1

spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']

spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
< /code>
Кроме того, код для Streambridge: < /p>
@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {

private final StreamBridge streamBridge;
private static final String USER = "user";

public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");

try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}

private Message buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();

return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
< /code>
и среди используемых зависимостей: < /p>

 Spring-Boot 2.6.2 < /li>
 Кафка-клиенты 2.8.1 < /li>
-repr-stream 3.2.1 < /li>
 spring-cloud-bind-bind-bind-bind-bind-bind-bind-bind-kafa 3.2.1 < /li>
             />  Spring-Integration-Kafka 5.5.8 < /li>
< /ul>
Мои вопросы: < /p>

 Является ли привязкой продюсера с myusertopic правильным или мне нужно добавить собственность Spring.cloud.sry.source = пользователь? BindingName верен, или оно должно уважать соглашение как «пользователь-аут-0», учитывая, что у меня нет поставщика компонента?    Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)
Для полученных следующих сообщений log ' inknown.channel.name ' больше не появляется. Можете ли вы направить меня, чтобы понять, есть ли на моей стороне ошибочную конфигурацию? Все примеры пружинного потока из документации и GitHub используют Streambridge с динамическими направлениями или поставщиком. Я попытался написать тест для этого конкретного случая использования Streambridge, следуя примерам отсюда ([https://github.com/spring-cloud/spring- ... 996a4/spri ng-cloud-stream/src/test/java/org/springframework/cloud/stream/function/streebridgetests.java#l716] [1]) и адаптировать его для моего применения выше.

Код: Выделить всё

public class StreamBridgeTests {

@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {

HelloEventAvro helloEventAvro = buildHelloEventAvro();

Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();

StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
Я глубже с отладкой, и кажется, что выходное обеспечение создается с помощью одного канала (myusertopic.destination) и 2 сообщения Message, вместо 1, упомянутого ниже:

пользователь. /> myusertopic.destination < /strong> с размером 1 и helloeventavromessage, созданным < /li>
< /ul>
Если я изменю привязку с «пользователя» на имя кафки «Myusertopic». В методе получения () 1 -й канал, и это является ожидаемое, что является ожидаемым, что является necnelopic, и это является Myusertopic ». MessageQueue (myusertopic.destination): < /p>

Код: Выделить всё

public class StreamBridgeTests {

@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {

HelloEventAvro helloEventAvro = buildHelloEventAvro();

Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();

StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
Учитывая приведенное выше, я до сих пор не понимаю, как работает метод BindingName из метода Send () StreamBridge в корреляции с методом chect () от outputDestination ( не должно быть одинаковым в обоих местах, если у меня есть только одна тема? https://github.com/spring-cloud/spring- ... 996a4/spri ng-cloud-stream/src/test/java/org/springframework/cloud/stream/function/streambridgetests.java#L716)

Подробнее здесь: https://stackoverflow.com/questions/713 ... hich-are-d
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»