Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator .
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), сделанные в API третьей стороны, хранить CSV в хранилище и в конечном итоге получить доступ ко всем CSVS, Merge Meally, Transformes, Transformes и Post in BICS. /> Задачи находятся в парах, для каждого отчета первая задача запускает облачную функцию, которая генерирует запрос и хранит токен отчета в секретном менеджере, а вторая задача проверяет, доступен ли отчет для загрузки, повторения, пока он не будет, а затем сохранение в последнем задании. Слияет и загружается в bq.
Когда каждая загрузка завершена, я использую repply_filter arg Simplehttpoperator, чтобы сделать имя файла доступным для последующего использования в качестве xcom .
Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator . Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), сделанные в API третьей стороны, хранить CSV в хранилище и в конечном итоге получить доступ ко всем CSVS, Merge Meally, Transformes, Transformes и Post in BICS. /> Задачи находятся в парах, для каждого отчета первая задача запускает облачную функцию, которая генерирует запрос и хранит токен отчета в секретном менеджере, а вторая задача проверяет, доступен ли отчет для загрузки, повторения, пока он не будет, а затем сохранение в последнем задании. Слияет и загружается в bq. Когда каждая загрузка завершена, я использую repply_filter arg Simplehttpoperator, чтобы сделать имя файла доступным для последующего использования в качестве xcom . [code]# Python standard modules from datetime import datetime, timedelta# Airflow modules from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.providers.http.operators.http import SimpleHttpOperator import json
with DAG( "dailymotion_reporting", default_args=default_args, schedule_interval='0 6 * * *', tags=["my_dags"] ) as dag:
def push_xcom(**context): v = context['ti'].xcom_push(key="filename", value=response.json()["filename"]) return v
def response_check(response): if response[2] == "report not ready": print("report not ready: " + report_summary) return False elif response[2] == "report downloaded": print("report downloaded: " + report_summary) return True
#t1 as request first report report1_request = SimpleHttpOperator( task_id= "report1_request", method='POST', http_conn_id='report_request_trigger', endpoint='request_dm_report', data=json.dumps({ "dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION", "metrics": "TOTAL_INVENTORY", "product": "EMBED" }), headers={"Content-Type": "application/json"} ) #t2 check report availability until available then download report1_check_dl = SimpleHttpOperator( task_id= "report1_check_dl", method='GET', http_conn_id='report_request_trigger', endpoint='check_previously_requested_dm_reports', response_check = lambda response: True if response.json()["report_status"] == "report downloaded" else False, response_filter = lambda response: {"filename": response.json()["filename"]} ) < /code> Задача, которая предназначена для вытягивания CSV из хранилища, ниже. Я пытаюсь извлечь имена файлов из XCOM, произведенных предыдущими задачами, и включить их в полезную нагрузку данных для моей облачной функции. < /P> ad_report_transformations = SimpleHttpOperator( task_id= "ad_report_transformations", method='POST', http_conn_id='report_request_trigger', endpoint='dm_transform_ad_data', data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "), response_check = lambda response: True if response == "ok" else False ) < /code> Тем не менее, пробовал много разных методов, я продолжаю получать вариации одной и той же ошибки < /p> {taskinstance.py:1152} ERROR - 'context' is undefined[/code] Как лучше всего определить контекст, используя Simplehttpoperator? Или есть другой способ привлечь эти значения?>
Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator.
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), выполненные в API третьих лиц, хранить CSV в хранилище и в конечном итоге...
У меня есть родительское задание DAG и дочернее задание DAG. Задачи дочернего задания должны запускаться при успешном завершении задач родительского задания, которые выполняются ежедневно. Как добавить внешний триггер задания?
У меня есть родительское задание DAG и дочернее задание DAG. Задачи дочернего задания должны запускаться при успешном завершении задач родительского задания, которые выполняются ежедневно. Как добавить внешний триггер задания?