Почему await length.get() блокирует и приводит к зависанию моих асинхронных потребителей?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Почему await length.get() блокирует и приводит к зависанию моих асинхронных потребителей?

Сообщение Anonymous »

У меня проблемы с асинхронной очередью. В частности, await length.get(), кажется, блокирует остальную часть моего приложения, вызывая его зависание.
Я столкнулся с аналогичной проблемой, обсуждаемой здесь: Почему asyncio очередь ожидает блокировки get()? блокировки, но мне все еще трудно понять, как решить проблему в моем случае.
В своей настройке я пытаюсь создать API веб-скрапинга (с FastAPI), который позволит пользователям отправлять URL-адреса, которые группируются и обрабатываются различными веб-скребками, выполняющимися в потребительских задачах. Однако похоже, что как только потребители ждут новых пакетов из очереди, приложение останавливается. В частности, я вызываю start_consumers() во время события жизненного цикла FastAPI перед запуском приложения, но приложение никогда не инициализируется полностью, поскольку оно блокируется.
Есть ли способ изменить мою настройку, чтобы потребители могли ждать новых элементов в очереди, не блокируя остальную часть приложения? Или этот подход обречен?
import asyncio
from asyncio.queues import Queue

from internal.services.webscraping.base import BaseScraper

class QueueController:

_logger = create_logger(__name__)

def __init__(
self,
scrapers: list[BaseScraper],
batch_size: int = 50
):
self.queue = Queue()
self.batch_size = batch_size
self.scrapers = scrapers
self.consumers = []
self.running = False

async def put(self, urls: list[str]) -> None:
"""
Add batches of URLs to the queue.
"""
# Split the list of URLs into batches of size self.batch_size and add them to the queue.
# If the queue is full, wait until there is space available.
for i in range(0, len(urls), self.batch_size):
batch = urls[i:i + self.batch_size]
await self.queue.put(batch)

async def get(self) -> list[str]:
"""
Retrieve a batch from the queue.
"""
# Get a batch of URLs from the queue.
# If queue is empty, wait until an item is available.
return await self.queue.get()

async def consumer(self, scraper: BaseScraper) -> None:
"""
Consumer coroutine that processes batches of URLs.
"""
# Consumer tasks are designed to run in an infinite loop (as long as self.running is
# True) and fetch batches of URLs from the queue.
while self.running:
try:
batch = await self.get()
if batch:
records = await scraper.run(batch)
# TODO: Handle saving of result
except Exception as e:
# TODO: Add proper error handling
...
raise e

async def start_consumers(self) -> None:
"""
Start the consumer tasks.

Notes:
https://docs.python.org/3/library/asyncio-task.html
https://superfastpython.com/asyncio-task/
"""
self.running = True
self.consumers = [
asyncio.create_task(self.consumer(scraper)) for scraper in self.scrapers
]
await asyncio.gather(*self.consumers)

async def stop_consumers(self) -> None:
"""
Stop all consumer tasks gracefully.
"""
self.running = False
for task in self.consumers:
task.cancel()


Подробнее здесь: https://stackoverflow.com/questions/789 ... rs-to-hang
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»