KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Локально: время ожидания сообщения истекло"}Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Локально: время ожидания сообщения истекло"}

Сообщение Anonymous »

Код: Выделить всё

KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}

Обнаружена ошибка при создании сообщения в теме Kafka из задачи воздушного потока (оператор Python).
При создании того же сообщения в той же теме из простого приложения-производителя Python (вне воздушного потока) он работает нормально. Но при выполнении задачи воздушного потока происходит сбой.
Кластер воздушного потока успешно прослушивает брокеров Kafka.
Аутентификация и авторизация темы выполнены.

Код: Выделить всё

from airflow import DAG
from airflow.operators.python import PythonOperator
from confluent_kafka import Producer

def produce_to_kafka(**context):
# Kafka configuration
KAFKA_CONFIG = {
'bootstrap.servers': 'hostname:port',
'security.protocol': 'ssl',
"partitioner": "random",
'ssl.ca.location': 'path.pem',
'ssl.certificate.location':'path.cer',
'ssl.key.location':'path.key',
}
KAFKA_TOPIC = 'my_topic'

producer = Producer(KAFKA_CONFIG)

producer.produce(KAFKA_TOPIC, key='my_key', value='my_message', partition=1, callback=handle_error)
producer.flush()

with DAG(
dag_id='my_dag',
schedule_interval='0 0 * * *',
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:

send_to_kafka = PythonOperator(
task_id='send_messages_to_kafka',
python_callable=produce_to_kafka,
provide_context=True,
)

send_to_kafka

Я попытался создать простое приложение Python производителя вне воздушного потока с теми же конфигурациями производителя, и оно работает нормально.

Подробнее здесь: https://stackoverflow.com/questions/793 ... -timed-out
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»