Предоставление нужного порта службам воздушного потока в докере ⇐ Python
Предоставление нужного порта службам воздушного потока в докере
Я пытаюсь построить минимальный конвейер данных, используя Docker, Postgres и Airflow. Мой файл docker-compose.yaml можно найти здесь, он расширен из документации airflow здесь. Я расширил его, включив в него отдельную базу данных Postgres, куда я буду записывать данные, и экземпляр pgadmin (они добавляются внизу).
Я могу подтвердить, что службы работают и доступны, когда я запускаю docker Compose Up -d, и я могу войти в веб-интерфейс Airflow, чтобы просмотреть свои данные. Я создал очень простой инструмент для ежеминутной вставки даты и времени в таблицу. Код дага показан ниже:
из DAG импорта воздушного потока из airflow.operators.python импортировать PythonOperator из даты и времени импорта даты и времени, timedelta импорт psycopg2 из airflow.hooks.postgres_hook импортировать PostgresHook default_args = { 'владелец': 'воздушный поток', «Повторные попытки»: 1, 'retry_delay': timedelta(минут=5), «start_date»: дата-время (2024, 1, 1), } защита create_table(): pg_hook = PostgresHook(postgres_conn_id='postgres_default') конн = pg_hook.get_conn() курсор = conn.cursor() create_query = """ СОЗДАТЬ ТАБЛИЦУ, ЕСЛИ НЕ СУЩЕСТВУЕТ fact_datetime ( дата и время TIMESTAMP ); """ курсор.execute(create_query) конн.коммит() курсор.закрыть() конн.закрыть() защита Insert_datetime(): pg_hook = PostgresHook(postgres_conn_id='postgres_default') конн = pg_hook.get_conn() курсор = conn.cursor() вставить_запрос = """ INSERT INTO fact_datetime (дата и время) ЗНАЧЕНИЯ (СЕЙЧАС()); """ курсор.execute(insert_query) конн.коммит() курсор.закрыть() конн.закрыть() с DAG('insert_datetime_dag', default_args=default_args, описание = 'DAG для вставки текущей даты и времени каждую минуту', Schedule_interval='*/1 * * * *', catchup=False) как даг: create_table_task = PythonOperator( Task_id = 'create_table', python_callable = create_table, ) Insert_datetime_task = PythonOperator( Task_id = 'insert_datetime', python_callable =insert_datetime, ) create_table_task >> Insert_datetime_task Перед запуском этого дага я добавил соединение Postgres в веб-интерфейс Airflow, что должно позволить мне использовать PostgreHook.
Когда я запускаю даг, кажется, что выполнение зависло на задаче create_table со следующими журналами:
ce682335169d *** Найдены локальные файлы: *** * /opt/airflow/logs/dag_id=insert_datetime_dag/run_id=scheduled__2024-01-02T17:24:00+00:00/task_id=create_table/attempt=1.log [2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} ИНФОРМАЦИЯ: все зависимости соблюдены для dep_context=non-requeueable deps ti= [2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} ИНФОРМАЦИЯ: все зависимости соблюдены для dep_context=requeueable deps ti= [2024-01-02, 17:25:26 UTC] {taskinstance.py:2171} ИНФОРМАЦИЯ — Стартовая попытка 1 из 2 [2024-01-02, 17:25:26 UTC] {taskinstance.py:2192} ИНФОРМАЦИЯ. Выполнение 2024-01-02 17:24:00+00:00 [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:60} ИНФОРМАЦИЯ – запущен процесс 148 для запуска задачи. [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:87} ИНФОРМАЦИЯ - Выполняется: ['***', 'tasks', 'run', 'insert_datetime_dag', 'create_table', 'scheduled__2024 -01-02T17:24:00+00:00', '--job-id', '7', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg -путь', '/tmp/tmpkkdtejih'] [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:88} ИНФОРМАЦИЯ – задание 7: подзадача create_table [2024-01-02, 17:25:26 UTC] {task_command.py:423} ИНФОРМАЦИЯ – выполнение на хосте ce682335169d [2024-01-02, 17:25:26 UTC] {taskinstance.py:2481} ИНФОРМАЦИЯ. Экспорт переменных среды: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='insert_datetime_dag' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2024- 01 -02T17:24:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-02T17:24:00+00:00' [2024-01-02, 17:25:26 UTC] {base.py:83} ИНФОРМАЦИЯ. Использование идентификатора соединения «postgres_default» для выполнения задачи. [2024-01-02, 17:25:26 UTC] {taskinstance.py:2699} ОШИБКА — задача не выполнена с исключением Traceback (последний вызов последний): Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py», строка 433, в _execute_task результат = выполнение_callable(контекст=контекст, **execute_callable_kwargs) Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py», строка 199, выполняется. return_value = self.execute_callable() Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py», строка 216, в Execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) Файл «/opt/airflow/dags/dag.py», строка 16, в create_table. конн = pg_hook.get_conn() Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py», строка 158, в get_conn self.conn = psycopg2.connect(**conn_args) Файл "/home/airflow/.local/lib/python3.8/site-packages/psycopg2/__init__.py", строка 122, в соединении conn = _connect(dsn, Connection_factory=connection_factory, **kwasync) psycopg2.OperationalError: соединение с сервером по адресу «localhost» (127.0.0.1), порт 5432 не удалось: соединение отклонено Сервер работает на этом хосте и принимает соединения TCP/IP? подключение к серверу по адресу «localhost» (::1), порт 5432 не удалось: невозможно назначить запрошенный адрес Сервер работает на этом хосте и принимает соединения TCP/IP? [2024-01-02, 17:25:26 UTC] {taskinstance.py:1138} ИНФОРМАЦИЯ — пометка задачи как UP_FOR_RETRY. dag_id=insert_datetime_dag, Task_id=create_table, Execution_date=20240102T172400, start_date=20240102T172526, end_date=20240102T172526 [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:107} ОШИБКА: не удалось выполнить задание 7 для задачи create_table (подключение к серверу по адресу «localhost» (127.0.0.1), порт 5432 не удалось: соединение отказался Сервер работает на этом хосте и принимает соединения TCP/IP? подключение к серверу по адресу «localhost» (::1), порт 5432 не удалось: невозможно назначить запрошенный адрес Сервер работает на этом хосте и принимает соединения TCP/IP? ; 148) [2024-01-02, 17:25:26 UTC] {local_task_job_runner.py:234} ИНФОРМАЦИЯ — Задача завершена с кодом возврата 1 [2024-01-02, 17:25:26 UTC] {taskinstance.py:3281} ИНФОРМАЦИЯ – 0 последующих задач, запланированных на основе последующей проверки расписания
Если я правильно прочитал, похоже, что воздушный поток не видит мой экземпляр postgres. Эту проблему следует решить, предоставив порт 5432 одной из служб воздушного потока.
Я не уверен, какой службе требуется доступ к порту, и я не знаю, как редактировать файл компоновки Docker. Может кто-нибудь, пожалуйста:
[*]Сообщите мне, прав ли я в своей оценке проблемы, и [*]Предложите правильные изменения в моем файле компоновки Docker, чтобы я мог успешно запустить свой даг.
Я пытаюсь построить минимальный конвейер данных, используя Docker, Postgres и Airflow. Мой файл docker-compose.yaml можно найти здесь, он расширен из документации airflow здесь. Я расширил его, включив в него отдельную базу данных Postgres, куда я буду записывать данные, и экземпляр pgadmin (они добавляются внизу).
Я могу подтвердить, что службы работают и доступны, когда я запускаю docker Compose Up -d, и я могу войти в веб-интерфейс Airflow, чтобы просмотреть свои данные. Я создал очень простой инструмент для ежеминутной вставки даты и времени в таблицу. Код дага показан ниже:
из DAG импорта воздушного потока из airflow.operators.python импортировать PythonOperator из даты и времени импорта даты и времени, timedelta импорт psycopg2 из airflow.hooks.postgres_hook импортировать PostgresHook default_args = { 'владелец': 'воздушный поток', «Повторные попытки»: 1, 'retry_delay': timedelta(минут=5), «start_date»: дата-время (2024, 1, 1), } защита create_table(): pg_hook = PostgresHook(postgres_conn_id='postgres_default') конн = pg_hook.get_conn() курсор = conn.cursor() create_query = """ СОЗДАТЬ ТАБЛИЦУ, ЕСЛИ НЕ СУЩЕСТВУЕТ fact_datetime ( дата и время TIMESTAMP ); """ курсор.execute(create_query) конн.коммит() курсор.закрыть() конн.закрыть() защита Insert_datetime(): pg_hook = PostgresHook(postgres_conn_id='postgres_default') конн = pg_hook.get_conn() курсор = conn.cursor() вставить_запрос = """ INSERT INTO fact_datetime (дата и время) ЗНАЧЕНИЯ (СЕЙЧАС()); """ курсор.execute(insert_query) конн.коммит() курсор.закрыть() конн.закрыть() с DAG('insert_datetime_dag', default_args=default_args, описание = 'DAG для вставки текущей даты и времени каждую минуту', Schedule_interval='*/1 * * * *', catchup=False) как даг: create_table_task = PythonOperator( Task_id = 'create_table', python_callable = create_table, ) Insert_datetime_task = PythonOperator( Task_id = 'insert_datetime', python_callable =insert_datetime, ) create_table_task >> Insert_datetime_task Перед запуском этого дага я добавил соединение Postgres в веб-интерфейс Airflow, что должно позволить мне использовать PostgreHook.
Когда я запускаю даг, кажется, что выполнение зависло на задаче create_table со следующими журналами:
ce682335169d *** Найдены локальные файлы: *** * /opt/airflow/logs/dag_id=insert_datetime_dag/run_id=scheduled__2024-01-02T17:24:00+00:00/task_id=create_table/attempt=1.log [2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} ИНФОРМАЦИЯ: все зависимости соблюдены для dep_context=non-requeueable deps ti= [2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} ИНФОРМАЦИЯ: все зависимости соблюдены для dep_context=requeueable deps ti= [2024-01-02, 17:25:26 UTC] {taskinstance.py:2171} ИНФОРМАЦИЯ — Стартовая попытка 1 из 2 [2024-01-02, 17:25:26 UTC] {taskinstance.py:2192} ИНФОРМАЦИЯ. Выполнение 2024-01-02 17:24:00+00:00 [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:60} ИНФОРМАЦИЯ – запущен процесс 148 для запуска задачи. [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:87} ИНФОРМАЦИЯ - Выполняется: ['***', 'tasks', 'run', 'insert_datetime_dag', 'create_table', 'scheduled__2024 -01-02T17:24:00+00:00', '--job-id', '7', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg -путь', '/tmp/tmpkkdtejih'] [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:88} ИНФОРМАЦИЯ – задание 7: подзадача create_table [2024-01-02, 17:25:26 UTC] {task_command.py:423} ИНФОРМАЦИЯ – выполнение на хосте ce682335169d [2024-01-02, 17:25:26 UTC] {taskinstance.py:2481} ИНФОРМАЦИЯ. Экспорт переменных среды: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='insert_datetime_dag' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2024- 01 -02T17:24:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-02T17:24:00+00:00' [2024-01-02, 17:25:26 UTC] {base.py:83} ИНФОРМАЦИЯ. Использование идентификатора соединения «postgres_default» для выполнения задачи. [2024-01-02, 17:25:26 UTC] {taskinstance.py:2699} ОШИБКА — задача не выполнена с исключением Traceback (последний вызов последний): Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py», строка 433, в _execute_task результат = выполнение_callable(контекст=контекст, **execute_callable_kwargs) Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py», строка 199, выполняется. return_value = self.execute_callable() Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py», строка 216, в Execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) Файл «/opt/airflow/dags/dag.py», строка 16, в create_table. конн = pg_hook.get_conn() Файл «/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py», строка 158, в get_conn self.conn = psycopg2.connect(**conn_args) Файл "/home/airflow/.local/lib/python3.8/site-packages/psycopg2/__init__.py", строка 122, в соединении conn = _connect(dsn, Connection_factory=connection_factory, **kwasync) psycopg2.OperationalError: соединение с сервером по адресу «localhost» (127.0.0.1), порт 5432 не удалось: соединение отклонено Сервер работает на этом хосте и принимает соединения TCP/IP? подключение к серверу по адресу «localhost» (::1), порт 5432 не удалось: невозможно назначить запрошенный адрес Сервер работает на этом хосте и принимает соединения TCP/IP? [2024-01-02, 17:25:26 UTC] {taskinstance.py:1138} ИНФОРМАЦИЯ — пометка задачи как UP_FOR_RETRY. dag_id=insert_datetime_dag, Task_id=create_table, Execution_date=20240102T172400, start_date=20240102T172526, end_date=20240102T172526 [2024-01-02, 17:25:26 UTC] {standard_task_runner.py:107} ОШИБКА: не удалось выполнить задание 7 для задачи create_table (подключение к серверу по адресу «localhost» (127.0.0.1), порт 5432 не удалось: соединение отказался Сервер работает на этом хосте и принимает соединения TCP/IP? подключение к серверу по адресу «localhost» (::1), порт 5432 не удалось: невозможно назначить запрошенный адрес Сервер работает на этом хосте и принимает соединения TCP/IP? ; 148) [2024-01-02, 17:25:26 UTC] {local_task_job_runner.py:234} ИНФОРМАЦИЯ — Задача завершена с кодом возврата 1 [2024-01-02, 17:25:26 UTC] {taskinstance.py:3281} ИНФОРМАЦИЯ – 0 последующих задач, запланированных на основе последующей проверки расписания
Если я правильно прочитал, похоже, что воздушный поток не видит мой экземпляр postgres. Эту проблему следует решить, предоставив порт 5432 одной из служб воздушного потока.
Я не уверен, какой службе требуется доступ к порту, и я не знаю, как редактировать файл компоновки Docker. Может кто-нибудь, пожалуйста:
[*]Сообщите мне, прав ли я в своей оценке проблемы, и [*]Предложите правильные изменения в моем файле компоновки Docker, чтобы я мог успешно запустить свой даг.
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Импорт воздушного потока автоматически создает каталог воздушного потока
Anonymous » » в форуме Python - 0 Ответы
- 52 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Импорт воздушного потока автоматически создает каталог воздушного потока
Anonymous » » в форуме Python - 0 Ответы
- 53 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Этот учебник -> «Запуск воздушного потока в докере» все еще работает? [закрыто]
Anonymous » » в форуме Python - 0 Ответы
- 2 Просмотры
-
Последнее сообщение Anonymous
-