У меня есть эта функция поиска. Асинхронное исполнение SQLALCHEMY. Однако, когда я издевается над запросом базы данных, Exepute (). Scalars (). All () возвращает пустой список вместо ожидаемого [1, 2, 3]. < /P>
import numpy as np
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import text
from src.backend.database.config import AsyncSessionLocal
from src.services.ingestion_service.embedding_generator import EmbeddingGenerator
import asyncio
class RetrievalService:
"""
Handles retrieval of similar documents based on query embeddings.
"""
def __init__(self):
self.embedding_generator = EmbeddingGenerator()
async def retrieve_relevant_docs(self, query: str, top_k: int = 5):
"""
Converts the query into an embedding and retrieves the most similar documents asynchronously.
"""
async with AsyncSessionLocal() as db:
async with db.begin():
#
query_embedding = await self.embedding_generator.generate_embedding(query)
print('query_embedding', query_embedding)
#
query_embedding_str = "[" + ",".join(map(str, query_embedding)) + "]"
#
selected_ids_result = await db.execute(text("SELECT document_id FROM selected_documents;"))
print('selected_ids_result', selected_ids_result)
selected_ids = (await selected_ids_result.scalars()).all()
#
if not selected_ids:
selected_ids = [-1] # Dummy ID to avoid SQL failure
#
search_query = text("""
SELECT document_id FROM embeddings
WHERE document_id = ANY(:selected_ids)
ORDER BY vector CAST(:query_embedding AS vector)
LIMIT :top_k;
""").execution_options(cacheable=False)
results = await db.execute(
search_query,
{
"query_embedding": query_embedding_str, # Pass as string
"top_k": top_k,
"selected_ids": selected_ids,
},
)
# document_ids = (await results.scalars()).all()\
print('debug results', vars(results))
document_ids = list(await results.scalars())
print('document_ids', document_ids)
return document_ids
async def get_document_texts(self, document_ids: list[int]):
"""
Fetches the actual document texts for the given document IDs.
"""
if not document_ids:
return []
async with AsyncSessionLocal() as db:
async with db.begin():
query = text("SELECT content FROM documents WHERE id = ANY(:document_ids);")
results = await db.execute(query, {"document_ids": document_ids})
return (await results.scalars()).all()
< /code>
У меня есть простой файл тестов: < /p>
import pytest
from unittest.mock import patch, AsyncMock
from src.services.retrieval_service.retrieval import RetrievalService
from sqlalchemy.ext.asyncio import AsyncSession
import pytest
from unittest.mock import patch, AsyncMock
from src.services.retrieval_service.retrieval import RetrievalService
from sqlalchemy.ext.asyncio import AsyncSession
@pytest.mark.asyncio
async def test_retrieve_relevant_docs_valid_query():
service = RetrievalService()
query = "What is AI?"
top_k = 3
with patch.object(service.embedding_generator, 'generate_embedding', new_callable=AsyncMock) as mock_generate_embedding, \
patch.object(AsyncSession, 'execute', new_callable=AsyncMock) as mock_execute:
mock_generate_embedding.return_value = [0.1] * 384
# Mock DB query for selected documents
mock_scalars_selected = AsyncMock()
mock_scalars_selected.scalars.return_value.all.return_value = [1, 2, 3]
mock_execute.side_effect = [mock_scalars_selected, mock_scalars_selected]
# Mock the `execute` method
mock_execute.return_value = mock_scalars_selected
# Call the method
document_ids = await service.retrieve_relevant_docs(query, top_k)
# Assertion
assert document_ids == [1, 2, 3], f"Expected [1, 2, 3] but got {document_ids}"
@pytest.mark.asyncio
async def test_retrieve_relevant_docs_valid_query_1():
service = RetrievalService()
query = "What is AI?"
top_k = 3
with patch.object(service.embedding_generator, 'generate_embedding', new_callable=AsyncMock) as mock_generate_embedding, \
patch.object(AsyncSession, 'execute', new_callable=AsyncMock) as mock_execute:
mock_generate_embedding.return_value = [0.1] * 384
# Mock DB query for selected documents
mock_scalars_selected = AsyncMock()
mock_scalars_selected.all = AsyncMock(return_value=[1, 2, 3])
mock_execute.side_effect = [AsyncMock(return_value=AsyncMock(scalars=mock_scalars_selected)), AsyncMock(return_value=AsyncMock(scalars=mock_scalars_selected))]
document_ids = await service.retrieve_relevant_docs(query, top_k)
assert document_ids == [1, 2, 3]
@pytest.mark.asyncio
async def test_retrieve_relevant_docs_no_selected_docs():
service = RetrievalService()
query = "What is AI?"
top_k = 3
with patch.object(service.embedding_generator, 'generate_embedding', new_callable=AsyncMock) as mock_generate_embedding, \
patch.object(AsyncSession, 'execute', new_callable=AsyncMock) as mock_execute:
mock_generate_embedding.return_value = [0.1] * 384
# Mock DB returning no selected docs
mock_scalars_selected = AsyncMock()
mock_scalars_selected.all.return_value = []
mock_execute.return_value = mock_scalars_selected
document_ids = await service.retrieve_relevant_docs(query, top_k)
assert document_ids == []
@pytest.mark.asyncio
async def test_retrieve_relevant_docs_empty_query():
service = RetrievalService()
query = ""
top_k = 3
with patch.object(service.embedding_generator, 'generate_embedding', new_callable=AsyncMock) as mock_generate_embedding, \
patch.object(AsyncSession, 'execute', new_callable=AsyncMock) as mock_execute:
mock_generate_embedding.return_value = [0.1] * 384
# Mock DB returning no documents
mock_scalars_selected = AsyncMock()
mock_scalars_selected.all.return_value = []
mock_execute.return_value = mock_scalars_selected
document_ids = await service.retrieve_relevant_docs(query, top_k)
assert document_ids == []
@pytest.mark.asyncio
async def test_get_document_texts_valid_ids():
service = RetrievalService()
document_ids = [1, 2, 3]
with patch.object(AsyncSession, 'execute', new_callable=AsyncMock) as mock_execute:
# Mock query result
mock_scalars = AsyncMock()
mock_scalars.all.return_value = ["Document 1 text", "Document 2 text", "Document 3 text"]
mock_execute.return_value = mock_scalars
document_texts = await service.get_document_texts(document_ids)
assert document_texts == ["Document 1 text", "Document 2 text", "Document 3 text"]
@pytest.mark.asyncio
async def test_get_document_texts_no_ids():
service = RetrievalService()
document_ids = []
with patch.object(AsyncSession, 'execute', new_callable=AsyncMock) as mock_execute:
document_texts = await service.get_document_texts(document_ids)
assert document_texts == []
< /code>
Я добавил так много информации отладки, но я не понимаю, почему, когда я издеваюсь над ретревальсором, чтобы иметь побочный эффект [1,2,3] и просто приносить это значение после прохождения службы. Я продолжаю получать ошибку, которая показывает, что мой mock_execute.side_effect < /code> вообще не работает. '_mock_parent': none, '_mock_name': none, '_mock_new_name': '()', '_mock_new_parent' :, '_mock_sealed': false, '_spec_class': none '_spec_set': none, '_spec_signature': none '_spec_signature': none '_spec_signature None, '_spec_asyncs': [], '_mock_children': {'scalars' :, ' str ':}, '_mock_wraps': none, '_mock_delegate': none, '_mock_called': false, '_mock_call_args': none, '_mock_called 0, '_mock_call_args_list': [], '_mock_mock_calls': [call. str (), call.scalars (), call.scalars (). All (), 'method_calls': [call.scalars (), '_mock_unsafe': false'sect_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock_mock. '_is_coroutine': , '_mock_await_count': 0, '_mock_await_args': none, '_mock_await_args_list': [], ' cod [] < /p>
< /blockquote>
И эти ошибки: < /p>
Информация об резюме короткого теста ================================================================================== src/tests/unit/test_retrive_docs.py :: test_retrive_relevant_docs_valid_query - assertionerr: ожидается [1, 2, 3], но []
неудачные src/tests/unit/test_retrive_docs.py :: test_retrive_relevant_docrisheriad_query_query_query_qure ASSERT [] == [1, 2, 3]
Fail Src/tests/unit/test_retrive_docs.py :: test_get_document_texts_valid_ids - assertionerr: assert наблюдаемое поведение: < /strong>
results.scalars (). All () Неожиданно возвращает [], хотя я пытался высмеивать его.
Отладка VAR (результаты) показывает _mock_side_effect = none, предполагая, что макет не работает как ожидаемый. [1, 2, 3], соответствие высмеиваемому возвращению. await session.execute (...).. Scalars (). All () вместо того, чтобы обернуть его в list ().
Что такое правильный способ издеваться над асинхронным исполнением Sqlalchemy (session.execute (). Scalars (). Все ()) в тесте Fastapi с использованием asyncmock? к? Я не новичок в Python, но очень новичок в sqlalchemy
Подробнее здесь: https://stackoverflow.com/questions/794 ... in-fastapi