What I did
1. Read this article: https://www.confluent.io/blog/kafka-listeners-explained/
2. Set up the Kafka infrastructure in Docker
Code:
version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.2.1"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  # This has three listeners you can experiment with.
  # BOB for internal traffic on the Docker network
  # FRED for traffic from the Docker-host machine (`localhost`)
  # ALICE for traffic from outside, reaching the Docker host on the DNS name `never-gonna-give-you-up`
  # Use
  kafka0:
    image: "confluentinc/cp-enterprise-kafka:5.2.1"
    ports:
      - '9092:9092'
      - '29094:29094'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://kafka0:9092,LISTENER_ALICE://kafka0:29094
      KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092,LISTENER_ALICE://never-gonna-give-you-up:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,LISTENER_ALICE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
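To make the three listeners easier to reason about, here is a small sketch that splits the KAFKA_ADVERTISED_LISTENERS value from the compose file above into listener name, host and port. The helper name `parse_listeners` is hypothetical and only for illustration; it assumes every entry has the form NAME://host:port. The advertised address is what the broker hands back to a client that connected on that listener, so a client on the Docker host should end up with `localhost:9092`.

```python
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Map each listener name to the (host, port) pair it carries.

    Hypothetical helper: assumes entries look like NAME://host:port.
    """
    listeners = {}
    for entry in value.split(","):
        # "LISTENER_FRED://localhost:9092" -> name and "localhost:9092"
        name, _, address = entry.strip().partition("://")
        # Split on the last colon so the host part stays intact.
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from KAFKA_ADVERTISED_LISTENERS in the compose file.
ADVERTISED = (
    "LISTENER_BOB://kafka0:29092,"
    "LISTENER_FRED://localhost:9092,"
    "LISTENER_ALICE://never-gonna-give-you-up:29094"
)

advertised = parse_listeners(ADVERTISED)
for name, (host, port) in advertised.items():
    print(f"{name}: clients are told to connect to {host}:{port}")
```

A client only works if it can resolve and reach the host that its listener advertises, which is why each of the three names targets a different network location.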
Code:
import asyncio
from aiokafka import AIOKafkaProducer

async def send_to_kafka():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        enable_idempotence=True)
    await producer.start()
    try:
        await producer.send_and_wait("my-topic-1", b"Super message")
    finally:
        await producer.stop()

asyncio.run(send_to_kafka())
Code:
Topic my-topic-1 not found in cluster metadata
Code:
docker exec -ti fetcher-service-kafka0-1 bash
kafka-topics --bootstrap-server kafka0:29092 --create --if-not-exists --topic my-topic-1 --replication-factor 1 --partitions 1
[2024-11-27 12:11:11,237] WARN Couldn't resolve server kafka0:29092 from bootstrap.servers as DNS resolution failed for kafka0 (org.apache.kafka.clients.ClientUtils)
fetcher-service-init-kafka-1 | Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
fetcher-service-init-kafka-1 | at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:386)
fetcher-service-init-kafka-1 | at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:55)
fetcher-service-init-kafka-1 | at kafka.admin.TopicCommand$AdminClientTopicService$.createAdminClient(TopicCommand.scala:150)
fetcher-service-init-kafka-1 | at kafka.admin.TopicCommand$AdminClientTopicService$.apply(TopicCommand.scala:154)
fetcher-service-init-kafka-1 | at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
fetcher-service-init-kafka-1 | at kafka.admin.TopicCommand.main(TopicCommand.scala)
fetcher-service-init-kafka-1 | Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
fetcher-service-init-kafka-1 | at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:90)
fetcher-service-init-kafka-1 | at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:49)
fetcher-service-init-kafka-1 | at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
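The warning above says that DNS resolution failed for `kafka0` in whatever environment ran the command. As a rough way to check that from Python, the sketch below asks the local resolver whether a hostname resolves. The function name `can_resolve` is hypothetical, and the result depends entirely on where the script runs (inside a container on the same Docker network versus on the Docker host).

```python
import socket


def can_resolve(hostname: str) -> bool:
    """Return True if the local resolver can turn hostname into an address."""
    try:
        socket.getaddrinfo(hostname, None)
    except socket.gaierror:
        # Raised when the name cannot be resolved from this machine.
        return False
    return True


if __name__ == "__main__":
    # Hostnames taken from the listener configuration in the question.
    for host in ("kafka0", "localhost"):
        print(host, "resolves" if can_resolve(host) else "does not resolve")
```

If `kafka0` does not resolve from the place where `kafka-topics` is executed, the bootstrap address `kafka0:29092` cannot work there, whatever the broker advertises.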
More details here: https://stackoverflow.com/questions/792 ... -in-docker