DAG/задача воздушного потока зависла в рабочем состоянииPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 DAG/задача воздушного потока зависла в рабочем состоянии

Сообщение Anonymous »


Я новичок в Airflow.

Я создал конвейер ETL и пытаюсь запустить его в DAG. Однако он продолжает работать бесконечно и не может быть завершен.

Я использую Airflow в MacOS Версия Airflow: 2.8.1

Вот мой код DAG:

из datetime import timedelta импорт даты и времени из импорта воздушного потока DAG из airflow.operators.python импортировать PythonOperator из airflow.utils.dates импорт дней_назад импорт спотипи из Spotipy.oauth2 импортировать SpotifyOAuth из dotenv импорта load_dotenv импортировать ОС импортировать панд как pd импортировать sqlalchemy из sqlalchemy.orm импортировать создатель сеанса импортировать sqlite3 защита hello_world(): print("Привет, мир") защита get_played_track_etl_2(): data_location = "sqlite:///my_played_track.db" load_dotenv() Spotify_client_id = os.getenv('CLIENT_ID') Spotify_client_secret = os.getenv('CLIENT_SECRET') redirect_uri = 'http://localhost:5000/callback' область = "прочитано пользователем, недавно воспроизведено, прочитано пользователем" sp_oauth = SpotifyOAuth( client_id = Spotify_client_id, client_secret = Spotify_client_secret, redirect_uri = redirect_uri, область действия = область действия, show_dialog = Ложь ) sp = Spotipy.Spotify(auth_manager=sp_oauth) user_data = sp.current_user() print('Мои данные:') print('Имя:', user_data['display_name']) print('Подписчики:', user_data['подписчики']['всего']) print('Ссылка:', user_data['external_urls']['spotify']) print('Учетная запись:', user_data['продукт']) сегодня = дата-время.дата-время.сейчас() вчера = сегодня - timedelta(дней=60) вчера_unix_timestamp = int(yesterday.timestamp()) * 1000 user_recent_play = sp.current_user_recently_played(limit=50,after=yesterday_unix_timestamp) название_песни = [] имя_альбома = [] URL_альбома = [] имя_художника = [] Artist_genre = [] Artist_id = [] Artist_url = [] Artist_api_href = [] play_at_list = [] временная метка = [] для песни в user_recent_play['items']: song_name.append(песня['трек']['имя']) имя_альбома.append(песня['трек']['альбом']['имя']) album_url.append(песня['трек']['альбом']['external_urls']['spotify']) имя_исполнителя.append(песня['трек']['альбом']['исполнители'][0]['имя']) Artist_id.append(песня['трек']['альбом']['артисты'][0]['id']) Artist_url.append(песня['трек']['альбом']['артисты'][0]['external_urls']['spotify']) Artist_api_href.append(песня['трек']['альбом']['артисты'][0]['href']) play_at_list.append(песня['played_at']) timestamp.append(песня['played_at'][0:10]) для идентификатора в Artist_id: Artist_info = sp.artist(artist_id=id) Artist_genre.append(artist_info['genres']) song_dict = { «Название_песни»: имя_песни, 'album_name': имя_альбома, 'album_url': album_url, 'имя_художника': имя_художника, 'artist_genre': Artist_genre, 'artist_id': Artist_id, 'artist_url': Artist_url, 'artist_api_href': Artist_api_href, 'played_at': play_at_list, 'метка времени': отметка времени } df_song = pd.DataFrame( данные = song_dict, столбцы = song_dict.keys() ) если не df_song.empty: df_song.artist_genre = df_song.artist_genre.str.join(',') печать (df_song) # нагрузка engine = sqlalchemy.create_engine(database_location) conn = sqlite3.connect('my_played_track.db') курсор = conn.cursor() sql_query = """ СОЗДАТЬ ТАБЛИЦУ, ЕСЛИ НЕ СУЩЕСТВУЕТ my_played_track( название_песни ТЕКСТ, название_альбома ТЕКСТ, альбом_url ТЕКСТ, имя_художника ТЕКСТ, художник_жанр ТЕКСТ, Artist_id ТЕКСТ, Artist_url ТЕКСТ, Artist_api_href ТЕКСТ, играл_в ТЕКСТ, временная метка ТЕКСТ, CONSTRAINT Primary_key_constraint ПЕРВИЧНЫЙ КЛЮЧ (played_at) ) """ курсор.execute(sql_query) print("Успешное подключение к базе данных") пытаться: df_song.to_sql( "мой_воспроизведенный_трек", кон=конн, индекс = Ложь, if_exists='добавить') кроме: print("Данные уже существуют в базе данных") конн.закрыть() print("Отключить базу данных успешно") default_args = { 'владелец': 'воздушный поток', 'start_date': дней_назад(0,0,0,0,0), 'электронная почта': ['airflow@example.com'], 'email_on_failure': Ложь, 'email_on_retry': Ложь, «Повторные попытки»: 1, 'retry_delay': timedelta(минуты=1), } даг = ДАГ( dag_id = 'spotify_dag', default_args=default_args, описание='Наша первая группа обеспечения доступности баз данных с процессом ETL.', Schedule_interval=timedelta(дни=1), догонять = Ложь ) Task_a = PythonOperator( идентификатор_задачи = 'а', python_callable = hello_world, даг = даг ) run_etl = PythonOperator( Task_id = 'whole_spotify_etl', python_callable = get_played_track_etl_2, даг = даг, ) задача_а >> run_etl Task_a работает нормально, но run_etl работал бесконечно без каких-либо результатов.

страница журнала задач воздушного потока

Сама функция get_played_track_etl_2 не имеет проблем и может работать без использования воздушного потока

Вот подробности об экземпляре задачи

Сведения об экземпляре задачи Причина зависимости Экземпляр задачи не запущен. Задача находится в рабочем состоянии. Состояние экземпляра задачи Задача находится в «выполняющемся» состоянии. Атрибуты экземпляра задачи Значение атрибута custom_operator_name Нет dag_model dag_run продолжительность Нет end_date Нет Execution_date 2024-02-28 00:00:00+00:00 исполнитель_конфигурация {} external_executor_id Нет имя хоста corey.mynet is_premature Ложь is_trigger_log_context Ложь идентификатор вакансии 65 индекс_карты -1 max_tries 1 метаданные Метаданные() next_kwargs Нет next_method Нет next_try_number 2 примечание Нет оператор PythonOperator имя_оператора PythonOperator идентификатор 80220 пул по умолчанию_пул пул_слотов 1 prev_attempted_tries 1 previous_start_date_success Нет previous_ti Нет previous_ti_success Нет Priority_weight 1 очередь по умолчанию очередь_by_job очередь_by_job_id 57 очередь_dttm 29.02.2024, 00:10:58 реестр rendered_task_instance_fields Нет run_as_user Нет run_id запланировано__2024-02-28T00:00:00+00:00 start_date 2024-02-29, 00:11:01 государственное управление stats_tags {'dag_id': 'spotify_dag', 'task_id': 'whole_spotify_etl'} Task_id Whole_spotify_etl Task_instance_note Нет test_mode Ложь триггер_ид нет триггер_таймаут Нет попытка_номер 1 имя unixname Кори обновлено_в 2024-02-29, 00:11:01 Атрибуты задачи Значение атрибута СЕРИАЛИЗАЦИЯ_ВЕРСИЯ 1 dag_id Spotify_dag depend_on_past Ложь deps FrostedSet({, , , }) do_xcom_push Правда downstream_task_ids () электронная почта ['airflow@example.com'] email_on_failure Ложь email_on_retry Ложь end_date Нет Execution_timeout Нет исполнитель_конфигурация {} дополнительные_ссылки [] ignore_first_dependents_on_past True входы [] is_setup Ложь is_teardown Ложь ярлык Whole_spotify_etl max_active_tis_per_dag Нет max_active_tis_per_dagrun Нет max_retry_delay Нет on_execute_callback Нет on_failure_callback Нет on_failure_fail_dagrun Ложь on_retry_callback Нет on_success_callback Нет op_args [] op_kwargs {} оператор_extra_links () имя_оператора PythonOperator магазины [] вывод {{ Task_instance.xcom_pull(task_ids='whole_spotify_etl', dag_id='spotify_dag', key='return_value') }} воздушный поток владельца параметры {} пул по умолчанию_пул пул_слотов 1 Priority_weight 1 Priority_weight_total 1 очередь по умолчанию ресурсы Нет повторная попытка 1 retry_delay 0:01:00 retry_exponential_backoff Ложь run_as_user Нет small_copy_attrs () sla Нет start_date 2024-02-28 00:00:00+00:00 субдаг Нет support_lineage Ложь Task_id Whole_spotify_etl тип_задачи PythonOperator шаблон_ext [] template_fields ['templates_dict', 'op_args', 'op_kwargs'] template_fields_renderers {'templates_dict': 'json', 'op_args': 'py', 'op_kwargs': 'py'} templates_dict Нет триггер_правило all_success ui_color #ffefeb ui_fgcolor #000 upstream_task_ids («а»,) wait_for_downstream Ложь wait_for_past_dependents_before_skipping Ложь Weight_rule ниже по течению Сведения об экземпляре задачи

В папке журналов я могу найти журнал задачи «run_etl»

Вот журнал задачи «run_etl»

[2024-02-29T08:11:01.249+0800] {taskinstance.py:1956} ИНФОРМАЦИЯ – все зависимости соблюдены для dep_context=non-requeueable deps ti= [2024-02-29T08:11:01.253+0800] {taskinstance.py:1956} ИНФОРМАЦИЯ. Все зависимости соблюдены для dep_context=requeueable deps ti= [2024-02-29T08:11:01.253+0800] {taskinstance.py:2170} ИНФОРМАЦИЯ — Стартовая попытка 1 из 2 [2024-02-29T08:11:01.262+0800] {taskinstance.py:2191} ИНФОРМАЦИЯ. Выполнение 28 февраля 2024 г. 00:00:00+00:00 [2024-02-29T08:11:01.268+0800] {standard_task_runner.py:60} ИНФОРМАЦИЯ – запущен процесс 80220 для запуска задачи. [2024-02-29T08:11:01.273+0800] {standard_task_runner.py:87} ИНФОРМАЦИЯ – Выполнение: ['airflow', 'tasks', 'run', 'spotify_dag', 'whole_spotify_etl', 'scheduled__2024-02- 28T00:00:00+00:00', '--job-id', '65', '--raw', '--subdir', 'DAGS_FOLDER/spotify_dag.py', '--cfg-path' , '/var/folders/y9/wh0jz8056576vp427n6d9rtw0000gn/T/tmpvxqd54p5'] [2024-02-29T08:11:01.275+0800] {standard_task_runner.py:88} ИНФОРМАЦИЯ – задание 65: подзадача Whole_spotify_etl [2024-02-29T08:11:01.309+0800] {task_command.py:423} ИНФОРМАЦИЯ – запуск на хосте corey.mynet [2024-02-29T08:11:01.345+0800] {taskinstance.py:2480} ИНФОРМАЦИЯ. Экспорт переменных среды: AIRFLOW_CTX_DAG_EMAIL='airflow@example.com' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='spotify_dag' AIRFLOW_CTX_TASK_ ID = 'whole_spotify_etl' AIRFLOW_CTX_EXECUTION_DATE='2024-02-28T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-28T00:00:00+00:00' [2024-02-29T14:36:18.844+0800] {local_task_job_runner.py:302} ПРЕДУПРЕЖДЕНИЕ. Состояние этого экземпляра внешне установлено на «Нет». Завершающий экземпляр. [2024-02-29T14:36:18.870+0800] {process_utils.py:131} ИНФОРМАЦИЯ — отправка Signals.SIGTERM в группу 80220. PID всех процессов в группе: [80220] [2024-02-29T14:36:18.871+0800] {process_utils.py:86} ИНФОРМАЦИЯ — отправка сигнала Signals.SIGTERM в группу 80220 [2024-02-29T14:37:18.901+0800] {process_utils.py:149} ПРЕДУПРЕЖДЕНИЕ - обработайте psutil.Process(pid=80220, name='Python', status='running', начато='08:11: 01') не ответил на SIGTERM. Пробую SIGKILL [2024-02-29T14:37:18.906+0800] {process_utils.py:86} ИНФОРМАЦИЯ — отправка сигнала Signals.SIGKILL в группу 80220 [2024-02-29T14:37:18.914+0800] {process_utils.py:79} ИНФОРМАЦИЯ - Процесс psutil.Process(pid=80220, name='Python', status='terrated', exitcode=, start='08:11:01') (80220) завершено с кодом выхода Negsignal.SIGKILL [2024-02-29T14:37:18.915+0800] {standard_task_runner.py:175} ОШИБКА — задание 65 было прекращено до завершения (вероятно, из-за нехватки памяти) Вопрос: Как мне правильно запустить задачу с воздушным потоком?

Ожидается, что все задачи будут работать правильно.

Однако только Task_a может работать правильно, а Task run_etl выполняется бесконечно без каких-либо результатов.

Кроме того, терминал продолжает выдавать эти сообщения

Вход в терминал в vscode
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»