Код: Выделить всё
S3 server --files chunks--> My server --tar.gz chunks--> User
она записывает кусочки в файловых объектах (с помощью write method),
Это нелегко быть взаимосвязанным с stralette. /> Моя идея состояла в том, чтобы переписать метод StreamingResponse.stream_response < /code> (CF здесь). Что-то вроде этого: < /p>Код: Выделить всё
"""A script to test the tar.gz streaming."""
import os
import tarfile
from pathlib import Path
from typing import Mapping
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send
TAR_FILE_PATH = Path("archive.tar.gz")
CHUNK_SIZE = 1024
app = FastAPI()
class FileStreamingResponse(StreamingResponse):
def __init__(
self,
files_to_tar: list[Path],
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.files_to_tar = files_to_tar
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)
async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
class DumpWriter:
async def write(buffer):
print(f"Really sending {len(buffer)} bytes")
await send(
{"type": "http.response.body", "body": buffer, "more_body": True}
)
async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
for input_file in self.files_to_tar:
await file.add(input_file.open("rb"))
await send({"type": "http.response.body", "body": b"", "more_body": False})
FILES_TO_TAR = Path("src").iterdir()
@app.get("/")
def send_tar() -> StreamingResponse:
"""Send a tar file."""
return FileStreamingResponse(
files_to_tar=FILES_TO_TAR,
media_type="application/tar+gzip",
headers={"Content-Disposition": f'attachment; filename="{TAR_FILE_PATH.name}"'},
)
#
# TESTS
#
TEST_TAR_FILE_PATH = Path("archive.tar.gz")
client = TestClient(app)
def test_main():
if TEST_TAR_FILE_PATH.exists():
os.remove(TEST_TAR_FILE_PATH)
response = client.get("/")
response.raise_for_status()
with TEST_TAR_FILE_PATH.open("wb") as file:
for chunk in response.iter_bytes(CHUNK_SIZE):
file.write(chunk)
with tarfile.open(TEST_TAR_FILE_PATH, "r:gz") as tar_file:
files = [tarinfo.name for tarinfo in tar_file.getmembers()]
src_files = [file.name for file in FILES_TO_TAR]
assert set(files) == set(src_files)
он записывает их synchronuosly
ОК, а затем правильно его написать, мне нужно запустить Suppwriter.write синхронно? Я пытался сделать < /p>Код: Выделить всё
class DumpWriter:
def write(buffer):
print(f"Really sending {len(buffer)} bytes")
asyncio.run_coroutine_threadsafe(
send(
{
"type": "http.response.body",
"body": buffer,
"more_body": True,
}
),
asyncio.get_running_loop(),
)
< /code>
Но кусочки данных не были переданы. MWE (запустить pytest main.py
Код: Выделить всё
"""A script to test the tar.gz streaming."""
import asyncio
import os
import tarfile
from pathlib import Path
from typing import Mapping
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send
TAR_FILE_PATH = Path("archive.tar.gz")
CHUNK_SIZE = 1024
app = FastAPI()
FILES_TO_TAR = [("tarfile.py", Path(tarfile.__file__)), ("os.py", Path(os.__file__))]
class FileStreamingResponse(StreamingResponse):
def __init__(
self,
files_to_tar: list[tuple[str, Path]],
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.files_to_tar = files_to_tar
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)
async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
class DumpWriter:
def write(self, buffer):
print(f"Really sending {len(buffer)} bytes")
asyncio.run_coroutine_threadsafe(
send(
{
"type": "http.response.body",
"body": buffer,
"more_body": True,
}
),
asyncio.get_running_loop(),
)
async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
for name, input_path in self.files_to_tar:
await file.addfile(tarfile.TarInfo(name), input_path.open("rb"))
await send({"type": "http.response.body", "body": b"", "more_body": False})
@app.get("/")
def send_tar() -> StreamingResponse:
"""Send a tar file."""
return FileStreamingResponse(
files_to_tar=FILES_TO_TAR,
media_type="application/tar+gzip",
headers={"Content-Disposition": f'attachment; filename="{TAR_FILE_PATH.name}"'},
)
#
# TESTS
#
TEST_TAR_FILE_PATH = Path("archive.tar.gz")
client = TestClient(app)
def test_main():
if TEST_TAR_FILE_PATH.exists():
os.remove(TEST_TAR_FILE_PATH)
response = client.get("/")
response.raise_for_status()
with TEST_TAR_FILE_PATH.open("wb") as file:
for chunk in response.iter_bytes(CHUNK_SIZE):
file.write(chunk)
with tarfile.open(TEST_TAR_FILE_PATH, "r:gz") as tar_file:
files = [tarinfo.name for tarinfo in tar_file.getmembers()]
src_files_names = [file[0] for file in FILES_TO_TAR]
assert set(files) == set(src_files_names)
< /code>
Как объяснялось ранее, я получаю такую ошибку: < /p>
> async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
E TypeError: 'TarFile' object does not support the asynchronous context manager protocol
[*] или сделать Tarfile асинхронно писать в своем объекте FileObjet (но в потоковом режиме, он записывает через Sub _Stream , который не является асинхронным), но и Syrchrony
или to trankwritewritewrite. Функция внутренней отправки асинхронно (мои предыдущие тесты дали странные результаты, как если бы это было не передаваемое данные. Тем не менее, я не очень хорош с циклами, потоками и т. Д.) /> Я не могу понять, какое решение использовать и как это сделать. < /p>
справка! < /p>
Подробнее здесь: https://stackoverflow.com/questions/796 ... in-fastapi