SQLALCHEMY, FASTAPI, DISHKA, AIOKAFKA. Невозможно использовать Connection.Transaction () в начале вручную транзакциюPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 SQLALCHEMY, FASTAPI, DISHKA, AIOKAFKA. Невозможно использовать Connection.Transaction () в начале вручную транзакцию

Сообщение Anonymous »

Код используется во многих проектах, и все работает. Но в одной службе иногда возникает ошибка в Prod: < /p>

"enterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.interfaceerr) tags.id, tags.name, tags.color, tags.channel_id, tags.creator_id, tags.tag_type_id, tags.created_at, tags.updated_at \ nfrom Tags \ nothere Tags.channel_id = $ 1 :: order by-fourceed_at desc] \ n [parameters: (-1001943614141414141414141414141414141414141414141414141414141414141414141414141414141414. Эта ошибка в: https://sqlalche.me/e/20/rvf5)"

Проблема в том, что я не могу понять, что сталкивается на Prod, что приводит к такой ошибке, и поэтому я не знаю, как решить проблему, потому что все в порядке в Сервисе и в этой службе. /> вот код контейнера IOC: < /strong> < /p>
from typing import AsyncIterable

from aiokafka import AIOKafkaConsumer
from dishka import Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import sessionmaker

from config import Config, get_config
from infrastructure.clients.api.invite_links import HelperBotCli
from infrastructure.db.sqlalchemy.setup import create_engine, create_session_pool
from infrastructure.message_brokers.kafka import KafkaMessageBroker
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork

class BaseProvider(Provider):
@provide(scope=Scope.APP)
def config(self) -> Config:
return get_config()

@provide(scope=Scope.APP)
def sqlalchemy_engine(self, config: Config) -> AsyncEngine:
return create_engine(config.pg_config)

@provide(scope=Scope.APP)
def session_pool(self, sqlalchemy_engine: AsyncEngine) -> sessionmaker:
return create_session_pool(sqlalchemy_engine)

@provide(scope=Scope.REQUEST)
def sqlalchemy_uow(self, session_factory: sessionmaker) -> SQLAlchemyUnitOfWork:
return SQLAlchemyUnitOfWork(_session=session_factory())

@provide(scope=Scope.REQUEST)
def helper_bot_client(self, config: Config) -> HelperBotCli:
return HelperBotCli(
base_url=config.helper_bot.HELPER_BOT_BASE_URL,
create_url=config.helper_bot.ADD_LINK_ENDPOINT,
delete_url=config.helper_bot.DELETE_LINK_ENDPOINT,
)

@provide(scope=Scope.APP)
async def kafka_broker(self, config: Config) -> AsyncIterable[KafkaMessageBroker]:
consumer = AIOKafkaConsumer(
config.rp_config.CONSUME_TOPIC,
config.rp_config.CONSUME_MERGE_ACCOUNTS_TOPIC,
bootstrap_servers=config.rp_config.RP_SERVER,
group_id=config.rp_config.CONSUME_GROUP,
enable_auto_commit=False,
auto_offset_reset="earliest",
sasl_mechanism=config.rp_config.SASL_MECHANISM,
security_protocol=config.rp_config.SECURITY_PROTOCOL,
sasl_plain_username=config.rp_config.RP_USER,
sasl_plain_password=config.rp_config.RP_PASS,
max_poll_records=config.rp_config.BATCH_SIZE,
)
broker = KafkaMessageBroker(consumer)
await broker.start()
yield broker
await broker.close()

setup.py

from typing import Callable, AsyncContextManager

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, AsyncEngine
from sqlalchemy.orm import sessionmaker

from config import PGConfig
from infrastructure.db.sqlalchemy.models import BaseModel

def create_engine(db: PGConfig, echo: bool = False) -> AsyncEngine:
engine = create_async_engine(
db.pg_database_url,
echo=echo
)
return engine

async def create_tables(engine: AsyncEngine) -> None:
async with engine.begin() as conn:
await conn.run_sync(BaseModel.metadata.create_all)

def create_session_pool(engine: AsyncEngine) -> Callable[[], AsyncContextManager[AsyncSession]]:
session_pool = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
return session_pool

Вот код uow (в нем я попытался решить эту проблему, удалив await self line._session.begin (), но это не помогло):
from dataclasses import dataclass
from types import TracebackType
from typing import Self

from sqlalchemy.ext.asyncio import AsyncSession

from infrastructure.repositories.invite_links.sqlalchemy import SQLAlchemyInviteLinksRepository
from infrastructure.repositories.tags.sqlalchemy import SQLAlchemyTagsRepository, SQLAlchemyTagsTypesRepository
from infrastructure.uow.base import BaseUnitOfWork

@dataclass
class SQLAlchemyUnitOfWork(BaseUnitOfWork):
_session: AsyncSession = None

async def __aenter__(self) -> Self:
# await self._session.begin()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None
) -> None:
try:
if exc_type is None:
await self._session.commit()
else:
await self._session.rollback()
finally:
await self._session.close()

@property
def invite_links(self) -> SQLAlchemyInviteLinksRepository:
self._session_check()
return SQLAlchemyInviteLinksRepository(self._session)

@property
def tags(self) -> SQLAlchemyTagsRepository:
self._session_check()
return SQLAlchemyTagsRepository(self._session)

@property
def tags_types(self) -> SQLAlchemyTagsTypesRepository:
self._session_check()
return SQLAlchemyTagsTypesRepository(self._session)

def _session_check(self) -> None:
if not self._session:
raise ValueError("Session is not initialized")

lifespan.py:
from contextlib import asynccontextmanager

import aiojobs
from fastapi import FastAPI
from sqlalchemy.orm import sessionmaker

from app.api.background import consume_in_background
from config import Config
from infrastructure.message_brokers.kafka import KafkaMessageBroker

@asynccontextmanager
async def lifespan(app: FastAPI) -> FastAPI:
container = app.state.dishka_container
scheduler = aiojobs.Scheduler()
broker = await container.get(KafkaMessageBroker)
config = await container.get(Config)
if config.app.is_dev or config.app.is_production:
session_factory = await container.get(sessionmaker)
job = await scheduler.spawn(consume_in_background(broker, session_factory, config))
yield
if config.app.is_dev or config.app.is_production:
await job.close()
await scheduler.close()
await container.close()

background.py
import logging

from sqlalchemy.orm import sessionmaker

from config import Config
from domain.entities.invite_links import InviteLink
from infrastructure.message_brokers.base import BaseMessageBroker
from infrastructure.repositories.filters.invite_links import InviteLinksFilter
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork

async def consume_in_background(
broker: BaseMessageBroker,
session_factory: sessionmaker,
config: Config,
) -> None:
logger = logging.getLogger("LinkConsumer")
async for message_dict in broker.start_consuming():
logger.info(f"MSG: {message_dict}")
message = message_dict["value"]
topic = message_dict["topic"]
link = message.get("invite_link")
channel_id = message.get("channel_id")
try:
if topic == config.rp_config.CONSUME_TOPIC:
if not link:
continue
link = InviteLink(
channel_id=channel_id,
**link
)
uow = SQLAlchemyUnitOfWork(_session=session_factory())
async with uow as unit_of_work:
if not await unit_of_work.invite_links.is_exists(link.id, link.channel_id):
await unit_of_work.invite_links.create_link(link)
elif topic == config.rp_config.CONSUME_MERGE_ACCOUNTS_TOPIC:
uow = SQLAlchemyUnitOfWork(_session=session_factory())
async with uow as unit_of_work:
links = await unit_of_work.invite_links.get_all_links(
links_filter=InviteLinksFilter(
page_size=-1,
creator_id=message["old_account_id"],
)
)
links = links[0]
for link in links:
link.creator_id = message["new_account_id"]
await unit_of_work.invite_links.update_link(link)
await broker.commit()
except Exception as e:
logger.error(f"Error while consuming message: {e}", exc_info=True)
# break
< /code>
И вот место в коде, которое кажется мне подозрительным: < /p>
@dataclass
class ReplaceTagsUseCase(BaseUseCase):
uow: BaseUnitOfWork

async def execute(self, command: ReplacingTagsCommand) -> InviteLink:
LIMIT = 300
if len(command.links) > LIMIT:
raise InviteLinkTagsReplaceException(limit=LIMIT)

async with self.uow as uow:
# Создание карты ссылок и списка ID тегов для поиска
links_map = {link.id: link for link in command.links}
links_ids = list(links_map.keys())
tag_ids = {tag_id for link in command.links for tag_id in link.tags}

# Извлечение всех нужных ссылок и тегов одним запросом
links, _ = await uow.invite_links.get_all_links(
InviteLinksFilter(
links_ids=links_ids,
channel_id=command.channel_id,
page_size=LIMIT
)
)
if not links:
return []

all_tags, _ = await uow.tags.get_all_tags(
TagsFilter(
channel_id=command.channel_id,
tags_ids=tag_ids
)
)

# Создание карты тегов для быстрого доступа
tag_map = {tag.id: tag for tag in all_tags}

tasks = []
for link in links:
tags = [tag_map[tag_id] for tag_id in links_map[link.id].tags if tag_id in tag_map]
link.replace_tags(tags)
tasks.append(asyncio.create_task(uow.invite_links.update_link(link)))

await asyncio.gather(*tasks)

return links


Подробнее здесь: https://stackoverflow.com/questions/797 ... ction-in-a
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»