Я хотел использовать синхронизированную потоковую передачу в langchain и интегрировать ее с FastAPI. Однако при запуске request-sender.py я получаю следующую ошибку:
import requests
url = "http://localhost:8000"
data = {'prompt':'How did dinosaurs get extinct from earth?'}
response = requests.post(url, json=data, stream=True)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
print(chunk.decode('utf-8'), end='',flush=True)
PS D:\fastapi-learning> python request-sender.py
Traceback (most recent call last):
File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\requests\models.py", line 816, in generate
yield from self.raw.stream(chunk_size, decode_content=True)
File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\urllib3\response.py", line 1040, in stream
yield from self.read_chunked(amt, decode_content=decode_content)
File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\urllib3\response.py", line 1184, in read_chunked
self._update_chunk_length()
File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\urllib3\response.py", line 1119, in _update_chunk_length
raise ProtocolError("Response ended prematurely") from None
urllib3.exceptions.ProtocolError: Response ended prematurely
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "D:\fastapi-learning\request-sender.py", line 8, in
for chunk in response.iter_content(chunk_size=1024):
File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\requests\models.py", line 818, in generate
raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: Response ended prematurely
from fastapi import FastAPI
from pydantic import BaseModel
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from fastapi.responses import StreamingResponse
model = ChatGroq(model_name='llama3-70b-8192', api_key='MY_API_KEY_HERE')
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful AI assistant!"),
("human", "{input}")
])
chain = prompt | model
app = FastAPI()
class Prompt(BaseModel):
prompt: str = None
def generate_responses(input_text: str):
for event in chain.stream_events(input_text, version="v1"):
if event['event'] == 'on_chain_stream':
yield event['data']['chunk'].content
@app.post("/")
def respond(prompt: Prompt):
input_text = prompt.prompt
response_stream = generate_responses(input_text)
return StreamingResponse(response_stream, media_type="text/plain")
Я попробовал определить функции генерации_репонсов() и реагирования() как асинхронные и использовал astream_events() в сгенерированном_ответах(), и это сработало нормально, но я хотел знать, почему вышеперечисленное не работает код работает. На самом деле, было бы здорово, если бы кто-нибудь мог уточнить, похожи ли выходные и await или где использовать async def и просто def. Меня немного смущает концепция асинхронных функций, поскольку я новичок в создании API и создаю его впервые.
Я хотел использовать синхронизированную потоковую передачу в langchain и интегрировать ее с FastAPI. Однако при запуске request-sender.py я получаю следующую ошибку: [code]import requests
url = "http://localhost:8000" data = {'prompt':'How did dinosaurs get extinct from earth?'}
for chunk in response.iter_content(chunk_size=1024): if chunk: print(chunk.decode('utf-8'), end='',flush=True) [/code] [code]PS D:\fastapi-learning> python request-sender.py Traceback (most recent call last): File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\requests\models.py", line 816, in generate yield from self.raw.stream(chunk_size, decode_content=True) File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\urllib3\response.py", line 1040, in stream yield from self.read_chunked(amt, decode_content=decode_content) File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\urllib3\response.py", line 1184, in read_chunked self._update_chunk_length() File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\urllib3\response.py", line 1119, in _update_chunk_length raise ProtocolError("Response ended prematurely") from None urllib3.exceptions.ProtocolError: Response ended prematurely
During handling of the above exception, another exception occurred:
Traceback (most recent call last): File "D:\fastapi-learning\request-sender.py", line 8, in for chunk in response.iter_content(chunk_size=1024): File "C:\Users\Director Academics\AppData\Local\Programs\Python\Python312\Lib\site-packages\requests\models.py", line 818, in generate raise ChunkedEncodingError(e) requests.exceptions.ChunkedEncodingError: Response ended prematurely [/code] Мой FastAPI main.py: [code]from fastapi import FastAPI from pydantic import BaseModel from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from fastapi.responses import StreamingResponse
model = ChatGroq(model_name='llama3-70b-8192', api_key='MY_API_KEY_HERE')
prompt = ChatPromptTemplate.from_messages([ ("system", "You are a helpful AI assistant!"), ("human", "{input}") ])
chain = prompt | model
app = FastAPI()
class Prompt(BaseModel): prompt: str = None
def generate_responses(input_text: str): for event in chain.stream_events(input_text, version="v1"): if event['event'] == 'on_chain_stream': yield event['data']['chunk'].content
[/code] Я попробовал определить функции генерации_репонсов() и реагирования() как асинхронные и использовал astream_events() в сгенерированном_ответах(), и это сработало нормально, но я хотел знать, почему вышеперечисленное не работает код работает. На самом деле, было бы здорово, если бы кто-нибудь мог уточнить, похожи ли выходные и await или где использовать async def и просто def. Меня немного смущает концепция асинхронных функций, поскольку я новичок в создании API и создаю его впервые.
У меня есть конечная точка, которая возвращает Fastapi StreamingResponse:
async def stream_results():
...
async for _output in result_generator:
...
yield (json.dumps({'Field': value}, ensure_ascii=False)) + '\n'
Я делаю сервер Fastapi/Starlette, который запрашивает другой сервер (ну, S3) большие файлы. Эти куски подают объект tarfile.tarfile для создания потока .tar.gz . Этот поток должен быть отправлен на лету на StreamingResponse .
S3 server --files...
Я делаю сервер Fastapi/Starlette, который запрашивает другой сервер (ну, S3) большие файлы. Эти куски подают объект tarfile.tarfile для создания потока .tar.gz . Этот поток должен быть отправлен на лету на StreamingResponse .
S3 server --files...
Я хотел бы вернуть поток акций в качестве ответа от сервера с помощью Spring (по сути, используя возможности потоковой передачи HTTP2).
@GetMapping( /stock_stream )
List all() {
while(true){
Stock stock_price = new Stock('APPL', 100.00);
// return...