Я использую облачный поток String Kafka для отправки и получения сообщений.
Послушайте мою конфигурацию:
spring:
cloud:
function:
definition: kafkaConsumer;kafkaProducer
stream:
kafka:
binder:
replicationFactor: 1
auto-create-topics: true
brokers: 10.0.100.10:9092
bindings:
binding-import-device:
destination: import-device-topic
content-type: text/plain
group: device
Слушайте мои KafkaConsumer и KafkaProducer (я использую Jhipster для создания своего проекта, этот код генерируется Jhipster)
@Component
public class KafkaProducer implements Supplier {
@Override
public String get() {
return "kakfa_producer";
}
}
@Component
public class KafkaConsumer implements Consumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
private Map emitters = new HashMap();
public SseEmitter register(String key) {
LOG.debug("Registering sse client for {}", key);
SseEmitter emitter = new SseEmitter();
emitter.onCompletion(() -> emitters.remove(key));
emitters.put(key, emitter);
return emitter;
}
public void unregister(String key) {
LOG.debug("Unregistering sse emitter for: {}", key);
Optional.ofNullable(emitters.get(key)).ifPresent(SseEmitter::complete);
}
@Override
public void accept(String input) {
LOG.debug("Got message from kafka stream: {}", input);
emitters
.entrySet()
.stream()
.map(Map.Entry::getValue)
.forEach((SseEmitter emitter) -> {
try {
emitter.send(event().data(input, MediaType.TEXT_PLAIN));
} catch (IOException e) {
LOG.debug("error sending sse message, {}", input);
}
});
}
}
Я использую StreamBridge, KafkaListener для отправки и прослушивания сообщений
streamBridge.send("binding-import-device", device.getId());
@KafkaListener(topics = { "binding-import-device" }, groupId = "device")
public void consume(String id) {
log.info("consume device: {}", id);
}
Когда я запускаю свое приложение, оно печатает следующие журналы:
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
2024-11-29T00:07:51.560+07:00 WARN 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-11-29T00:07:52.537+07:00 INFO 21908 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-sound-device-3, groupId=device] Node -1 disconnected.
Я отправляю сообщения и использую их в одном и том же проекте весенней загрузки, он всегда печатает эти журналы, и я могу отправлять сообщения, но не могу использовать эти сообщения. Я занимаюсь этой проблемой неделями, но ничего не получается. Я не знаю, где находится «Bootstrapbroker localhost:9092», который напечатан в журналах. Я новичок в Кафке. Можете ли вы рассказать мне, как решить эту проблему? Огромное вам спасибо.
Я запускаю Kafka с помощью Docker на 10.0.100.10
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:7.2.1
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://kafkatest:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 9092:9092
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8777:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=kafka
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
Подробнее здесь: https://stackoverflow.com/questions/792 ... sconnected
Spring Cloud Stream Kafka: брокер начальной загрузки отключен ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Spring Kafka, как исправить org.apache.kafka.clients.NetworkClient Node -1 отключен
Anonymous » » в форуме JAVA - 0 Ответы
- 55 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Spring Kafka, как исправить org.apache.kafka.clients.NetworkClient Node -1 отключен
Anonymous » » в форуме JAVA - 0 Ответы
- 60 Просмотры
-
Последнее сообщение Anonymous
-