Я использую PSQL в качестве базы данных, и у меня есть таблица «Категория», которая имеет уникальное ограничение для столбца «имя».
Код: Выделить всё
class Category(Base):
__tablename__ = "categories"
allowed_filters: set[str] = {"id", "name"}
id: Mapped[int] = mapped_column(primary_key=True, nullable=False)
created_at: Mapped[datetime] = mapped_column(nullable=False, default=func.now())
updated_at: Mapped[datetime] = mapped_column(
nullable=False, default=func.now(), onupdate=func.now()
)
name: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
customers: Mapped[List["Customer"]] = relationship(
"Customer",
back_populates="category"
)
def __repr__(self):
return f""
def __eq__(self, other):
return isinstance(other, Category) and self.name == other.name
Код: Выделить всё
class BaseService:
def __init__(self, session: Optional[Union[Session, scoped_session]] = None):
"""A custom session can be passed or the default db.session will be used"""
self._session = session or db.session
Код: Выделить всё
class CategoryService(BaseService):
def create_category(self, payload: CategoryPayload) -> Category:
self.check_duplicate_category(payload.name)
new_category = Category(**payload.model_dump())
with transaction(self._session):
self._session.add(new_category)
self._session.commit()
return new_category
Код: Выделить всё
@contextmanager
def transaction(session: Session | scoped_session):
session.begin_nested() # Start a nested transaction (savepoint)
try:
yield # Everything within this block is part of the transaction
session.commit() # Commit the changes if no errors occurred
except IntegrityError as e:
session.rollback()
handle_integrity_error(e)
except Exception:
session.rollback() # Rollback on any other exception
raise
Тестировать «create_category» в одном потоке просто, я могу использовать обычный сеанс или область видимости_сессии. и оберните весь тест во внешнюю транзакцию для отката.
что-то вроде этого:
Код: Выделить всё
@pytest.fixture(scope="function")
def db_connection():
connection = _db.engine.connect()
transaction = connection.begin()
yield connection
if transaction.is_active:
transaction.rollback()
connection.close()
@pytest.fixture(scope="function")
def session(db_connection):
"""session"""
session = ScopedSession(bind=db_connection)
session.begin_nested()
@event.listens_for(session, "after_transaction_end")
def restart_savepoint(sess, transaction):
if transaction.nested and not transaction._parent.nested:
sess.begin_nested()
yield session
session.close()
Код: Выделить всё
def test_concurrent_category_creation(app, db_connection):
def create_category_concurrent():
with app.app_context():
connection = _db.engine.connect()
# 1. Passing new connection works but I'm unable to rollback after the test
# 2. Passing the same db_connection form the fixture SOMETIMES works
# and I can successfully rollback the entire transaction
# (provided I pass in the session fixture into the test function)
# and other times throws a warning (I know connections are not thread safe)
session = ScopedSession(bind=connection) # tried with db_connection
category_service = CategoryService(session)
try:
return category_service.create_category(
CategoryPayload(name="Test Category")
)
except Exception as e:
return None
finally:
session.close()
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [
executor.submit(create_category_concurrent),
executor.submit(create_category_concurrent)
]
# Collect results
results = [future.result() for future in as_completed(futures)]
assert sum(1 for item in results if isinstance(item, Category)) == 1
Код: Выделить всё
app/tests/functional/test_categories.py::test_concurrent_category_creation
app/utils/db_utils.py:37: SAWarning: nested transaction already deassociated from connection
session.rollback() # Rollback on any other exception
app/tests/functional/test_categories.py::test_concurrent_category_creation
app/utils/db_utils.py:34: SAWarning: transaction already deassociated from connection
session.rollback()
Примечание: я заполняю базу данных исходными данными с помощью Factory Boy в каждом сеансе тестирования. Я знаю, что могу удалять и воссоздавать таблицы при каждом тестовом запуске, но желательно просто откатить транзакцию вместо создания/удаления.
Подробнее здесь: https://stackoverflow.com/questions/792 ... -with-pyte
Мобильная версия