Код: Выделить всё
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
При создании того же сообщения в той же теме из простого приложения-производителя Python (вне воздушного потока) он работает нормально. Но при выполнении задачи воздушного потока происходит сбой.
Кластер воздушного потока успешно прослушивает брокеров Kafka.
Аутентификация и авторизация темы выполнены.
Код: Выделить всё
from airflow import DAG
from airflow.operators.python import PythonOperator
from confluent_kafka import Producer
def produce_to_kafka(**context):
# Kafka configuration
KAFKA_CONFIG = {
'bootstrap.servers': 'hostname:port',
'security.protocol': 'ssl',
"partitioner": "random",
'ssl.ca.location': 'path.pem',
'ssl.certificate.location':'path.cer',
'ssl.key.location':'path.key',
}
KAFKA_TOPIC = 'my_topic'
producer = Producer(KAFKA_CONFIG)
producer.produce(KAFKA_TOPIC, key='my_key', value='my_message', partition=1, callback=handle_error)
producer.flush()
with DAG(
dag_id='my_dag',
schedule_interval='0 0 * * *',
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
send_to_kafka = PythonOperator(
task_id='send_messages_to_kafka',
python_callable=produce_to_kafka,
provide_context=True,
)
send_to_kafka
Подробнее здесь: https://stackoverflow.com/questions/793 ... -timed-out