Application.properties:
>
Код: Выделить всё
spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry
spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1
spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
< /code>
Кроме того, код для Streambridge: < /p>
@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {
private final StreamBridge streamBridge;
private static final String USER = "user";
public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");
try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}
private Message buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();
return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
< /code>
и среди используемых зависимостей: < /p>
Spring-Boot 2.6.2 < /li>
Кафка-клиенты 2.8.1 < /li>
-repr-stream 3.2.1 < /li>
spring-cloud-bind-bind-bind-bind-bind-bind-bind-bind-kafa 3.2.1 < /li>
/> Spring-Integration-Kafka 5.5.8 < /li>
< /ul>
Мои вопросы: < /p>
Является ли привязкой продюсера с myusertopic правильным или мне нужно добавить собственность Spring.cloud.sry.source = пользователь? BindingName верен, или оно должно уважать соглашение как «пользователь-аут-0», учитывая, что у меня нет поставщика компонента? Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)
Код: Выделить всё
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
пользователь. /> myusertopic.destination < /strong> с размером 1 и helloeventavromessage, созданным < /li>
< /ul>
Если я изменю привязку с «пользователя» на имя кафки «Myusertopic». В методе получения () 1 -й канал, и это является ожидаемое, что является ожидаемым, что является necnelopic, и это является Myusertopic ». MessageQueue (myusertopic.destination): < /p>
Код: Выделить всё
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/713 ... hich-are-d
Мобильная версия