Блокирующий вызов на socket.socket.accept - при использовании langchain_postgres.v2.vectorstoresPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Блокирующий вызов на socket.socket.accept - при использовании langchain_postgres.v2.vectorstores

Сообщение Anonymous »

Получаю ошибку blockbuster.blockbuster.BlockingError: Blocking call to socket.socket.accept. Я пробовал PGVectorStore.create() и даже AsyncPGVectorStore.create(), но всё равно получаю ошибку блокировки. Как это исправить?
@contextmanager
def make_pg_retriever(
    configuration: BaseConfiguration, embedding_model: Embeddings
) -> Iterator[BaseRetriever]:
    """Yield a retriever over the Postgres vector table, closing the engine on exit.

    The connection string is read from the ``PGVECTOR_CONNECTION_STRING``
    environment variable and handed to ``PGEngine.from_connection_string``.

    Args:
        configuration: Supplies ``search_kwargs`` that are forwarded to the
            retriever (with ``return_uuids`` forced to ``True``).
        embedding_model: Embedding service passed to the vector store.

    Yields:
        The retriever produced by ``PGVectorStore.as_retriever``.
    """
    pg_engine = PGEngine.from_connection_string(
        url=os.environ.get("PGVECTOR_CONNECTION_STRING")
    )

    try:
        store = PGVectorStore.create_sync(
            engine=pg_engine,
            embedding_service=embedding_model,
            # The table must already exist; it is not created here.
            table_name=WEAVIATE_DOCS_INDEX_NAME,
            metadata_columns=["source", "title", "description", "language"],
        )
        # Caller-provided kwargs first, then force "return_uuids" on.
        retriever_kwargs = dict(configuration.search_kwargs, return_uuids=True)
        yield store.as_retriever(search_kwargs=retriever_kwargs)
    finally:
        # NOTE(review): if PGEngine.close is a coroutine function in the
        # installed langchain_postgres version, this call only creates an
        # un-awaited coroutine and the pool is not disposed -- confirm.
        pg_engine.close()
< /code>
или вот так, с соответствующим асинхронным вызовом:
@asynccontextmanager
async def make_pg_retriever(
    configuration: BaseConfiguration, embedding_model: Embeddings
) -> AsyncIterator[BaseRetriever]:
    """Yield a retriever over the Postgres vector table and dispose the engine on exit.

    The connection string is read from the ``PGVECTOR_CONNECTION_STRING``
    environment variable.

    Args:
        configuration: Supplies ``search_kwargs`` forwarded to the retriever.
        embedding_model: Embedding service passed to the vector store.

    Yields:
        The retriever produced by ``AsyncPGVectorStore.as_retriever``.
    """
    connection_string = os.environ.get("PGVECTOR_CONNECTION_STRING")

    # PGEngine.from_connection_string calls asyncio.new_event_loop(), which
    # ends up in a blocking socket.accept() (see the traceback for this
    # code); run it in a worker thread so the running event loop is not blocked.
    engine = await asyncio.to_thread(
        PGEngine.from_connection_string, url=connection_string
    )

    try:
        vectorstore = await AsyncPGVectorStore.create(
            engine=engine,
            # NOTE(original author): unclear why this is not wired up the
            # same way as in ingest.py.
            embedding_service=embedding_model,
            # The table must already exist; it is not created here.
            table_name=WEAVIATE_DOCS_INDEX_NAME,
            metadata_columns=["source", "title", "description", "language"],
        )

        # "return_uuids" is deliberately not added here: the original author
        # notes that AsyncPGVectorStore's as_retriever might not support it
        # and that identifiers are likely available via document metadata
        # -- TODO confirm.
        search_kwargs = {**configuration.search_kwargs}
        yield vectorstore.as_retriever(search_kwargs=search_kwargs)
    finally:
        # Mirror the sync variant's cleanup so the connection pool is not
        # leaked on every use of this context manager.
        await engine.close()
< /code>
Ошибка при использовании метода .create_sync():
Traceback (most recent call last):
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\worker.py", line 170, in worker
await asyncio.wait_for(
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\tasks.py", line 489, in wait_for
return fut.result()
^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\stream.py", line 307, in consume
raise e from g
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\stream.py", line 290, in consume
async for mode, payload in stream:
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\stream.py", line 225, in astream_state
event = await wait_if_not_done(anext(stream, sentinel), done)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph_api\asyncio.py", line 89, in wait_if_not_done
raise e.exceptions[0] from None
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\__init__.py", line 2830, in astream
async for _ in runner.atick(
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 400, in atick
_panic_or_proceed(
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 507, in _panic_or_proceed
raise exc
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\retry.py", line 136, in arun_with_retry
return await task.proc.ainvoke(task.input, config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 672, in ainvoke
input = await asyncio.create_task(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 440, in ainvoke
ret = await self.afunc(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\./backend/retrieval_graph/graph.py", line 181, in conduct_research
result = await researcher_graph.ainvoke({"question": state.steps[0]})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\__init__.py", line 2963, in ainvoke
async for chunk in self.astream(
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\__init__.py", line 2830, in astream
async for _ in runner.atick(
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 400, in atick
_panic_or_proceed(
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\runner.py", line 507, in _panic_or_proceed
raise exc
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\pregel\retry.py", line 136, in arun_with_retry
return await task.proc.ainvoke(task.input, config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 672, in ainvoke
input = await asyncio.create_task(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langgraph\utils\runnable.py", line 440, in ainvoke
ret = await self.afunc(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\backend\retrieval_graph\researcher_graph\graph.py", line 70, in retrieve_documents
with retrieval.make_retriever(config) as retriever:
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 137, in __enter__
return next(self.gen)
^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\backend\retrieval.py", line 60, in make_retriever
with make_pg_retriever(configuration, embedding_model) as retriever:
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 137, in __enter__
return next(self.gen)
^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\backend\retrieval.py", line 33, in make_pg_retriever
engine = PGEngine.from_connection_string(url=connection_string)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\langchain_postgres\v2\engine.py", line 104, in from_connection_string
cls._default_loop = asyncio.new_event_loop()
^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\events.py", line 810, in new_event_loop
return get_event_loop_policy().new_event_loop()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\events.py", line 699, in new_event_loop
return self._loop_factory()
^^^^^^^^^^^^^^^^^^^^
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\selector_events.py", line 56, in __init__
self._make_self_pipe()
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\asyncio\selector_events.py", line 107, in _make_self_pipe
self._ssock, self._csock = socket.socketpair()
^^^^^^^^^^^^^^^^^^^
File "mylocation\AppData\Local\Programs\Python\Python311\Lib\socket.py", line 645, in socketpair
ssock, _ = lsock.accept()
^^^^^^^^^^^^^^
File "mylocation\Downloads\Project\chat-langchain\.venv\Lib\site-packages\blockbuster\blockbuster.py", line 109, in wrapper
raise BlockingError(func_name)
blockbuster.blockbuster.BlockingError: Blocking call to socket.socket.accept
< /code>
Ошибка возникает в этой строке:
engine = PGEngine.from_connection_string(url=connection_string)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
< /code>
При этом рекомендуется следующее, но я перепробовал многое, и ничего не сработало (кроме варианта 3):
Here are your options to fix this:

1. Best approach: Convert any blocking code to use async/await patterns
For example, use 'await aiohttp.get()' instead of 'requests.get()'

2. Quick fix: Move blocking operations to a separate thread
Example: 'await asyncio.to_thread(your_blocking_function)'

3. Override (if you can't change the code):
- For development: Run 'langgraph dev --allow-blocking'
- For deployment: Set 'BG_JOB_ISOLATED_LOOPS=true' environment variable


Подробнее здесь: https://stackoverflow.com/questions/796 ... v2-vectors
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»