Как динамически создавать таблицы на основе идентификаторов пользователей в SQLAlchemy с помощью AsyncSession (PostgreSQPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как динамически создавать таблицы на основе идентификаторов пользователей в SQLAlchemy с помощью AsyncSession (PostgreSQ

Сообщение Anonymous »

Я работаю над приложением FastAPI с SQLAlchemy и PostgreSQL, где мне нужно динамически создавать таблицы для каждого пользователя на основе его идентификатора пользователя. Например, если user_id равен 1, имя таблицы будет Contacts_user_1, и в ней должны храниться контакты пользователя.
Я пробовал следующий код:
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

metadata = MetaData()

class UserCreateService:
def create_user_contact_table(self, user_id: int):
"""
Dynamically create a table for storing contacts for a specific user.
"""
# Sanitize table name to avoid SQL injection
table_name = f"contacts_user_{user_id}"
if not table_name.isidentifier():
raise ValueError("Invalid table name generated.")

# Define the dynamic table schema
return Table(
table_name,
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String, nullable=True),
Column("phone_number", String, nullable=False),
Column("email", String, nullable=True),
)

@auth_router.post("/save_contacts")
async def save_contacts_dynamic(
request: SaveContactsRequest,
db: AsyncSession = Depends(get_db)
):
"""
Save fetched contacts to a dynamically created table based on the user's ID.
"""
response = {
"status": False,
"message": "Something went wrong!",
"data": []
}

try:
# Dynamically create the table for the user
contact_table = UserCreateService().create_user_contact_table(request.user_id)

# Create the table if it does not exist
async with db.bind.begin() as conn: # Ensure db.bind is an AsyncEngine
await conn.run_sync(metadata.create_all, checkfirst=True)

# Prepare contacts for insertion
contacts_to_insert = [
{"name": contact.get("name"), "phone_number": contact.get("phone_number"), "email": contact.get("email")}
for contact in request.contacts
]

# Insert contacts into the dynamic table
insert_stmt = contact_table.insert().values(contacts_to_insert)
await db.execute(insert_stmt)
await db.commit()

response.update({
"status": True,
"message": "Contacts saved successfully!",
"data": []
})

except ProgrammingError as pe:
raise HTTPException(status_code=500, detail=f"Programming Error: {str(pe)}")
except Exception as e:
# Rollback on exception
await db.rollback()
raise HTTPException(status_code=500, detail=f"Failed to save contacts: {str(e)}")

return response


Однако, когда я пытаюсь выполнить этот код, я получаю следующую ошибку:

(sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) : relation "contacts_user_1" does not exist



Подробнее здесь: https://stackoverflow.com/questions/793 ... asyncsessi
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»