Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator. < /p>
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), выполненные в API третьих лиц, хранить CSV в хранилище и в конечном итоге доступ к всем CSV, слияние, преобразование и сохранение в большом запросе. Пары, для каждого отчета первая задача запускает облачную функцию, генерирует запрос и хранит токен отчета в секретном менеджере, а вторая задача проверяет, доступен ли отчет для загрузки, повторения до тех пор, пока он не будет, а затем сохранение в Google Cloud Storage. < /p>
Все CSV будет приведено к последней задаче. Bq.
# Python standard modules
from datetime import datetime, timedelta# Airflow modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import json
default_args = {
'owner': '--',
'depends_on_past': False,
# Start on 27th of June, 2020
'start_date': datetime(2021, 6, 16),
'email': ['--'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(seconds=60),
'provide_context': True
}
with DAG(
"dailymotion_reporting",
default_args=default_args,
schedule_interval='0 6 * * *',
tags=["my_dags"]
) as dag:
def push_xcom(**context):
v = context['ti'].xcom_push(key="filename", value=response.json()["filename"])
return v
def response_check(response):
if response[2] == "report not ready":
print("report not ready: " + report_summary)
return False
elif response[2] == "report downloaded":
print("report downloaded: " + report_summary)
return True
#t1 as request first report
report1_request = SimpleHttpOperator(
task_id= "report1_request",
method='POST',
http_conn_id='report_request_trigger',
endpoint='request_dm_report',
data=json.dumps({
"dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION",
"metrics": "TOTAL_INVENTORY",
"product": "EMBED"
}),
headers={"Content-Type": "application/json"}
)
#t2 check report availabilty until available then download
report1_check_dl = SimpleHttpOperator(
task_id= "report1_check_dl",
method='GET',
http_conn_id='report_request_trigger',
endpoint='check_previously_requested_dm_reports',
response_check = lambda response: True if response.json()["report_status"] == "report downloaded" else False,
response_filter = lambda response: {"filename": response.json()["filename"]}
)
< /code>
Задача, которая предназначена для вытягивания CSV из хранилища, ниже. Я пытаюсь извлечь имена файлов из XCOM, произведенных предыдущими задачами, и включить их в полезную нагрузку данных для моей облачной функции. < /P>
ad_report_transformations = SimpleHttpOperator(
task_id= "ad_report_transformations",
method='POST',
http_conn_id='report_request_trigger',
endpoint='dm_transform_ad_data',
data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "),
response_check = lambda response: True if response == "ok" else False
)
< /code>
Тем не менее, пробовал много разных методов, я продолжаю получать вариации одной и той же ошибки < /p>
{taskinstance.py:1152} ERROR - 'context' is undefined
Как лучше всего определить контекст, используя Simplehttpoperator? Или есть другой способ привлечь эти значения? В большинстве решений, которые я видел с аналогичными проблемами, используют Pythonoperator, который имеет предоставление_контекстового arg, который, по -видимому, включает выше, но я хотел посмотреть, есть ли для меня способ сделать это без необходимости переписать все мои задачи в качестве функций. Спасибо
Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator. < /p> Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), выполненные в API третьих лиц, хранить CSV в хранилище и в конечном итоге доступ к всем CSV, слияние, преобразование и сохранение в большом запросе. Пары, для каждого отчета первая задача запускает облачную функцию, генерирует запрос и хранит токен отчета в секретном менеджере, а вторая задача проверяет, доступен ли отчет для загрузки, повторения до тех пор, пока он не будет, а затем сохранение в Google Cloud Storage. < /p> Все CSV будет приведено к последней задаче. Bq.[code]# Python standard modules from datetime import datetime, timedelta# Airflow modules from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.providers.http.operators.http import SimpleHttpOperator import json
with DAG( "dailymotion_reporting", default_args=default_args, schedule_interval='0 6 * * *', tags=["my_dags"] ) as dag:
def push_xcom(**context): v = context['ti'].xcom_push(key="filename", value=response.json()["filename"]) return v
def response_check(response): if response[2] == "report not ready": print("report not ready: " + report_summary) return False elif response[2] == "report downloaded": print("report downloaded: " + report_summary) return True
#t1 as request first report report1_request = SimpleHttpOperator( task_id= "report1_request", method='POST', http_conn_id='report_request_trigger', endpoint='request_dm_report', data=json.dumps({ "dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION", "metrics": "TOTAL_INVENTORY", "product": "EMBED" }), headers={"Content-Type": "application/json"} ) #t2 check report availabilty until available then download report1_check_dl = SimpleHttpOperator( task_id= "report1_check_dl", method='GET', http_conn_id='report_request_trigger', endpoint='check_previously_requested_dm_reports', response_check = lambda response: True if response.json()["report_status"] == "report downloaded" else False, response_filter = lambda response: {"filename": response.json()["filename"]} ) < /code> Задача, которая предназначена для вытягивания CSV из хранилища, ниже. Я пытаюсь извлечь имена файлов из XCOM, произведенных предыдущими задачами, и включить их в полезную нагрузку данных для моей облачной функции. < /P> ad_report_transformations = SimpleHttpOperator( task_id= "ad_report_transformations", method='POST', http_conn_id='report_request_trigger', endpoint='dm_transform_ad_data', data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "), response_check = lambda response: True if response == "ok" else False ) < /code> Тем не менее, пробовал много разных методов, я продолжаю получать вариации одной и той же ошибки < /p> {taskinstance.py:1152} ERROR - 'context' is undefined[/code] Как лучше всего определить контекст, используя Simplehttpoperator? Или есть другой способ привлечь эти значения? В большинстве решений, которые я видел с аналогичными проблемами, используют Pythonoperator, который имеет предоставление_контекстового arg, который, по -видимому, включает выше, но я хотел посмотреть, есть ли для меня способ сделать это без необходимости переписать все мои задачи в качестве функций. Спасибо
Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator .
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), сделанные в API третьей стороны, хранить CSV в хранилище и в конечном итоге...
У меня есть родительское задание DAG и дочернее задание DAG. Задачи дочернего задания должны запускаться при успешном завершении задач родительского задания, которые выполняются ежедневно. Как добавить внешний триггер задания?
У меня есть родительское задание DAG и дочернее задание DAG. Задачи дочернего задания должны запускаться при успешном завершении задач родительского задания, которые выполняются ежедневно. Как добавить внешний триггер задания?