Ошибка загрузки данных: «Объект Engine» не имеет атрибута «курсор»: chan = "stdout": source = "task"Apache

Ответить
Anonymous
 Ошибка загрузки данных: «Объект Engine» не имеет атрибута «курсор»: chan = "stdout": source = "task"

Сообщение Anonymous »

Я пытаюсь запустить пакетный процесс с помощью воздушного потока Apache. Стадии извлечения и преобразования работают очень хорошо, но этапы загрузки дают ошибку. Вот мой код: < /p>

Код: Выделить всё

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy_utils import create_database, database_exists
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import time
from datetime import timedelta
import psycopg2

@task
def load(df):
try:
connection_details = BaseHook.get_connection("my_postgres_conn")
except Exception as e:
print(f"❌ Could not retrieve Airflow connection: {e}")
raise

uid = connection_details.login
pwd = connection_details.password
host = connection_details.host
port = connection_details.port
schema = connection_details.schema

postgres_url = f"postgresql+psycopg2://{uid}:{pwd}@{host}:{port}/{schema}"

# Create the database if it doesn't exist
if not database_exists(postgres_url):
create_database(postgres_url)

engine = create_engine(postgres_url)

try:
# Use the engine object directly with to_sql
# This syntax is fully compatible with SQLAlchemy 1.4.54
df.to_sql("Bank_Cap", con=engine, if_exists="replace", index=False)
print("✅ Data loaded successfully into PostgreSQL")
except Exception as e:
print(f"❌ Error loading data: {e}")
raise

# Save a CSV as backup
df.to_csv("my_largest_bank.csv", index=False)

print(f"Postgres URL: {postgres_url}")
print(f"DataFrame to load:\n{df}")
log.info(f"Postgres URL: {postgres_url}")
log.info(f"DataFrame to load: {df}")
Я получаю следующую ошибку:
[2025-08-17, 02:06:00] Предупреждение-Использование Connection.get_connection_from_secret filename = "/home/eziuche/.local/share/virtualenvs/airflow-project-kvhv5bz8/lib/python3.11/site-packages/airflow/model 'my_postgres_conn': source = "airflow.hooks.base"
[2025-08-17, 02:06:00] Предупреждение-только Pandas поддерживает соединение SQLALCHEMY (двигатель /соединение) или строку базы данных или SQLITE3 DBAPI2. Другие объекты DBAPI2 не тестируются. Пожалуйста, рассмотрите возможность использования SQLALCHEMY.: Category = "userWarnning":
[2025-08-17, 02:06:00] Информация-❌ Ошибка Загрузка данных: 'Engine' объект не имеет атрибута 'cursor': chan = "stdout": aspring = "задача"
[2025-08-17, 02:06:00] с ошибкой "askse ascall"
[2025-08-17, 02:06:00] с исключительными "=« Задача ». /> AttributeRror: «Объект двигателя» не имеет атрибута 'cursor' < /p>
Я понизил свой Sqlalchemy и Pandas. В настоящее время я использую то, что может быть неправильно. Я использую apache-airflow-core == 3.0.4, sqlalchemy == 1.4.54,
sqlalchemy-jsonfield==1.0.2, sqlalchemy-utils == 0,41.2 и
pandas==1.5.3. Что я ошибаюсь,

Подробнее здесь: https://stackoverflow.com/questions/797 ... -stdout-so
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Apache»