Код, привязанный к процессору, блокирует основной поток с помощью faststream и fastapiPython

Программы на Python
Ответить
Anonymous
 Код, привязанный к процессору, блокирует основной поток с помощью faststream и fastapi

Сообщение Anonymous »

У меня есть zip-файл с таблицами CSV, которые читаются и форматируются с помощью поляров. Я хочу запустить его, чтобы эта операция не блокировала мой основной поток. Но он блокирует его, и fastapi не может обрабатывать входящие запросы, пока форматируются кадры данных.
У меня есть конечная точка fastapi, на которую пользователь отправляет zip-файл. Zip-файл отправляется в хранилище S3.
И у меня есть очередь faststream, которая получает этот zip-файл из S3, разархивирует его и обрабатывает CSV-файлы с помощью поляров.
Конечная точка Fastapi
@router.post("/upload",)
@inject
async def upload_file(
file: UploadFile,
file_processing_service: FileProcessingService = Depends(
Provide[Container.file_processing_service]
),
authrozied: dict = Depends(role_checker),
user_data: dict = Depends(verify_token)

):
file_id = str(uuid4())
zip_file = ZipFile(content=BytesIO(file.file.read()), filename=file.filename, id=file_id)
zip_file.filename = translit(zip_file.filename, "ru", reversed=True)
file_processing_service.save_zip_file(zip_file)
connection_id = user_data.get("sid")
await brocker().publish({"zip_id": file_id, "client_id":connection_id}, "file_preparing_queue")
return "File recieved"

Очередь Fastsream
router = RabbitRouter(RabbitMQSettings().get_url())

def brocker() -> RabbitBroker:
return router.broker

active_connections: Dict[str, WebSocket] = {}

@router.subscriber("file_preparing_queue")
@inject
async def file_preparing_queue(
zip_id_and_client: dict,
file_processing_service: FileProcessingService = Depends(
Provide[Container.file_processing_service]
),
):
zip_file = file_processing_service.get_zip_file(zip_id_and_client.get("zip_id"))
dataset = file_processing_service.process_zip_file(zip_file)
parquet_buffer = convert_to_parquet(dataset)
parquet_file = ParquetFile(
content=parquet_buffer,
id=zip_file.id,
filename=zip_file.filename.replace(".zip", ".parquet"),
)
client_id = zip_id_and_client.get("client_id")
dataset_metadata = DatasetMetadata(
id=zip_file.id,
state="loaded",
not_validated_messages=[],
filename=parquet_file.filename,
user_id=client_id,
)
dataset_metadata = DatasetMetadata.model_validate(dataset_metadata)
file_processing_service.save_as_parquet(parquet_file)
file_processing_service.add_dataset_metadata(dataset_metadata)
try:
await manager.send_message(
client_id, {"message": "Preparing finished"}
)
except Exception as e:
module_logging.error("Error sending validation response", e)
await router.broker.publish({"dataset_id": zip_file.id, "connection_id": client_id}, "validation_queue")


Подробнее здесь: https://stackoverflow.com/questions/792 ... nd-fastapi
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»