Код: Выделить всё
S3 server --files chunks--> My server --tar.gz chunks--> User
она записывает кусочки в файловых объектах (с помощью write method),
Это нелегко быть взаимосвязанным с stralette. /> Моя идея состояла в том, чтобы переписать метод StreamingResponse.stream_response < /code> (CF здесь). Что-то вроде этого: < /p>Код: Выделить всё
"""A script to test the tar.gz streaming."""
import os
import tarfile
from pathlib import Path
from typing import Mapping
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send
TAR_FILE_PATH = Path("archive.tar.gz")
CHUNK_SIZE = 1024
app = FastAPI()
class FileStreamingResponse(StreamingResponse):
def __init__(
self,
files_to_tar: list[Path],
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.files_to_tar = files_to_tar
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)
async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
class DumpWriter:
async def write(buffer):
print(f"Really sending {len(buffer)} bytes")
await send(
{"type": "http.response.body", "body": buffer, "more_body": True}
)
async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
for input_file in self.files_to_tar:
await file.add(input_file.open("rb"))
await send({"type": "http.response.body", "body": b"", "more_body": False})
FILES_TO_TAR = Path("src").iterdir()
@app.get("/")
def send_tar() -> StreamingResponse:
"""Send a tar file."""
return FileStreamingResponse(
files_to_tar=FILES_TO_TAR,
media_type="application/tar+gzip",
headers={"Content-Disposition": f'attachment; filename="{TAR_FILE_PATH.name}"'},
)
#
# TESTS
#
TEST_TAR_FILE_PATH = Path("archive.tar.gz")
client = TestClient(app)
def test_main():
if TEST_TAR_FILE_PATH.exists():
os.remove(TEST_TAR_FILE_PATH)
response = client.get("/")
response.raise_for_status()
with TEST_TAR_FILE_PATH.open("wb") as file:
for chunk in response.iter_bytes(CHUNK_SIZE):
file.write(chunk)
with tarfile.open(TEST_TAR_FILE_PATH, "r:gz") as tar_file:
files = [tarinfo.name for tarinfo in tar_file.getmembers()]
src_files = [file.name for file in FILES_TO_TAR]
assert set(files) == set(src_files)
он записывает их synchronuosly
ОК, а затем правильно его написать, мне нужно запустить Suppwriter.write синхронно? Я пытался сделать < /p>Код: Выделить всё
class DumpWriter:
def write(buffer):
print(f"Really sending {len(buffer)} bytes")
asyncio.run_coroutine_threadsafe(
send(
{
"type": "http.response.body",
"body": buffer,
"more_body": True,
}
),
asyncio.get_running_loop(),
)
Подробнее здесь: https://stackoverflow.com/questions/796 ... -in-fastap