Когда я запускаю его через пользовательский интерфейс Airflow, он работает нормально.
Но затем DAG исчезает из списка, и я не знаю почему.
Есть ли что-то очевидное, что я здесь упускаю?
Код: Выделить всё
from datetime import datetime
from airflow import DAG
from airflow.decorators import dag,task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.dummy import DummyOperator
import pandas as pd
default_args = {
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"start_date": datetime(2025,1,1),
"catchup": False,
"schedule": 'None',
}
@dag(
default_args=default_args,
dag_display_name="Load all Donations"
)
def opportunity_load():
all_tasks = []
chunk_size = 100
# We load the contact CSV in chunks
cont_counter = 0
for cont_chunk in pd.read_csv('/opt/airflow/data/Contacts.csv',chunksize=chunk_size):
query_parameters = {"affiliate":"uk","contacts":[]}
for index,row in cont_chunk.iterrows():
query_parameters["contacts"].append({
"salesforce_id" : row['Id'],
"first_name" : row['FirstName'],
"last_name" : row['LastName']
})
cont_counter += 1
current_task = SQLExecuteQueryOperator(
task_id = f'cont_push_chunk_{cont_counter}',
sql = {'load_contacts.sql'},
conn_id = 'datawarehouse',
params = query_parameters
)
all_tasks.append(current_task)
for task_i in range(len(all_tasks) - 1):
all_tasks[task_i] >> all_tasks[task_i + 1]
opportunity_load()
Как видите, я разбиваю CSV на фрагменты по 100 (возможно, позже увеличу это число до 1000+). . Затем я помещаю каждый фрагмент в свою базу данных, используя шаблон запроса, хранящийся в файле load_contacts.sql, который вы можете увидеть ниже:
Код: Выделить всё
INSERT INTO `{{params.affiliate}}_contacts`(salesforce_id,first_name,last_name)
VALUES
{% for contact in params.contacts %}
("{{contact.salesforce_id}}","{{contact.first_name}}","{{contact.last_name}}")
{% if not loop.last %}
,
{% endif %}
{% endfor %}
ON DUPLICATE KEY UPDATE
salesforce_id=VALUES(salesforce_id),
first_name = VALUES(first_name),
last_name = VALUES(last_name);
Подробнее здесь: https://stackoverflow.com/questions/793 ... breaking-s