Я использую облачный поток String Kafka для отправки и получения сообщений.
Послушайте мою конфигурацию:
spring:
cloud:
function:
definition: kafkaConsumer;kafkaProducer
stream:
kafka:
binder:
replicationFactor: 1
auto-create-topics: true
brokers: 10.0.100.10:9092
bindings:
binding-import-device:
destination: import-device-topic
content-type: text/plain
group: device
Слушайте мои KafkaConsumer и KafkaProducer (я использую Jhipster для создания своего проекта, этот код генерируется Jhipster)
@Component
public class KafkaProducer implements Supplier {
@Override
public String get() {
return "kakfa_producer";
}
}
@Component
public class KafkaConsumer implements Consumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private Map emitters = new HashMap();
public SseEmitter register(String key) {
LOG.debug("Registering sse client for {}", key);
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> emitters.remove(key));
emitters.put(key, emitter);
return emitter;
}
public void unregister(String key) {
LOG.debug("Unregistering sse emitter for: {}", key);
Optional.ofNullable(emitters.get(key)).ifPresent(SseEmitter::complete);
}
@Override
public void accept(String input) {
LOG.debug("Got message from kafka stream: {}", input);
emitters
.entrySet()
.stream()
.map(Map.Entry::getValue)
.forEach((SseEmitter emitter) -> {
try {
emitter.send(event().data(input, MediaType.TEXT_PLAIN));
} catch (IOException e) {
LOG.debug("error sending sse message, {}", input);
}
});
}
}
Я использую StreamBridge, KafkaListener для отправки и прослушивания сообщений
streamBridge.send("binding-import-device", device.getId());
@KafkaListener(topics = { "binding-import-device" }, groupId = "device")
public void consume(String id) {
log.info("consume device: {}", id);
}
Когда я запускаю свое приложение, оно печатает следующие журналы:
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-11-29T00:07:52.537+07:00 INFO 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Node -1 disconnected.
Я отправляю сообщения и использую их в одном и том же проекте весенней загрузки, он всегда печатает эти журналы, и я могу отправлять сообщения, но не могу использовать эти сообщения. Я занимаюсь этой проблемой неделями, но ничего не получается. Я не знаю, где находится «Bootstrapbroker localhost:9092», который напечатан в журналах. Я новичок в Кафке. Можете ли вы рассказать мне, как решить эту проблему? Огромное вам спасибо.
Я запускаю Kafka с помощью Docker на 10.0.100.10
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:7.2.1
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://kafkatest:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 9092:9092
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8777:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=kafka
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
Подробнее здесь: https://stackoverflow.com/questions/792 ... sconnected
Spring Cloud Stream Kafka: брокер начальной загрузки отключен ⇐ JAVA
Программисты JAVA общаются здесь
-
Anonymous
1732846055
Anonymous
Я использую облачный поток String Kafka для отправки и получения сообщений.
Послушайте мою конфигурацию:
spring:
cloud:
function:
definition: kafkaConsumer;kafkaProducer
stream:
kafka:
binder:
replicationFactor: 1
auto-create-topics: true
brokers: 10.0.100.10:9092
bindings:
binding-import-device:
destination: import-device-topic
content-type: text/plain
group: device
Слушайте мои KafkaConsumer и KafkaProducer (я использую Jhipster для создания своего проекта, этот код генерируется Jhipster)
@Component
public class KafkaProducer implements Supplier {
@Override
public String get() {
return "kakfa_producer";
}
}
@Component
public class KafkaConsumer implements Consumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private Map emitters = new HashMap();
public SseEmitter register(String key) {
LOG.debug("Registering sse client for {}", key);
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> emitters.remove(key));
emitters.put(key, emitter);
return emitter;
}
public void unregister(String key) {
LOG.debug("Unregistering sse emitter for: {}", key);
Optional.ofNullable(emitters.get(key)).ifPresent(SseEmitter::complete);
}
@Override
public void accept(String input) {
LOG.debug("Got message from kafka stream: {}", input);
emitters
.entrySet()
.stream()
.map(Map.Entry::getValue)
.forEach((SseEmitter emitter) -> {
try {
emitter.send(event().data(input, MediaType.TEXT_PLAIN));
} catch (IOException e) {
LOG.debug("error sending sse message, {}", input);
}
});
}
}
Я использую StreamBridge, KafkaListener для отправки и прослушивания сообщений
streamBridge.send("binding-import-device", device.getId());
@KafkaListener(topics = { "binding-import-device" }, groupId = "device")
public void consume(String id) {
log.info("consume device: {}", id);
}
Когда я запускаю свое приложение, оно печатает следующие журналы:
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-11-29T00:07:52.537+07:00 INFO 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Node -1 disconnected.
Я отправляю сообщения и использую их в одном и том же проекте весенней загрузки, он всегда печатает эти журналы, и я могу отправлять сообщения, но не могу использовать эти сообщения. Я занимаюсь этой проблемой неделями, но ничего не получается. Я не знаю, где находится «Bootstrapbroker localhost:9092», который напечатан в журналах. Я новичок в Кафке. Можете ли вы рассказать мне, как решить эту проблему? Огромное вам спасибо.
Я запускаю Kafka с помощью Docker на 10.0.100.10
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:7.2.1
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://kafkatest:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 9092:9092
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8777:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=kafka
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
Подробнее здесь: [url]https://stackoverflow.com/questions/79234875/spring-cloud-stream-kafka-bootstrap-broker-disconnected[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия