Как сгенерировать поток tar.gz, который будет возвращен в качестве потокового ответа в Fastapi/Starlette?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как сгенерировать поток tar.gz, который будет возвращен в качестве потокового ответа в Fastapi/Starlette?

Сообщение Anonymous »

Я делаю сервер Fastapi/Starlette, который запрашивает другой сервер (ну, S3) большие файлы. Эти куски подают объект tarfile.tarfile для создания потока .tar.gz . Этот поток должен быть отправлен на лету на StreamingResponse .

Код: Выделить всё

S3 server --files chunks--> My server --tar.gz chunks--> User
Существует большая проблема с реализацией tarfile.tarfile как:

она записывает кусочки в файловых объектах (с помощью write method),

Это нелегко быть взаимосвязанным с stralette. /> Моя идея состояла в том, чтобы переписать метод StreamingResponse.stream_response < /code> (CF здесь). Что-то вроде этого: < /p>

Код: Выделить всё

"""A script to test the tar.gz streaming."""

import os
import tarfile
from pathlib import Path
from typing import Mapping

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send

TAR_FILE_PATH = Path("archive.tar.gz")

CHUNK_SIZE = 1024

app = FastAPI()

class FileStreamingResponse(StreamingResponse):

def __init__(
self,
files_to_tar: list[Path],
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.files_to_tar = files_to_tar
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)

async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)

class DumpWriter:
async def write(buffer):
print(f"Really sending {len(buffer)} bytes")
await send(
{"type": "http.response.body", "body": buffer, "more_body": True}
)

async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
for input_file in self.files_to_tar:
await file.add(input_file.open("rb"))

await send({"type": "http.response.body", "body": b"", "more_body": False})

FILES_TO_TAR = Path("src").iterdir()

@app.get("/")
def send_tar() -> StreamingResponse:
"""Send a tar file."""

return FileStreamingResponse(
files_to_tar=FILES_TO_TAR,
media_type="application/tar+gzip",
headers={"Content-Disposition": f'attachment; filename="{TAR_FILE_PATH.name}"'},
)

#
# TESTS
#

TEST_TAR_FILE_PATH = Path("archive.tar.gz")
client = TestClient(app)

def test_main():
if TEST_TAR_FILE_PATH.exists():
os.remove(TEST_TAR_FILE_PATH)

response = client.get("/")
response.raise_for_status()

with TEST_TAR_FILE_PATH.open("wb") as file:
for chunk in response.iter_bytes(CHUNK_SIZE):
file.write(chunk)

with tarfile.open(TEST_TAR_FILE_PATH, "r:gz") as tar_file:
files = [tarinfo.name for tarinfo in tar_file.getmembers()]

src_files = [file.name for file in FILES_TO_TAR]
assert set(files) == set(src_files)
Но проблема в том, что ...

он записывает их synchronuosly

ОК, а затем правильно его написать, мне нужно запустить Suppwriter.write синхронно? Я пытался сделать < /p>

Код: Выделить всё

        class DumpWriter:
def write(buffer):
print(f"Really sending {len(buffer)} bytes")
asyncio.run_coroutine_threadsafe(
send(
{
"type": "http.response.body",
"body": buffer,
"more_body": True,
}
),
asyncio.get_running_loop(),
)
< /code>
Но кусочки данных не были переданы.  MWE (запустить pytest main.py 
)

Код: Выделить всё

"""A script to test the tar.gz streaming."""

import asyncio
import os
import tarfile
from pathlib import Path
from typing import Mapping

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send

TAR_FILE_PATH = Path("archive.tar.gz")

CHUNK_SIZE = 1024

app = FastAPI()

FILES_TO_TAR = [("tarfile.py", Path(tarfile.__file__)), ("os.py", Path(os.__file__))]

class FileStreamingResponse(StreamingResponse):

def __init__(
self,
files_to_tar: list[tuple[str, Path]],
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.files_to_tar = files_to_tar
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)

async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)

class DumpWriter:
def write(self, buffer):
print(f"Really sending {len(buffer)} bytes")
asyncio.run_coroutine_threadsafe(
send(
{
"type": "http.response.body",
"body": buffer,
"more_body": True,
}
),
asyncio.get_running_loop(),
)

async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
for name, input_path in self.files_to_tar:
await file.addfile(tarfile.TarInfo(name), input_path.open("rb"))

await send({"type": "http.response.body", "body": b"", "more_body": False})

@app.get("/")
def send_tar() -> StreamingResponse:
"""Send a tar file."""

return FileStreamingResponse(
files_to_tar=FILES_TO_TAR,
media_type="application/tar+gzip",
headers={"Content-Disposition": f'attachment; filename="{TAR_FILE_PATH.name}"'},
)

#
# TESTS
#

TEST_TAR_FILE_PATH = Path("archive.tar.gz")
client = TestClient(app)

def test_main():
if TEST_TAR_FILE_PATH.exists():
os.remove(TEST_TAR_FILE_PATH)

response = client.get("/")
response.raise_for_status()

with TEST_TAR_FILE_PATH.open("wb") as file:
for chunk in response.iter_bytes(CHUNK_SIZE):
file.write(chunk)

with tarfile.open(TEST_TAR_FILE_PATH, "r:gz") as tar_file:
files = [tarinfo.name for tarinfo in tar_file.getmembers()]

src_files_names = [file[0] for file in FILES_TO_TAR]
assert set(files) == set(src_files_names)
< /code>
Как объяснялось ранее, я получаю такую ошибку: < /p>
>       async with tarfile.open(
mode="w|gz", fileobj=DumpWriter(), bufsize=CHUNK_SIZE
) as file:
E       TypeError: 'TarFile' object does not support the asynchronous context manager protocol
Моя проблема здесь заключается в том, что мне понадобится

[*] или сделать Tarfile асинхронно писать в своем объекте FileObjet (но в потоковом режиме, он записывает через Sub _Stream , который не является асинхронным), но и Syrchrony
или to trankwritewritewrite. Функция внутренней отправки асинхронно (мои предыдущие тесты дали странные результаты, как если бы это было не передаваемое данные. Тем не менее, я не очень хорош с циклами, потоками и т. Д.) /> Я не могу понять, какое решение использовать и как это сделать. < /p>
справка! < /p>

Подробнее здесь: https://stackoverflow.com/questions/796 ... in-fastapi
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»