Контейнеризованный потребитель Kafka не может подключиться к контейнеру KafkaJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Контейнеризованный потребитель Kafka не может подключиться к контейнеру Kafka

Сообщение Anonymous »

Я пытаюсь реализовать микросервисную архитектуру, используя Docker и Spring Boot. У меня работает контейнер Kafka, а также потребительский контейнер. Есть и производитель, столкнувшийся с той же проблемой, но я уверен, что если я исправлю потребителя, я смогу сделать то же самое и для производителя. Потребитель — это независимый модуль Spring со своей собственной конфигурацией и файлом докеров. Контейнеры потребителя, Kafka и производителя создаются и запускаются с помощью Docker Compose. В моем файле docker-compose я пытаюсь настроить загрузочный сервер для подключения к kafka:9092, а не к localhost:29092, который используется по умолчанию в файле application.yml потребителя.
У меня возникла проблема: что потребительский контейнер не может подключиться к контейнеру Kafka. В потребительском контейнере я получаю следующие журналы:

Код: Выделить всё

2024-11-08 12:47:21 2024-11-08T20:47:21.708Z  INFO 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-order-consumer-group-1, groupId=order-consumer-group] Node -1 disconnected.
2024-11-08 12:47:21 2024-11-08T20:47:21.708Z  WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-order-consumer-group-1, groupId=order-consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established.  Node may not be available.
На моем локальном компьютере все работает.
Я считаю, что проблема связана с тем, что потребитель пытается использовать "(localhost/127.0.0.1:29092)" для подключения к серверу Kafka.
Я пробовал вручную изменить файл application.yml Spring: kafka: Consumer: bootstrap-servers: localhost:29092 на kafka:9092, но получаю начальную загрузку -серверы при запуске потребителя:

Код: Выделить всё

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
Файл OrderConsumer.Java:

Код: Выделить всё

package com.Memorium.order_consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.Memorium.order_consumer.payload.Order;

@Slf4j
@Component
public class OrderConsumer {
@KafkaListener(topics = "order", groupId = "order-consumer-group")
public void orderConsumer(Order order) {
log.info("Consumer consume Kafka message ->  {}", order);
}
}
application.yml для OrderConsumer:

Код: Выделить всё

server:
port: 9002

spring:
kafka:
consumer:
auto-offset-reset: earliest
bootstrap-servers: localhost:29092 # we need this to be kafka:2902, but it will not work
group-id: order-consumer-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.trusted.packages: com.Memorium.order_consumer.payload
spring.json.value.default.type: com.Memorium.order_consumer.payload.Order

logging:
level:
root: INFO
org.springframework.kafka: DEBUG
org.apache.kafka: DEBUG
org.apache.kafka.clients.consumer.ConsumerConfig: DEBUG
org.apache.kafka.clients.NetworkClient: TRACE

Конфигурация OrderConsumer заносится в журналы:

Код: Выделить всё

2024-11-08 12:44:33 2024-11-08T20:44:33.029Z  INFO 1 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
2024-11-08 12:44:33     allow.auto.create.topics = true
2024-11-08 12:44:33     auto.commit.interval.ms = 5000
2024-11-08 12:44:33     auto.include.jmx.reporter = true
2024-11-08 12:44:33     auto.offset.reset = earliest
2024-11-08 12:44:33     bootstrap.servers = [localhost:29092]
2024-11-08 12:44:33     check.crcs = true
2024-11-08 12:44:33     client.dns.lookup = use_all_dns_ips
2024-11-08 12:44:33     client.id = consumer-order-consumer-group-1
2024-11-08 12:44:33     client.rack =
2024-11-08 12:44:33     connections.max.idle.ms = 540000
2024-11-08 12:44:33     default.api.timeout.ms = 60000
2024-11-08 12:44:33     enable.auto.commit = false
2024-11-08 12:44:33     enable.metrics.push = true
2024-11-08 12:44:33     exclude.internal.topics = true
2024-11-08 12:44:33     fetch.max.bytes = 52428800
2024-11-08 12:44:33     fetch.max.wait.ms = 500
2024-11-08 12:44:33     fetch.min.bytes = 1
2024-11-08 12:44:33     group.id = order-consumer-group
2024-11-08 12:44:33     group.instance.id = null
2024-11-08 12:44:33     group.protocol = classic
2024-11-08 12:44:33     group.remote.assignor = null
2024-11-08 12:44:33     heartbeat.interval.ms = 3000
2024-11-08 12:44:33     interceptor.classes = []
2024-11-08 12:44:33     internal.leave.group.on.close = true
2024-11-08 12:44:33     internal.throw.on.fetch.stable.offset.unsupported = false
2024-11-08 12:44:33     isolation.level = read_uncommitted
2024-11-08 12:44:33     key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2024-11-08 12:44:33     max.partition.fetch.bytes = 1048576
2024-11-08 12:44:33     max.poll.interval.ms = 300000
2024-11-08 12:44:33     max.poll.records = 500
2024-11-08 12:44:33     metadata.max.age.ms = 300000
2024-11-08 12:44:33     metric.reporters = []
2024-11-08 12:44:33     metrics.num.samples = 2
2024-11-08 12:44:33     metrics.recording.level = INFO
2024-11-08 12:44:33     metrics.sample.window.ms = 30000
2024-11-08 12:44:33     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor,  class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
2024-11-08 12:44:33     receive.buffer.bytes = 65536
2024-11-08 12:44:33     reconnect.backoff.max.ms = 1000
2024-11-08 12:44:33     reconnect.backoff.ms = 50
2024-11-08 12:44:33     request.timeout.ms = 30000
2024-11-08 12:44:33     retry.backoff.max.ms = 1000
2024-11-08 12:44:33     retry.backoff.ms = 100
2024-11-08 12:44:33     sasl.client.callback.handler.class = null
2024-11-08 12:44:33     sasl.jaas.config = null
2024-11-08 12:44:33     sasl.kerberos.kinit.cmd = /usr/bin/kinit
2024-11-08 12:44:33     sasl.kerberos.min.time.before.relogin = 60000
2024-11-08 12:44:33     sasl.kerberos.service.name = null
2024-11-08 12:44:33     sasl.kerberos.ticket.renew.jitter = 0.05
2024-11-08 12:44:33     sasl.kerberos.ticket.renew.window.factor = 0.8
2024-11-08 12:44:33     sasl.login.callback.handler.class = null
2024-11-08 12:44:33     sasl.login.class = null
2024-11-08 12:44:33     sasl.login.connect.timeout.ms = null
2024-11-08 12:44:33     sasl.login.read.timeout.ms = null
2024-11-08 12:44:33     sasl.login.refresh.buffer.seconds = 300
2024-11-08 12:44:33     sasl.login.refresh.min.period.seconds = 60
2024-11-08 12:44:33     sasl.login.refresh.window.factor = 0.8
2024-11-08 12:44:33     sasl.login.refresh.window.jitter = 0.05
2024-11-08 12:44:33     sasl.login.retry.backoff.max.ms = 10000
2024-11-08 12:44:33     sasl.login.retry.backoff.ms = 100
2024-11-08 12:44:33     sasl.mechanism = GSSAPI
2024-11-08 12:44:33     sasl.oauthbearer.clock.skew.seconds = 30
2024-11-08 12:44:33     sasl.oauthbearer.expected.audience = null
2024-11-08 12:44:33     sasl.oauthbearer.expected.issuer = null
2024-11-08 12:44:33     sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
2024-11-08 12:44:33     sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
2024-11-08 12:44:33     sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
2024-11-08 12:44:33     sasl.oauthbearer.jwks.endpoint.url = null
2024-11-08 12:44:33     sasl.oauthbearer.scope.claim.name = scope
2024-11-08 12:44:33     sasl.oauthbearer.sub.claim.name = sub
2024-11-08 12:44:33     sasl.oauthbearer.token.endpoint.url = null
2024-11-08 12:44:33     security.protocol = PLAINTEXT
2024-11-08 12:44:33     security.providers = null
2024-11-08 12:44:33     send.buffer.bytes = 131072
2024-11-08 12:44:33     session.timeout.ms = 45000
2024-11-08 12:44:33     socket.connection.setup.timeout.max.ms = 30000
2024-11-08 12:44:33     socket.connection.setup.timeout.ms = 10000
2024-11-08 12:44:33     ssl.cipher.suites = null
2024-11-08 12:44:33     ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
2024-11-08 12:44:33     ssl.endpoint.identification.algorithm = https
2024-11-08 12:44:33     ssl.engine.factory.class = null
2024-11-08 12:44:33     ssl.key.password = null
2024-11-08 12:44:33     ssl.keymanager.algorithm = SunX509
2024-11-08 12:44:33     ssl.keystore.certificate.chain = null
2024-11-08 12:44:33     ssl.keystore.key = null
2024-11-08 12:44:33     ssl.keystore.location = null
2024-11-08 12:44:33     ssl.keystore.password = null
2024-11-08 12:44:33     ssl.keystore.type = JKS
2024-11-08 12:44:33     ssl.protocol = TLSv1.3
2024-11-08 12:44:33     ssl.provider = null
2024-11-08 12:44:33     ssl.secure.random.implementation = null
2024-11-08 12:44:33     ssl.trustmanager.algorithm = PKIX
2024-11-08 12:44:33     ssl.truststore.certificates = null
2024-11-08 12:44:33     ssl.truststore.location = null
2024-11-08 12:44:33     ssl.truststore.password = null
2024-11-08 12:44:33     ssl.truststore.type = JKS
2024-11-08 12:44:33     value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

Файл OrderProducer.java:

Код: Выделить всё

package com.Memorium.order_producer;

import com.Memorium.order_producer.payload.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderProducer {
public static final String TOPIC = "order";

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendOrder(Order order){
Message  message = MessageBuilder
.withPayload(order)
.setHeader(KafkaHeaders.TOPIC, "order")
.build();
kafkaTemplate.send(message);
log.info("Producer produced Kafka message -> {}", message);
}
}
Файл ProducerController.java:

Код: Выделить всё

package com.Memorium.order_producer;

import com.Memorium.order_producer.payload.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RequestMapping("/api/order")
@RestController
public class ProducerController {
@Autowired
private OrderProducer orderProducer;

@PostMapping("/publish")
public ResponseEntity publish(@RequestBody Order order) {
orderProducer.sendOrder(order);
return ResponseEntity.ok(String.format("Order published, %s", order));
}
}
application.yml для производителя заказов:

Код: Выделить всё

server:
port: 9003

spring:
kafka:
producer:
acks: -1
bootstrap-servers: localhost:29092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.add.type.headers: false # GOD DAM THIS TOOK ME HOURS TO FIX, this is why consumer keeps trying to access the producer's order class

logging:
level:
root: INFO
org.springframework.kafka: DEBUG
org.apache.kafka: DEBUG
org.apache.kafka.clients.consumer.ConsumerConfig: DEBUG
org.apache.kafka.clients.NetworkClient: TRACE

docker-compose:

Код: Выделить всё

version: '3.8'

services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"
networks:
- kafka_network

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka_network

kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8090:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
networks:
- kafka_network

producer:
build:
context: ../order-producer
ports:
- "9003:9003"
depends_on:
- kafka
environment:
- SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
networks:
- kafka_network

consumer:
build:
context: ../order-consumer
ports:
- "9002:9002"
depends_on:
- kafka
environment:
- SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
networks:
- kafka_network

networks:
kafka_network:
driver: bridge

Если я попытаюсь опубликовать его производителю, я получу аналогичное сообщение:

Код: Выделить всё

2024-11-08T21:10:21.179Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2024-11-08T21:10:22.085Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
Я пробовал вручную изменить загрузочные серверы application.yml на kafka:9092, но в этом случае я не смогу создать файл .jar. Я также пытался узнать, могу ли я получить доступ к локальному хосту моего локального компьютера от потребителя, и попытаться получить доступ к Kafka таким образом, но безуспешно.
Мне удалось запустить потребителя и будут работать нормально, если я не буду помещать их в контейнер, а вместо этого просто запущу их внутри intelliJ. Kafka по-прежнему будет находиться в контейнере.
Буду признателен за любые подробные объяснения, я все еще изучаю Docker.

Подробнее здесь: https://stackoverflow.com/questions/791 ... -container
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»