У меня есть zip-файл с таблицами CSV, которые читаются и форматируются с помощью поляров. Я хочу запустить его, чтобы эта операция не блокировала мой основной поток. Но он блокирует его, и fastapi не может обрабатывать входящие запросы, пока форматируются кадры данных.
У меня есть конечная точка fastapi, на которую пользователь отправляет zip-файл. Zip-файл отправляется в хранилище S3.
И у меня есть очередь faststream, которая получает этот zip-файл из S3, разархивирует его и обрабатывает CSV-файлы с помощью поляров.
Конечная точка Fastapi
@router.post("/upload",)
@inject
async def upload_file(
file: UploadFile,
file_processing_service: FileProcessingService = Depends(
Provide[Container.file_processing_service]
),
authrozied: dict = Depends(role_checker),
user_data: dict = Depends(verify_token)
):
file_id = str(uuid4())
zip_file = ZipFile(content=BytesIO(file.file.read()), filename=file.filename, id=file_id)
zip_file.filename = translit(zip_file.filename, "ru", reversed=True)
file_processing_service.save_zip_file(zip_file)
connection_id = user_data.get("sid")
await brocker().publish({"zip_id": file_id, "client_id":connection_id}, "file_preparing_queue")
return "File recieved"
Очередь Fastsream
router = RabbitRouter(RabbitMQSettings().get_url())
def brocker() -> RabbitBroker:
return router.broker
active_connections: Dict[str, WebSocket] = {}
@router.subscriber("file_preparing_queue")
@inject
async def file_preparing_queue(
zip_id_and_client: dict,
file_processing_service: FileProcessingService = Depends(
Provide[Container.file_processing_service]
),
):
zip_file = file_processing_service.get_zip_file(zip_id_and_client.get("zip_id"))
dataset = file_processing_service.process_zip_file(zip_file)
parquet_buffer = convert_to_parquet(dataset)
parquet_file = ParquetFile(
content=parquet_buffer,
id=zip_file.id,
filename=zip_file.filename.replace(".zip", ".parquet"),
)
client_id = zip_id_and_client.get("client_id")
dataset_metadata = DatasetMetadata(
id=zip_file.id,
state="loaded",
not_validated_messages=[],
filename=parquet_file.filename,
user_id=client_id,
)
dataset_metadata = DatasetMetadata.model_validate(dataset_metadata)
file_processing_service.save_as_parquet(parquet_file)
file_processing_service.add_dataset_metadata(dataset_metadata)
try:
await manager.send_message(
client_id, {"message": "Preparing finished"}
)
except Exception as e:
module_logging.error("Error sending validation response", e)
await router.broker.publish({"dataset_id": zip_file.id, "connection_id": client_id}, "validation_queue")
Подробнее здесь: https://stackoverflow.com/questions/792 ... nd-fastapi
Код, привязанный к процессору, блокирует основной поток с помощью faststream и fastapi ⇐ Python
Программы на Python
1732317898
Anonymous
У меня есть zip-файл с таблицами CSV, которые читаются и форматируются с помощью поляров. Я хочу запустить его, чтобы эта операция не блокировала мой основной поток. Но он блокирует его, и fastapi не может обрабатывать входящие запросы, пока форматируются кадры данных.
У меня есть конечная точка fastapi, на которую пользователь отправляет zip-файл. Zip-файл отправляется в хранилище S3.
И у меня есть очередь faststream, которая получает этот zip-файл из S3, разархивирует его и обрабатывает CSV-файлы с помощью поляров.
Конечная точка Fastapi
@router.post("/upload",)
@inject
async def upload_file(
file: UploadFile,
file_processing_service: FileProcessingService = Depends(
Provide[Container.file_processing_service]
),
authrozied: dict = Depends(role_checker),
user_data: dict = Depends(verify_token)
):
file_id = str(uuid4())
zip_file = ZipFile(content=BytesIO(file.file.read()), filename=file.filename, id=file_id)
zip_file.filename = translit(zip_file.filename, "ru", reversed=True)
file_processing_service.save_zip_file(zip_file)
connection_id = user_data.get("sid")
await brocker().publish({"zip_id": file_id, "client_id":connection_id}, "file_preparing_queue")
return "File recieved"
Очередь Fastsream
router = RabbitRouter(RabbitMQSettings().get_url())
def brocker() -> RabbitBroker:
return router.broker
active_connections: Dict[str, WebSocket] = {}
@router.subscriber("file_preparing_queue")
@inject
async def file_preparing_queue(
zip_id_and_client: dict,
file_processing_service: FileProcessingService = Depends(
Provide[Container.file_processing_service]
),
):
zip_file = file_processing_service.get_zip_file(zip_id_and_client.get("zip_id"))
dataset = file_processing_service.process_zip_file(zip_file)
parquet_buffer = convert_to_parquet(dataset)
parquet_file = ParquetFile(
content=parquet_buffer,
id=zip_file.id,
filename=zip_file.filename.replace(".zip", ".parquet"),
)
client_id = zip_id_and_client.get("client_id")
dataset_metadata = DatasetMetadata(
id=zip_file.id,
state="loaded",
not_validated_messages=[],
filename=parquet_file.filename,
user_id=client_id,
)
dataset_metadata = DatasetMetadata.model_validate(dataset_metadata)
file_processing_service.save_as_parquet(parquet_file)
file_processing_service.add_dataset_metadata(dataset_metadata)
try:
await manager.send_message(
client_id, {"message": "Preparing finished"}
)
except Exception as e:
module_logging.error("Error sending validation response", e)
await router.broker.publish({"dataset_id": zip_file.id, "connection_id": client_id}, "validation_queue")
Подробнее здесь: [url]https://stackoverflow.com/questions/79216832/cpu-bound-code-blocks-main-thread-with-faststream-and-fastapi[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия