Локально: время ожидания истекло при использовании tfio.experimental.streaming.KafkaBatchIODataset.Python

Программы на Python
Ответить
Anonymous
 Локально: время ожидания истекло при использовании tfio.experimental.streaming.KafkaBatchIODataset.

Сообщение Anonymous »

Я хочу читать сообщения, опубликованные в теме Kafka (в данном случае first_topic), и передавать эти сообщения в нейронную сеть с помощью тензорного потока. Для этого я использую модуль tfio.experimental.streaming.KafkaBatchIODataset со следующей конфигурацией:

Код: Выделить всё

online_train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["first_topic"],
group_id="",
servers="kafka:9092",
stream_timeout=30000,
message_poll_timeout=1000,
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest"
],
)
При тестировании конфигурации я всегда получаю следующий вывод команды в Python:

Код: Выделить всё

tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out
...
Только в конце, когда истечет период ожидания, сообщения, отправленные в kafka, обрабатываются.
Я хочу обработать все сообщения из темы Kafka до тех пор, пока в течение 30 секунд не будет отправлено сообщение, подобное этой статье: https://www.tensorflow.org/io/tutorials ... e_learning. На данный момент я получаю тайм-аут примерно на 30 секунд, а затем все сообщения обрабатываются и скрипт завершает работу.
Мой брокер Kafka работает как Docker-контейнер с этой конфигурацией:

Код: Выделить всё

services:
kafka:
image: 'bitnami/kafka:latest'
container_name: kafka
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_CONTROLLER_BROKER_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
ports:
- '9092:9092'
- '9093:9093'
Что я делаю не так?


Подробнее здесь: https://stackoverflow.com/questions/792 ... hiodataset
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»