Airflow DAG с несколькими динамическими задачами SQLExecuteQueryOperator продолжает тихо прерыватьсяPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Airflow DAG с несколькими динамическими задачами SQLExecuteQueryOperator продолжает тихо прерываться

Сообщение Anonymous »

Я пытаюсь переместить файл CSV в базу данных MariaDB. Для этого я написал следующий DAG.
Когда я запускаю его через пользовательский интерфейс Airflow, он работает нормально.
Но затем DAG исчезает из списка, и я не знаю почему.
Есть ли что-то очевидное, что я здесь упускаю?

Код: Выделить всё

from datetime import datetime

from airflow import DAG
from airflow.decorators import dag,task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.dummy import DummyOperator

import pandas as pd

default_args = {
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "start_date": datetime(2025,1,1),
    "catchup": False,
    "schedule": 'None',
}

@dag(
        default_args=default_args,
        dag_display_name="Load all Donations"
     )
def opportunity_load():
    all_tasks = []
    chunk_size = 100

    # We load the contact CSV in chunks
    cont_counter = 0
   
    for cont_chunk in pd.read_csv('/opt/airflow/data/Contacts.csv',chunksize=chunk_size):
        query_parameters = {"affiliate":"uk","contacts":[]}
        for index,row in cont_chunk.iterrows():
            query_parameters["contacts"].append({
                "salesforce_id" : row['Id'],
                "first_name" : row['FirstName'],
                "last_name" : row['LastName']
            })            
            cont_counter += 1
   
        current_task =  SQLExecuteQueryOperator(
            task_id = f'cont_push_chunk_{cont_counter}',
            sql = {'load_contacts.sql'},
            conn_id = 'datawarehouse',
            params = query_parameters
        )
        all_tasks.append(current_task)

    for task_i in range(len(all_tasks) - 1):
        all_tasks[task_i] >> all_tasks[task_i + 1]

opportunity_load()
Я заменил SQLExecuteQueryOperator фиктивными задачами, и это устранило проблему, поэтому я предполагаю, что где-то есть проблема с моей задачей SQL. Странно то, что DAG по-прежнему работает в командной строке.
Как видите, я разбиваю CSV на фрагменты по 100 (возможно, позже увеличу это число до 1000+). . Затем я помещаю каждый фрагмент в свою базу данных, используя шаблон запроса, хранящийся в файле load_contacts.sql, который вы можете увидеть ниже:

Код: Выделить всё

INSERT INTO `{{params.affiliate}}_contacts`(salesforce_id,first_name,last_name)
VALUES
{% for contact in params.contacts %}
("{{contact.salesforce_id}}","{{contact.first_name}}","{{contact.last_name}}")
{% if not loop.last %}
,
{% endif %}
{% endfor %}
ON DUPLICATE KEY UPDATE
salesforce_id=VALUES(salesforce_id),
first_name = VALUES(first_name),
last_name = VALUES(last_name);
Это все происходит на локальном компьютере с использованием Docker.

Подробнее здесь: https://stackoverflow.com/questions/793 ... breaking-s
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»