Тематический раздел с потоком весенних облаков. Не могу потребительскую темуJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Тематический раздел с потоком весенних облаков. Не могу потребительскую тему

Сообщение Anonymous »

Я пытаюсь создавать и использовать сообщения в Kafka и обратно (используя потоки весенних облаков).
При создании сообщений у меня нет проблем, используя kafbat, я вижу что сообщение в моей теме создано корректно (хоть и создается в кодировке JWT, но для меня это не проблема).
У меня проблема при потреблении, так как при запуске служба не может для подключения или поиска раздела темы по какой-либо причине.
Мой yml:

Код: Выделить всё

spring:
cloud:
function:
definition: chents
stream:
bindings:
chents-in-0:
destination: segnnel
group: console-consumer-20965
consumer:
max-attempts: 2
chents-out-0:
destination: segnnel
kafka:
bootstrap-servers: localhost:19092
binder:
brokers: localhost:19092
bindings:
chents-in-0:
consumer:
enableDlq: false
ack-mode: record
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
binders:
local-kafka:
type: kafka
Мой код для создания сообщений с помощью потокового моста (это необходимо):

Код: Выделить всё

private void sendToKafka(EventData event) {
Message message = MessageBuilder.withPayload(event)
.setHeader("eventType", "Channel")
.build();
streamBridge.send("segnnel", message);
}
и это мой класс подписчиков

Код: Выделить всё

@Configuration
public class ChannelSubscriber {

private Repository repository;

private static final Mapper mapper ...;

public ChannelSubscriber(Repository repository) {
this.repository = repository;
}

@Bean
Consumer chents() {
return e -> Mono.just(e).map(mapper::mapToEntity).flatMap(repository::save)
.doOnError(ex -> log.error("Error processing event: ", ex));
}

}
Однако служба запускается правильно, она может отправлять сообщения, но время от времени выдает ошибку при попытке их использования

Код: Выделить всё

Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic segnnel
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:658)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344)

o.s.cloud.stream.binding.BindingService - Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (segnnel):
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:685)
Я использовал прослушиватель kafka, и мне удалось получать сообщения из этой темы, но мне пришлось использовать потоки Spring Cloud, чтобы сделать его гибким.
Я также пытался изменить темы на случай, если возникнут какие-либо проблемы с тем, чтобы они были одинаковыми при производстве и потреблении, но это не так.

Подробнее здесь: https://stackoverflow.com/questions/791 ... umer-topic
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»