Airflow DAG - контекст доступа с использованием Simplehttpoperator, чтобы включить xcom PullPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Airflow DAG - контекст доступа с использованием Simplehttpoperator, чтобы включить xcom Pull

Сообщение Anonymous »

Я изо всех сил пытаюсь втянуть xcoms в задачу, используя Simplehttpoperator .
Приведенный ниже DAG предназначен для организации ряда запросов (через функции Google Cloud), сделанные в API третьей стороны, хранить CSV в хранилище и в конечном итоге получить доступ ко всем CSVS, Merge Meally, Transformes, Transformes и Post in BICS. /> Задачи находятся в парах, для каждого отчета первая задача запускает облачную функцию, которая генерирует запрос и хранит токен отчета в секретном менеджере, а вторая задача проверяет, доступен ли отчет для загрузки, повторения, пока он не будет, а затем сохранение в последнем задании. Слияет и загружается в bq.
Когда каждая загрузка завершена, я использую repply_filter arg Simplehttpoperator, чтобы сделать имя файла доступным для последующего использования в качестве xcom .

Код: Выделить всё

# Python standard modules
from datetime import datetime, timedelta# Airflow modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import json

default_args = {
'owner': '--',
'depends_on_past': False,
# Start on 27th of June, 2020
'start_date': datetime(2021, 6, 16),
'email': ['--'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(seconds=60),
'provide_context': True
}

with DAG(
"dailymotion_reporting",
default_args=default_args,
schedule_interval='0 6 * * *',
tags=["my_dags"]
) as dag:

def push_xcom(**context):
v = context['ti'].xcom_push(key="filename", value=response.json()["filename"])
return v

def response_check(response):
if response[2] == "report not ready":
print("report not ready: " + report_summary)
return False
elif response[2] == "report downloaded":
print("report downloaded: " + report_summary)
return True

#t1 as request first report
report1_request = SimpleHttpOperator(
task_id= "report1_request",
method='POST',
http_conn_id='report_request_trigger',
endpoint='request_dm_report',
data=json.dumps({
"dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION",
"metrics": "TOTAL_INVENTORY",
"product": "EMBED"
}),
headers={"Content-Type": "application/json"}
)
#t2 check report availability until available then download
report1_check_dl = SimpleHttpOperator(
task_id= "report1_check_dl",
method='GET',
http_conn_id='report_request_trigger',
endpoint='check_previously_requested_dm_reports',
response_check = lambda response: True if response.json()["report_status"] == "report downloaded" else False,
response_filter = lambda response: {"filename": response.json()["filename"]}
)
< /code>
Задача, которая предназначена для вытягивания CSV из хранилища, ниже. Я пытаюсь извлечь имена файлов из XCOM, произведенных предыдущими задачами, и включить их в полезную нагрузку данных для моей облачной функции. < /P>
ad_report_transformations = SimpleHttpOperator(
task_id= "ad_report_transformations",
method='POST',
http_conn_id='report_request_trigger',
endpoint='dm_transform_ad_data',
data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "),
response_check = lambda response: True if response == "ok" else False
)
< /code>
Тем не менее, пробовал много разных методов, я продолжаю получать вариации одной и той же ошибки < /p>
{taskinstance.py:1152} ERROR - 'context' is undefined

Как лучше всего определить контекст, используя Simplehttpoperator? Или есть другой способ привлечь эти значения?>

Подробнее здесь: https://stackoverflow.com/questions/681 ... -xcom-pull
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Airflow DAG - контекст доступа с использованием Simplehttpoperator, чтобы включить xcom Pull
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Airflow Xcom Pull - строка Jinja заменить кавычки
    Anonymous » » в форуме Python
    0 Ответы
    51 Просмотры
    Последнее сообщение Anonymous
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    37 Просмотры
    Последнее сообщение Anonymous
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous
  • Скопируйте проект Airflow в каталог Airflow DAG.
    Anonymous » » в форуме Python
    0 Ответы
    30 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»