У меня проблемы с асинхронной очередью. В частности, await length.get(), кажется, блокирует остальную часть моего приложения, вызывая его зависание.
Я столкнулся с аналогичной проблемой, обсуждаемой здесь: Почему asyncio очередь ожидает блокировки get()? блокировки, но мне все еще трудно понять, как решить проблему в моем случае.
В своей настройке я пытаюсь создать API веб-скрапинга (с FastAPI), который позволит пользователям отправлять URL-адреса, которые группируются и обрабатываются различными веб-скребками, выполняющимися в потребительских задачах. Однако похоже, что как только потребители ждут новых пакетов из очереди, приложение останавливается. В частности, я вызываю start_consumers() во время события жизненного цикла FastAPI перед запуском приложения, но приложение никогда не инициализируется полностью, поскольку оно блокируется.
Есть ли способ изменить мою настройку, чтобы потребители могли ждать новых элементов в очереди, не блокируя остальную часть приложения? Или этот подход обречен?
import asyncio
from asyncio.queues import Queue
from internal.services.webscraping.base import BaseScraper
class QueueController:
_logger = create_logger(__name__)
def __init__(
self,
scrapers: list[BaseScraper],
batch_size: int = 50
):
self.queue = Queue()
self.batch_size = batch_size
self.scrapers = scrapers
self.consumers = []
self.running = False
async def put(self, urls: list[str]) -> None:
"""
Add batches of URLs to the queue.
"""
# Split the list of URLs into batches of size self.batch_size and add them to the queue.
# If the queue is full, wait until there is space available.
for i in range(0, len(urls), self.batch_size):
batch = urls[i:i + self.batch_size]
await self.queue.put(batch)
async def get(self) -> list[str]:
"""
Retrieve a batch from the queue.
"""
# Get a batch of URLs from the queue.
# If queue is empty, wait until an item is available.
return await self.queue.get()
async def consumer(self, scraper: BaseScraper) -> None:
"""
Consumer coroutine that processes batches of URLs.
"""
# Consumer tasks are designed to run in an infinite loop (as long as self.running is
# True) and fetch batches of URLs from the queue.
while self.running:
try:
batch = await self.get()
if batch:
records = await scraper.run(batch)
# TODO: Handle saving of result
except Exception as e:
# TODO: Add proper error handling
...
raise e
async def start_consumers(self) -> None:
"""
Start the consumer tasks.
Notes:
https://docs.python.org/3/library/asyncio-task.html
https://superfastpython.com/asyncio-task/
"""
self.running = True
self.consumers = [
asyncio.create_task(self.consumer(scraper)) for scraper in self.scrapers
]
await asyncio.gather(*self.consumers)
async def stop_consumers(self) -> None:
"""
Stop all consumer tasks gracefully.
"""
self.running = False
for task in self.consumers:
task.cancel()
Подробнее здесь: https://stackoverflow.com/questions/789 ... rs-to-hang
Почему await length.get() блокирует и приводит к зависанию моих асинхронных потребителей? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение