Не могу импортировать свои локальные модули Python в Airflow — Python

Программы на Python
Ответить
Anonymous
 Не могу импортировать свои локальные модули Python в Airflow

Сообщение Anonymous »

Я не могу импортировать свои локальные модули Python в Airflow. Airflow запущен в контейнере. В чём может быть причина? Я перепробовал всё, что мог!

Код: Выделить всё

      
└── aqs_airflow2
├── logs
│   ├── scheduler
│   │   ├── 2025-01-24
│   │   │   ├── native_dags
│   │   │   └── igr.py.log
│   │   ├── 2025-01-23
│   │   │   ├── native_dags
│   │   │   └── igr.py.log
│   │   └── 2025-01-22
│   │       ├── native_dags
│   │       └── igr.py.log
│   └── dag_processor_manager
├── setup.py
├── Dockerfile
├── tasks
│   ├── dagutils
│   │   ├── utils.py
│   │   ├── __init__.py
│   │   ├── redshiftAQS.py
│   │   └── boto3AQS.py
│   └── __init__.py
├── .env
├── docker-compose.yaml
├── dags
│   ├── igr.py
│   └── __pycache__
│       └── igr.cpython-312.pyc
├── etl.egg-info
└── README.md

< /code>
igr.py

Код: Выделить всё

igr.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
from tasks.dagutils import boto3AQS as s3, utils, redshiftAQS as rs
from ftplib import FTP
import pandas as pd
import duckdb
import os
import json

def from_PDA_to_bucket():
    """Download IGR.csv over FTP, clean it and write it to S3.

    Airflow Variables read:
        secret   -- S3 configuration JSON, passed to ``s3.initialize_module``.
        pda_url  -- FTP host (default ``dadosabertos.ans.gov.br``).
        path     -- S3 key prefix (default ``raw/``).
        api_key  -- used here as the target bucket name.

    Airflow Variable written:
        private_key -- the S3 key of the uploaded object, read back by
        ``from_bucket_to_DB``.
    """
    s3_config = Variable.get("secret", deserialize_json=True, default_var="")
    # deserialize_json=True already returns the parsed JSON (a dict for an
    # object); json.loads on a dict raises TypeError. Only decode again when a
    # string came back (e.g. a double-encoded value).
    if isinstance(s3_config, str):
        s3_config = json.loads(s3_config)
    s3.initialize_module(config_json=s3_config)

    ftp_host = Variable.get("pda_url", default_var="dadosabertos.ans.gov.br")
    ftp_dir = '/FTP/PDA/IGR/IGR_versao_2023/'
    filename = 'IGR.csv'

    # Local scratch files on the machine running the task.
    # NOTE(review): fixed paths -- two concurrent runs on the same worker would
    # overwrite each other; consider tempfile if that can happen.
    local_path = os.path.join('/tmp', filename)
    cleaned_path = os.path.join('/tmp', 'cleaned_' + filename)
    os.makedirs('/tmp', exist_ok=True)

    # FTP is a context manager: QUIT/close happens even if the transfer fails.
    with FTP(ftp_host) as ftp:
        ftp.login()
        ftp.cwd(ftp_dir)
        with open(local_path, 'wb') as local_file:
            ftp.retrbinary('RETR ' + filename, local_file.write)

    utils.clean_csv(local_path, cleaned_path)
    df = pd.read_csv(cleaned_path, delimiter=";", dtype=str)

    # "cleaned_IGR.csv" -> "<prefix>igr/cleaned_IGR.parquet"
    stem, _ = os.path.splitext(os.path.basename(cleaned_path))
    key = Variable.get("path", default_var="raw/") + "igr/" + stem + ".parquet"

    # Publish the key for the downstream task, and use the value we just
    # computed instead of reading it back from the Variable store.
    Variable.set(key="private_key", value=key)

    bucket = Variable.get("api_key", default_var="")
    s3.write_file(df=df, bucket=bucket, key=key)

def duckdb_string_literal(value):
    """Return *value* as a single-quoted DuckDB/SQL string literal.

    Embedded single quotes are doubled, so credentials or S3 keys containing
    ``'`` cannot terminate the literal early.

    >>> duckdb_string_literal("it's")
    "'it''s'"
    """
    return "'" + str(value).replace("'", "''") + "'"


def from_bucket_to_DB():
    """(Re)create the DuckDB table ``igr`` from the object uploaded to S3.

    Airflow Variables read:
        secret      -- JSON with ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY``.
        api_key     -- used here as the bucket name.
        private_key -- S3 key of the object (set by ``from_PDA_to_bucket``).

    The table is written to the DuckDB file ``igr.db``. The path is relative,
    so it is resolved against the task process's working directory.
    """
    s3_config = Variable.get("secret", deserialize_json=True, default_var="")
    # deserialize_json=True already yields parsed JSON; decode again only if a
    # string came back (json.loads on a dict raises TypeError).
    if isinstance(s3_config, str):
        s3_config = json.loads(s3_config)

    aws_access_key_id = s3_config["AWS_ACCESS_KEY_ID"]
    aws_secret_access_key = s3_config["AWS_SECRET_ACCESS_KEY"]

    # Built without nested same-type quotes inside the f-string: those only
    # parse on Python 3.12+.
    bucket = Variable.get("api_key", default_var="")
    key = Variable.get("private_key", default_var="raw/")
    s3_path = duckdb_string_literal(f"s3://{bucket}/{key}")

    conn = duckdb.connect(database="igr.db")
    try:
        # NOTE(review): hard-coded limits assume the worker really has 16
        # cores and 12 GB of RAM to spare -- confirm against the container.
        conn.execute("PRAGMA threads=16;")
        conn.execute("PRAGMA memory_limit='12GB';")

        # OR REPLACE keeps the task rerunnable (the DAG retries failed tasks
        # and igr.db persists): a second attempt must not fail just because
        # the secret or the table already exists.
        conn.execute(f"""
            CREATE OR REPLACE SECRET secret1 (
                TYPE S3,
                KEY_ID {duckdb_string_literal(aws_access_key_id)},
                SECRET {duckdb_string_literal(aws_secret_access_key)},
                REGION 'us-east-1'
            );
        """)
        conn.execute(f"""
            CREATE OR REPLACE TABLE igr AS
            SELECT *
            FROM read_parquet({s3_path})
        """)
    finally:
        # Release the database file even when a statement fails.
        conn.close()

# config_schema = Variable.get("secret", default_var="", deserialize_json=True)
# rs_config = json.loads(config_schema)
# rs.initialize_module(rs_config)
# conn = rs.get_connection_rs()

# key = Variable.get("private_key", default_var="")
# rs.create_copy_to_db(path=key, conn=conn, schema=rs_config['REDSHIFT_SCHEMA'], table='igr_aa', bucket=Variable.get("api_key", default_var=""))
# Variable.set(key="private_key", value='')

# DAG definition: extract (FTP -> S3) followed by load (S3 -> DuckDB).
with DAG(
    dag_id="igr",
    default_args={
        "owner": "airflow",
        "retries": 1,
    },
    description="IGR simples carga total",
    # Manually triggered only. schedule=None is Airflow's way to say "no
    # automatic runs"; a cron such as "0 0 1 8 0" would still fire, because
    # cron ORs day-of-month and day-of-week (Aug 1 AND every Sunday in August).
    # `schedule` also replaces the deprecated `schedule_interval` argument.
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # Download the CSV from FTP, clean it and upload it to S3.
    extract_task = PythonOperator(
        task_id="from_PDA_to_bucket",
        python_callable=from_PDA_to_bucket,
    )

    # Load the uploaded file into DuckDB.
    load_task = PythonOperator(
        task_id="from_bucket_to_DB",
        python_callable=from_bucket_to_DB,
    )

    # The load runs only after the extract has succeeded.
    extract_task >> load_task

< /code>
Verifying in container:
        [ec2-user@ip-172-31-22-184 aqs_airflow2]$ docker exec -it 257787d4c0b6 /bin/bash
default@257787d4c0b6:/opt/airflow$ python -c "import tasks; print(tasks)"


< /code>
setup.py
        # setup.py -- packaging metadata for the local "etl" distribution.
from setuptools import setup, find_packages

# Every directory containing an __init__.py, searched relative to the
# directory setup.py is run from.
discovered_packages = find_packages()

setup(name="etl", version="1.0", packages=discovered_packages)
< /code>
.env
        AIRFLOW_UID=1000
AIRFLOW_GID=0
< /code>
Dockerfile
        # Stock Airflow image + the local "etl" package (setup.py) installed in editable mode.
# NOTE: docker-compose.yaml only runs this image if AIRFLOW_IMAGE_NAME points
# to a tag built from this file; otherwise it uses apache/airflow as-is.
FROM apache/airflow:2.10.4

# --chown: pip below runs as the image's default (non-root) airflow user and
# must be able to write build metadata (etl.egg-info) into the copied tree.
COPY --chown=airflow:root . /opt/airflow/etl
WORKDIR /opt/airflow/etl

# Append /opt/airflow without leaving an empty entry when PYTHONPATH is
# unset: a leading ":" silently adds the current directory to sys.path.
ENV PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}/opt/airflow"
RUN if [ -f "setup.py" ]; then pip install -e .; else echo "setup.py not found" && exit 1;  fi

< /code>
A module I am trying to import in my DAG:
        [ec2-user@ip-172-31-22-184 aqs_airflow2]$ cat tasks/dagutils/utils.py
import csv

def clean_csv(input_path, output_path, delimiter=';', encoding='utf-8'):
    """Re-write a delimited text file with normalised quoting.

    Parses *input_path* (``"`` as quote character) and writes every row to
    *output_path* with the same delimiter. ``csv.writer`` defaults to
    ``QUOTE_MINIMAL``, so a field is quoted on output only when it contains
    the delimiter, the quote character or a line-terminator character. That
    keeps delimiters inside a column intact without any per-field fix-up.

    Args:
        input_path: Path of the source file.
        output_path: Path of the cleaned file (created or overwritten).
        delimiter: Field separator used for both reading and writing.
        encoding: Text encoding of both files.
    """
    # newline='' is required by the csv module so quoted line breaks survive.
    with open(input_path, 'r', newline='', encoding=encoding) as original_file, \
            open(output_path, 'w', newline='', encoding=encoding) as cleaned_file:
        reader = csv.reader(original_file, delimiter=delimiter, quotechar='"')
        writer = csv.writer(cleaned_file, delimiter=delimiter)
        writer.writerows(reader)

< /code>
docker-compose.yaml
        # Settings shared by every Airflow service through the YAML anchors below.
x-airflow-common:
  &airflow-common
  # NOTE: a prebuilt image is used, so the local Dockerfile (and its
  # `pip install -e .`) only takes effect if AIRFLOW_IMAGE_NAME is set to a
  # tag built from it.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.4}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # ./tasks is mounted at /opt/airflow/tasks (see volumes). Airflow itself
    # only adds the dags/, config/ and plugins/ folders to sys.path. So
    # `from tasks.dagutils import ...` fails in the scheduler and workers,
    # even though `python -c "import tasks"` run from /opt/airflow works:
    # that only succeeds because the current directory is on sys.path for -c.
    PYTHONPATH: /opt/airflow
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/tasks:/opt/airflow/tasks
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always

redis:
image: redis:7.2-bookworm
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always

airflow-webserver:


Подробнее здесь: [url]https://stackoverflow.com/questions/79384685/cant-import-my-local-python-modules-on-airflow[/url]
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»