При создании сообщений у меня нет проблем, используя kafbat, я вижу что сообщение в моей теме создано корректно (хоть и создается в кодировке JWT, но для меня это не проблема).
У меня проблема при потреблении, так как при запуске служба не может для подключения или поиска раздела темы по какой-либо причине.
Мой yml:
Код: Выделить всё
spring:
cloud:
function:
definition: chents
stream:
bindings:
chents-in-0:
destination: segnnel
group: console-consumer-20965
consumer:
max-attempts: 2
chents-out-0:
destination: segnnel
kafka:
bootstrap-servers: localhost:19092
binder:
brokers: localhost:19092
bindings:
chents-in-0:
consumer:
enableDlq: false
ack-mode: record
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
binders:
local-kafka:
type: kafka
Код: Выделить всё
private void sendToKafka(EventData event) {
Message message = MessageBuilder.withPayload(event)
.setHeader("eventType", "Channel")
.build();
streamBridge.send("segnnel", message);
}
Код: Выделить всё
@Configuration
public class ChannelSubscriber {
private Repository repository;
private static final Mapper mapper ...;
public ChannelSubscriber(Repository repository) {
this.repository = repository;
}
@Bean
Consumer chents() {
return e -> Mono.just(e).map(mapper::mapToEntity).flatMap(repository::save)
.doOnError(ex -> log.error("Error processing event: ", ex));
}
}
Код: Выделить всё
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic segnnel
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:658)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)
o.s.cloud.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (segnnel):
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:685)
Я также пытался изменить темы на случай, если возникнут какие-либо проблемы с тем, чтобы они были одинаковыми при производстве и потреблении, но это не так.
Подробнее здесь: https://stackoverflow.com/questions/791 ... umer-topic