Потребитель Kafka не получает сообщения в докереJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Потребитель Kafka не получает сообщения в докере

Сообщение Anonymous »

Я создаю очень простую настройку потребителя/производителя для Kafka. Я использую Zookeper и Kafka с композицией Docker. Кажется, что и то, и другое взаимосвязано, но когда дело доходит до производства или потребления, ничего не происходит. Я вижу созданную тему и видел это в журнале контейнера Kafka при его запуске. Не уверен, что это связано.

Код: Выделить всё

[2024-05-20 14:32:40,797] WARN [RequestSendThread controllerId=1] Controller 1's connection to broker kafka:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
2024-05-20 16:32:40 java.io.IOException: Connection to kafka:9092 (id: 1 rack: null) failed.
2024-05-20 16:32:40     at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
2024-05-20 16:32:40     at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:298)
2024-05-20 16:32:40     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:251)
2024-05-20 16:32:40     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
Я почти уверен, что мне что-то не хватает в файле компоновки Docker, но я не могу понять, что именно, попробовал несколько вариантов в Интернете, но не смог решить эту проблему.
Это файлы:
docker-compose.yml

Код: Выделить всё

version: '3'

services:
zookeeper:
image: 'bitnami/zookeeper:latest'
hostname: zookeeper
restart: unless-stopped
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
hostname: kafka
restart: unless-stopped
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_ADVERTISED_HOST_NAME=localhost
Consumer.java

Код: Выделить всё

public class Consumer {

public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "my-fourth-application";

// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

try (final KafkaConsumer consumer = new KafkaConsumer(properties)) {
consumer.subscribe(List.of("consumer_test"));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} catch (Exception exception) {
System.out.println("Error: " + exception.getMessage());
}

}
Producer.java

Код: Выделить всё

public class Producer {

public static void main(String[] args) throws InterruptedException {
Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer(config);
int i = 0;
while (true) {
final ProducerRecord record = new ProducerRecord("consumer_test", "consumer_key", (i++)+"");
producer.send(record);
Thread.sleep(5000);
}
}


Подробнее здесь: https://stackoverflow.com/questions/785 ... -in-docker
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»