SQLAlchemy: тестирование параллельной вставки при сохранении изоляции тестов с помощью pytestPython

Программы на Python
Ответить
Anonymous
 SQLAlchemy: тестирование параллельной вставки при сохранении изоляции тестов с помощью pytest

Сообщение Anonymous »

У меня есть простой тестовый пример, и я пытаюсь поближе познакомиться с SQLAlchemy и самим PSQL, я много читал по этой теме и пробовал разные решения; но ничего не помогло.
Я использую PSQL в качестве базы данных, и у меня есть таблица «Категория», которая имеет уникальное ограничение для столбца «имя».

Код: Выделить всё

class Category(Base):
__tablename__ = "categories"
allowed_filters: set[str] = {"id", "name"}

id: Mapped[int] = mapped_column(primary_key=True, nullable=False)
created_at: Mapped[datetime] = mapped_column(nullable=False, default=func.now())
updated_at: Mapped[datetime] = mapped_column(
nullable=False, default=func.now(), onupdate=func.now()
)

name: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
customers: Mapped[List["Customer"]] = relationship(
"Customer",
back_populates="category"
)

def __repr__(self):
return f""

def __eq__(self, other):
return isinstance(other, Category) and self.name == other.name
У меня есть простой сервис, который вставляет данные в контекст транзакции:

Код: Выделить всё

class BaseService:
def __init__(self, session: Optional[Union[Session, scoped_session]] = None):
"""A custom session can be passed or the default db.session will be used"""
self._session = session or db.session

Код: Выделить всё

class CategoryService(BaseService):
def create_category(self, payload: CategoryPayload) -> Category:
self.check_duplicate_category(payload.name)
new_category = Category(**payload.model_dump())

with transaction(self._session):
self._session.add(new_category)
self._session.commit()

return new_category

Код: Выделить всё

@contextmanager
def transaction(session: Session | scoped_session):
session.begin_nested()  # Start a nested transaction (savepoint)

try:
yield  # Everything within this block is part of the transaction
session.commit()  # Commit the changes if no errors occurred
except IntegrityError as e:
session.rollback()
handle_integrity_error(e)
except Exception:
session.rollback()  # Rollback on any other exception
raise
Цель — протестировать функцию create_category, выполнив ее одновременно, и убедиться, что вставляется только одна запись, а другая выдает ошибку UniqueConstraint. В то же время в Pytest я хочу отменить все изменения, внесенные в каждом тесте, чтобы сохранить изоляцию тестов.
Тестировать «create_category» в одном потоке просто, я могу использовать обычный сеанс или область видимости_сессии. и оберните весь тест во внешнюю транзакцию для отката.
что-то вроде этого:

Код: Выделить всё

@pytest.fixture(scope="function")
def db_connection():
connection = _db.engine.connect()
transaction = connection.begin()

yield connection

if transaction.is_active:
transaction.rollback()

connection.close()

@pytest.fixture(scope="function")
def session(db_connection):
"""session"""
session = ScopedSession(bind=db_connection)

session.begin_nested()

@event.listens_for(session, "after_transaction_end")
def restart_savepoint(sess, transaction):
if transaction.nested and not transaction._parent.nested:
sess.begin_nested()

yield session
session.close()
Я изо всех сил пытаюсь охватить свой тестовый сценарий при попытке выполнить create_category в двух отдельных потоках в моей тестовой функции Pytest.

Код: Выделить всё

def test_concurrent_category_creation(app, db_connection):
def create_category_concurrent():
with app.app_context():
connection = _db.engine.connect()

# 1. Passing new connection works but I'm unable to rollback after the test
# 2.  Passing the same db_connection form the fixture SOMETIMES works
# and I can successfully rollback the entire transaction
# (provided I pass in the session fixture into the test function)
# and other times throws a warning (I know connections are not thread safe)
session = ScopedSession(bind=connection) # tried with db_connection

category_service = CategoryService(session)

try:
return category_service.create_category(
CategoryPayload(name="Test Category")
)
except Exception as e:
return None
finally:
session.close()

with ThreadPoolExecutor(max_workers=2) as executor:
futures = [
executor.submit(create_category_concurrent),
executor.submit(create_category_concurrent)
]

# Collect results
results = [future.result() for future in as_completed(futures)]
assert sum(1 for item in results if isinstance(item, Category)) == 1
Предупреждения, которые я получаю из комментария к сценарию 2 выше, следующие:

Код: Выделить всё

app/tests/functional/test_categories.py::test_concurrent_category_creation
app/utils/db_utils.py:37: SAWarning: nested transaction already deassociated from connection
session.rollback()  # Rollback on any other exception

app/tests/functional/test_categories.py::test_concurrent_category_creation
app/utils/db_utils.py:34: SAWarning: transaction already deassociated from connection
session.rollback()
Возможно, я упускаю какую-то идею или, может быть, я неправильно думаю об этом тесте. Я безуспешно пытался везде искать тест такого сценария. Поможет ли pytest-asyncio в этом сценарии? Я еще не особо вникал в это.
Примечание: я заполняю базу данных исходными данными с помощью Factory Boy в каждом сеансе тестирования. Я знаю, что могу удалять и воссоздавать таблицы при каждом тестовом запуске, но желательно просто откатить транзакцию вместо создания/удаления.

Подробнее здесь: https://stackoverflow.com/questions/792 ... -with-pyte
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»