Код: Выделить всё
version: '3.8'
services:
service-a:
container_name: service-a
build:
context: .
dockerfile: Dockerfile
ports:
- "8881:8881"
environment:
- ENVIRONMENT=development
depends_on:
- kafka
- chroma
kafka:
container_name: my-kafka
image: bitnami/kafka:latest
ports:
- "9094:9094"
environment:
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_BROKER_ID=1
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=broker,controller
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@:9093
- ALLOW_PLAINTEXT_LISTENER=yes
chroma:
container_name: learnsuite-chroma
image: chromadb/chroma:latest
ports:
- "8882:8000"
Код: Выделить всё
bootstrap_servers: kafka:9092
Код: Выделить всё
def get_consumer_for_topic(topic: str):
group_id = settings.kafka.group
bootstrap = settings.kafka.bootstrap_servers
bootstrap_servers = bootstrap.split(',') if bootstrap else []
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
topics = admin_client.list_topics()
if topic not in topics:
# Create the topic if it doesn't exist
new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
admin_client.create_topics([new_topic])
logger.info(f"Topic '{topic}' created.")
else:
logger.info(f"Topic '{topic}' already exists.")
except Exception as e:
logger.error(f"Error ensuring topic exists: {e}")
finally:
admin_client.close()
return KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id = group_id,
auto_offset_reset='earliest'
)
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Я знаю, что многие люди говорят об этом, но именно сюда я попал даже после того, как погуглил. Пожалуйста, сообщите.
Ответ: Поскольку вопрос был помечен как удаленный, похоже, я не могу на него ответить, поэтому обновите здесь информацию для людей, у которых может возникнуть аналогичная проблема.
Дело в том, что у service-a очень быстрый процесс загрузки. Часть создания темы попала в раздел до полной загрузки Kafka. Поэтому я добавил механизм повторной попытки, чтобы он мог подождать. После этого из журнала я могу сказать, что в первый раз произошел сбой, но затем он загрузился во второй раз и в конечном итоге загрузился успешно.
Подробнее здесь: https://stackoverflow.com/questions/791 ... ontainer-t