Проблема в следующем:
- Задача генератора должна принимать аргумент, ссылающийся на префикс URI, и это должен быть оцениваемый шаблон Jinja (он ссылается на файл Task_instance. xcom_pull) result
- Задачу генератора необходимо вызывать послеgenerate_run_id, чтобы она знала правильное местоположение.
с ValueError: метод расширения получил неожиданный тип "str" для аргумента ключевого слова "префикс" Может кто-нибудь, пожалуйста, предложите решение этой проблемы? Я, например, не понимаю, как должна работать параметризованная задача, используемая для генерации значений для созданных задачами сопоставлений.
Если есть более простое решение для импорта нескольких файлов из одной группы обеспечения доступности баз данных в GCP Экземпляр Cloud SQL, сообщите мне.
#!/usr/bin/env python
import time
import logging
from airflow.operators.python import get_current_context
from airflow.decorators import task
from airflow.decorators import dag
from airflow import models
from airflow.operators import empty
from airflow.utils.state import State
from airflow.operators.python import task, get_current_context
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLExecuteQueryOperator
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator
from airflow.utils.dates import days_ago
from airflow.models.param import Param
from airflow.models import Variable
from google.cloud import storage
import datetime
ENVIRONMENT_TYPE = # ...
GCP_REGION = # ...
foobar_PROJECT_ID = # ...
POSTGRESQL_PROJECT_ID = # ...
EXPORT_BUCKET = # ...
LOD_GCS_STAGING = # ...
EXPORT_PATH="foobar/{{ dag_run.conf.get('load_date', params.load_date) }}/{{ task_instance.xcom_pull(key='simple_run_id', task_ids='generate_run_id') }}"
EXPORT_URI=str("gs://" + EXPORT_BUCKET + "/" + EXPORT_PATH )
CLOUD_SQL_POSTGRESQL_CONNECTION_ID = "..."
def _generate_run_id(**context):
run_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_variable_name = 'simple_run_id'
context['ti'].xcom_push(key=output_variable_name, value=run_id)
logging.info(f"Generated '{output_variable_name}' with value: '{run_id}'")
default_args = {
"region": GCP_REGION,
'retries': 0
}
with models.DAG(
"FooBar", # id displayed in the DAG airflow page
default_args=default_args,
schedule_interval=None,
params={
"load_date": Param("2024-11-27", type="string", format="date", examples=["2024-11-27", "2024-06-01", "2023-12-29"])
},
max_active_tasks=1 ) as dag:
start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')
end = PythonOperator(
task_id='end',
provide_context=True,
python_callable=final_status,
trigger_rule='all_done', # Ensures this task runs even if upstream fails
dag=dag,
)
generate_run_id = PythonOperator(
task_id="generate_run_id",
python_callable=_generate_run_id,
dag=dag
)
## ... lines omitted
@task
def build_request_bodies(prefix):
delimiter = '/'
logging.info(f"Listing bucket '{EXPORT_BUCKET}' with prefix: '{prefix}'")
storage_client = storage.Client()
blobs = storage_client.list_blobs(bucket_or_name=EXPORT_BUCKET, prefix=prefix, delimiter=delimiter)
csv_files = []
for blob in blobs:
if blob.name.endswith('.csv'):
csv_files.append(f"gs://{blob.bucket.name}/{blob.name}")
logging.info(f"Found {len(csv_files)} CSV files: '{csv_files}'")
bodies = list([{
"importContext": {
"fileType": "csv",
"uri": exported_file,
"database": "foobar",
"csvImportOptions": {
"table": "foobar_export.foo_landing",
}
}
} for exported_file in csv_files ])
logging.info(f"Built import request bodies: {bodies}'")
return bodies
# Import data exported from BigQuery
import_to_cloud_sql = CloudSQLImportInstanceOperator.partial(
project_id = POSTGRESQL_PROJECT_ID,
task_id = "import_to_cloud_sql",
instance = CLOUD_SQL_POSTGRESQL_INSTANCE,
map_index_template="""{{ task.body.importContext.uri }}"""
).expand(body = build_request_bodies.expand(prefix=f"{EXPORT_PATH}/"))
## ... lines omitted
start >>\
generate_run_id >>\
## ... lines omitted
build_request_bodies(prefix=f"{EXPORT_PATH}/") >>\
import_to_cloud_sql >>\
## ... lines omitted
end
Подробнее здесь: https://stackoverflow.com/questions/793 ... n-previous