для каждого идентификатора события , получить сведения о событии
Два шага используют контейнер докеров, так как я не хочу использовать PythonOperator, поскольку мне хотелось бы развернуть проект в в какой-то момент и не хочу, чтобы обработка выполнялась в том же экземпляре, где работает Airflow.
Приведенный ниже код — моя попытка сделать то, что я хочу:
import re
from datetime import datetime
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue
leagues = ["league1", "league2", "league3"]
def extract_event_ids_from_logs(ti, previous_task):
logs = ti.xcom_pull(key="return_value", task_ids=previous_task)
match = re.search(r"\[([0-9, ]+)\]", logs)
if match:
a = [int(id.strip()) for id in match.group(1).split(",")]
return a
return []
def merge_ids(ti, tasks):
ids = []
for ti_ in tasks:
val = ti.xcom_pull(key="return_value", task_ids=ti_)
ids.extend(val)
return ids
@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
catchup=False,
tags=["scraping"],
)
def scraping():
tasks = []
for league in leagues:
fetch_league_events = DockerOperator(
task_id=f"fetch_{league}_events",
max_active_tis_per_dag=1,
image="image:v1",
command=f"fetch-event-ids-from-league --league {league}",
api_version="auto",
auto_remove=True,
tty=True,
xcom_all=False,
mount_tmp_dir=False,
)
extract_ids = PythonOperator(
task_id=f"extract_ids_{league}",
python_callable=extract_event_ids_from_logs,
op_kwargs={"previous_task": fetch_league_events.task_id},
)
fetch_league_events >> extract_ids
tasks.append(extract_ids)
merge_task = PythonOperator(
task_id="merge_ids",
python_callable=merge_ids,
op_kwargs={"tasks": [o.task_id for o in tasks]},
# provide_context=False,
)
for task_ in tasks:
task_ >> merge_task
fetch_event_details = DockerOperator(
task_id="fetch_event_details",
image="image:v1",
).expand(
command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='merge_ids', key='return_value') }}"]
)
merge_task >> fetch_event_details
scraping()
Вышеописанное явно не работает, потому что я перебираю строку.
Каков правильный синтаксис для выполнения того, чего я пытаюсь достичь?
Я также пробовал разные вещи, например, я пытался создать оператор Python, который извлекает значения XComs и выполняет цикл с помощью оператора Docker. Я не получал никаких ошибок, но задачи Docker не запускались.
Я пытался поискать в Интернете, но не нашел ничего полезного. Я нашел эту тему, в которой говорилось, что не рекомендуется использовать xcoms для создания дага (здесь). Я понимаю логику, но какой шаблон проектирования будет правильным для достижения того, чего я пытаюсь достичь.
Я создаю даг, который должен делать следующее: [list] [*]извлекать идентификаторы событий [*]для каждого идентификатора события , получить сведения о событии [/list] Два шага используют контейнер докеров, так как я не хочу использовать PythonOperator, поскольку мне хотелось бы развернуть проект в в какой-то момент и не хочу, чтобы обработка выполнялась в том же экземпляре, где работает Airflow. Приведенный ниже код — моя попытка сделать то, что я хочу: [code]import re from datetime import datetime
from airflow.decorators import dag, task from airflow.hooks.base import BaseHook from airflow.models import Variable from airflow.models.baseoperator import chain from airflow.operators.python import PythonOperator from airflow.providers.docker.operators.docker import DockerOperator from airflow.providers.slack.notifications.slack import SlackNotifier from airflow.sensors.base import PokeReturnValue
merge_task = PythonOperator( task_id="merge_ids", python_callable=merge_ids, op_kwargs={"tasks": [o.task_id for o in tasks]}, # provide_context=False, ) for task_ in tasks: task_ >> merge_task
fetch_event_details = DockerOperator( task_id="fetch_event_details", image="image:v1", ).expand( command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='merge_ids', key='return_value') }}"] )
merge_task >> fetch_event_details
scraping()
[/code] Вышеописанное явно не работает, потому что я перебираю строку. Каков правильный синтаксис для выполнения того, чего я пытаюсь достичь? Я также пробовал разные вещи, например, я пытался создать оператор Python, который извлекает значения XComs и выполняет цикл с помощью оператора Docker. Я не получал никаких ошибок, но задачи Docker не запускались. [code]def fetch_event_using_docker(ti): event_ids = ti.xcom_pull(task_ids="merge_ids", key="return_value")
for event_id in event_ids: action = DockerOperator( task_id=f"fetch_event_{event_id}", image="image:v1", api_version="auto", auto_remove=True, tty=True, xcom_all=False, mount_tmp_dir=False, max_active_tis_per_dag=1, command=f"fetch-event --event-id {event_id}", )
merge_task >> fetch_task [/code] Я пытался поискать в Интернете, но не нашел ничего полезного. Я нашел эту тему, в которой говорилось, что не рекомендуется использовать xcoms для создания дага (здесь). Я понимаю логику, но какой шаблон проектирования будет правильным для достижения того, чего я пытаюсь достичь.
Хотелось бы задать вопрос по Airflow, в настоящее время, когда мы устанавливаем аргументы DAG по умолчанию на повторную попытку 5 раз,
в случае сбоя
default_args = {
'owner': 'testing',
'retries': 5,
'retry_delay': timedelta(minutes=1)
}
Я разрабатываю проект Python, используя Docker-контейнер, в котором хранится образ, содержащий необходимые зависимости и многое другое. Но у меня возникли проблемы с подключением базы данных Postgre, в которой хранится определенная информация из...