Код: Выделить всё
online_train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["first_topic"],
group_id="",
servers="kafka:9092",
stream_timeout=30000,
message_poll_timeout=1000,
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
Код: Выделить всё
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
...
Я хочу обработать все сообщения из темы Kafka до тех пор, пока в течение 30 секунд не будет отправлено сообщение, подобное этой статье: https://www.tensorflow.org/io/tutorials ... e_learning. На данный момент я получаю тайм-аут примерно на 30 секунд, а затем все сообщения обрабатываются и скрипт завершает работу.
Мой брокер Kafka работает как Docker-контейнер с этой конфигурацией:
Код: Выделить всё
services:
kafka:
image: 'bitnami/kafka:latest'
container_name: kafka
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_CONTROLLER_BROKER_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
ports:
- '9092:9092'
- '9093:9093'
Подробнее здесь: https://stackoverflow.com/questions/792 ... hiodataset
Мобильная версия