Code:
[2024-05-20 14:32:40,797] WARN [RequestSendThread controllerId=1] Controller 1's connection to broker kafka:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
2024-05-20 16:32:40 java.io.IOException: Connection to kafka:9092 (id: 1 rack: null) failed.
2024-05-20 16:32:40 at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
2024-05-20 16:32:40 at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:298)
2024-05-20 16:32:40 at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:251)
2024-05-20 16:32:40 at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:131)
These are the files:
docker-compose.yml
Code:
version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    hostname: zookeeper
    restart: unless-stopped
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    hostname: kafka
    restart: unless-stopped
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_ADVERTISED_HOST_NAME=localhost
Code:
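import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String groupId = "my-fourth-application";

        // Create consumer configs.
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // try-with-resources closes the consumer if the poll loop exits with an exception.
        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(List.of("consumer_test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception exception) {
            System.out.println("Error: " + exception.getMessage());
        }
    }
}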
Code:
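import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
    public static void main(String[] args) throws InterruptedException {
        Properties config = new Properties();
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(config);
        int i = 0;
        // Send an incrementing counter to "consumer_test" every 5 seconds.
        while (true) {
            final ProducerRecord<String, String> record =
                    new ProducerRecord<>("consumer_test", "consumer_key", (i++) + "");
            producer.send(record);
            Thread.sleep(5000);
        }
    }
}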
More details here: https://stackoverflow.com/questions/785 ... -in-docker