Задача воздушного потока с параметром для сопоставлений, созданных задачами, после определенной предыдущей задачиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Задача воздушного потока с параметром для сопоставлений, созданных задачами, после определенной предыдущей задачи

Сообщение Anonymous »

Я работаю с GCP Cloud Composer (Airflow), где мне хотелось бы перечислить файлы .csv в сегменте хранилища GCS с определенным префиксом и начать импорт в экземпляр GCP Cloud SQL для каждого из них. Я хотел бы использовать динамическое сопоставление задач Airflow с сопоставлениями, созданными задачами: то есть получить одну задачу для создания списка, по которому затем расширяется задача импорта.
Проблема в следующем:
  • Задача генератора должна принимать аргумент, ссылающийся на префикс URI, и это должен быть оцениваемый шаблон Jinja (он ссылается на файл Task_instance. xcom_pull) result
  • Задачу генератора необходимо вызывать послеgenerate_run_id, чтобы она знала правильное местоположение.
У меня есть что-то похожее, но теперь оно не работает, эта ошибка при вызове расширения
с ValueError: метод расширения получил неожиданный тип "str" ​​для аргумента ключевого слова "префикс" Может кто-нибудь, пожалуйста, предложите решение этой проблемы? Я, например, не понимаю, как должна работать параметризованная задача, используемая для генерации значений для созданных задачами сопоставлений.
Если есть более простое решение для импорта нескольких файлов из одной группы обеспечения доступности баз данных в GCP Экземпляр Cloud SQL, сообщите мне.
#!/usr/bin/env python

import time
import logging

from airflow.operators.python import get_current_context

from airflow.decorators import task
from airflow.decorators import dag

from airflow import models
from airflow.operators import empty
from airflow.utils.state import State

from airflow.operators.python import task, get_current_context

from airflow.operators.python_operator import PythonOperator

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLExecuteQueryOperator
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator

from airflow.utils.dates import days_ago

from airflow.models.param import Param
from airflow.models import Variable

from google.cloud import storage

import datetime

ENVIRONMENT_TYPE = # ...

GCP_REGION = # ...
foobar_PROJECT_ID = # ...

POSTGRESQL_PROJECT_ID = # ...

EXPORT_BUCKET = # ...

LOD_GCS_STAGING = # ...

EXPORT_PATH="foobar/{{ dag_run.conf.get('load_date', params.load_date) }}/{{ task_instance.xcom_pull(key='simple_run_id', task_ids='generate_run_id') }}"

EXPORT_URI=str("gs://" + EXPORT_BUCKET + "/" + EXPORT_PATH )

CLOUD_SQL_POSTGRESQL_CONNECTION_ID = "..."

def _generate_run_id(**context):
run_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
output_variable_name = 'simple_run_id'
context['ti'].xcom_push(key=output_variable_name, value=run_id)
logging.info(f"Generated '{output_variable_name}' with value: '{run_id}'")

default_args = {
"region": GCP_REGION,
'retries': 0
}
with models.DAG(
"FooBar", # id displayed in the DAG airflow page
default_args=default_args,
schedule_interval=None,
params={
"load_date": Param("2024-11-27", type="string", format="date", examples=["2024-11-27", "2024-06-01", "2023-12-29"])
},
max_active_tasks=1 ) as dag:
start = empty.EmptyOperator(task_id='start', trigger_rule='all_success')

end = PythonOperator(
task_id='end',
provide_context=True,
python_callable=final_status,
trigger_rule='all_done', # Ensures this task runs even if upstream fails
dag=dag,
)

generate_run_id = PythonOperator(
task_id="generate_run_id",
python_callable=_generate_run_id,
dag=dag
)

## ... lines omitted

@task
def build_request_bodies(prefix):

delimiter = '/'

logging.info(f"Listing bucket '{EXPORT_BUCKET}' with prefix: '{prefix}'")

storage_client = storage.Client()
blobs = storage_client.list_blobs(bucket_or_name=EXPORT_BUCKET, prefix=prefix, delimiter=delimiter)

csv_files = []

for blob in blobs:
if blob.name.endswith('.csv'):
csv_files.append(f"gs://{blob.bucket.name}/{blob.name}")

logging.info(f"Found {len(csv_files)} CSV files: '{csv_files}'")

bodies = list([{
"importContext": {
"fileType": "csv",
"uri": exported_file,
"database": "foobar",
"csvImportOptions": {
"table": "foobar_export.foo_landing",
}
}
} for exported_file in csv_files ])

logging.info(f"Built import request bodies: {bodies}'")

return bodies

# Import data exported from BigQuery
import_to_cloud_sql = CloudSQLImportInstanceOperator.partial(
project_id = POSTGRESQL_PROJECT_ID,
task_id = "import_to_cloud_sql",
instance = CLOUD_SQL_POSTGRESQL_INSTANCE,
map_index_template="""{{ task.body.importContext.uri }}"""
).expand(body = build_request_bodies.expand(prefix=f"{EXPORT_PATH}/"))

## ... lines omitted

start >>\
generate_run_id >>\
## ... lines omitted
build_request_bodies(prefix=f"{EXPORT_PATH}/") >>\
import_to_cloud_sql >>\
## ... lines omitted
end



Подробнее здесь: https://stackoverflow.com/questions/793 ... n-previous
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»