Как управлять одновременными веб-запросами с ограничением скорости к нескольким доменам, используя только stdlib Python? Я не спрашиваю об алгоритмах, таких как дырявое ведро или решения для одного домена, а о том, как подходить к структурам данных и кода для одновременных запросов к нескольким доменам.
По теме
Следующие сообщения содержат полезную информацию, но не решают мою проблему. Они не учитывают одновременные запросы и ограничения скорости для каждого домена, и во многих ответах используются сторонние модули.
- Как ограничить частоту запросов к веб-службам в Python?
- Какой алгоритм ограничения скорости является хорошим?
- Какую структуру данных использовать для «динамической» приоритетной очереди?
- Очередь приоритетов с динамическими приоритетами элементов
Загрузка данных с нескольких доменов. Типичная схема:
- добавьте несколько элементов загрузки из первого домена
- добавьте несколько элементов загрузки из второго домена< /li>
пока загружаются элементы первого домена, установите ограничение скорости и отложите на некоторое время - продолжайте загрузку элементов из второго домена, пока первый домен ожидает возобновления < -- КАК РЕАЛИЗОВАТЬ ЭТУ ЧАСТЬ?
Не могу/не хочу устанавливать сторонние библиотеки, например, из pypi, чтобы минимизировать зависимости. и поддерживать безопасность. Код должен быть автономным, используя только stdlib Python.
Текущий подход
Я использую реализацию производителя/потребителя с одной очередью и несколькими рабочими потоками. для одновременной загрузки. Он использует дырявое ведро для ограничения скорости с помощью временных меток (без отбрасывания, элементы с ограниченной скоростью просто ждут). Прекрасно работает для загрузок с одного домена. Концептуально так:
Код: Выделить всё
def dl_worker (que) :
while true :
item = que.get ()
if paused (item.domain) : # waiting due to rate limit
time.sleep (backofftime (item.domain))
# dl item...
# start pool of dl threads
dlque = queue.Queue ()
workers = multiprocessing.pool.ThreadPool (5)
with workers :
for x in workers._pool :
workers.apply_async (dl_worker, dlque)
Отклоненные решения
- Наивным решением было бы, если бы рабочие потоки циклически проходили всю очередь, когда они достигают отсрочки для домена1. Что-то вроде:
Код: Выделить всё
def dl_worker () :
while true :
item = dlque.get ()
if paused (item.domain) : # waiting due to rate limit
dlque.put (item) # move item to back of queue
continue
# process item...
- Другой вариант — создать отдельный пул рабочих для каждого домена. Это изолировало бы паузы только для работников этого домена. Но домены могут быть довольно большими, что приводит к огромному количеству потоков и истощению ресурсов. И очередь становится более сложной - нужен диспетчер, который будет потреблять элементы из входящей очереди и распределять их по отдельным очередям для каждого пула потоков/домена.
- Можно также использовать Сторонние библиотеки, такие какpyrate-limiter илиratelimit, как показано здесь. Но это нарушает мое требование только к stdlib и решает только проблему ограничения скорости, а не проблему нескольких доменов.
Основная проблема текущего решения — единая очередь. Я придумал еще несколько подходов, но все они имеют недостатки:
- Использовать отдельную очередь для каждого домена. Кажется, не существует способа ожидания () в нескольких очередях в Python (например, select в дескрипторах файлов). Мне приходится вручную опрашивать все очереди, чтобы узнать, какие из них готовы. Что-то вроде (представьте подходящие блокировки резьбы там, где это необходимо):
Код: Выделить всё
allques = {}
def put (item):
que = allques.setdefault (item.domain, queue.Queue ())
que.put (item)
def dl_worker (que):
while true :
# find which queues have items ready to process
active = [ x for x in allques if x not in paused ]
ready = [ a for a in active if me.allques [a].qsize () ]
if ready :
for domain in ready :
try :
item = me.allques [domain].get (block = false)
# process item ...
break
except Empty :
# qsize was wrong, no biggie, move on to next queue
pass
else :
# wait and poll again
time.sleep (0.5)
Использование отдельных семафоров для каждой очереди создает одну и ту же проблему опроса.
Этот подход также не сохраняет порядок элементов dl. Просто берет первый элемент из первой найденной очереди готовых. Я мог бы использовать random.shuffle, чтобы хотя бы рандомизировать, из какой очереди выбирается. Сохранение порядка элементов кажется трудным. Потребуется отдельная структура данных для отслеживания порядка вставки во всех очередях. Кажется, больше проблем, чем пользы.
- Я мог бы использовать две очереди: активную и приостановленную. Элементы извлекаются из активной очереди. Если домен находится в периоде отсрочки, сохраните элемент в приостановленной очереди до истечения срока отсрочки. Я думаю, что здесь та же проблема. А именно, мне нужен поток диспетчера, который будет следить за приостановленной очередью и перемещать элементы обратно в активную очередь по истечении периода отсрочки.
Или мне нужна структура данных, не относящаяся к очереди, чтобы я мог выполнить любой предмет готов. Но тогда мне снова понадобится опрос сна (блокировка вызова get недоступна).
- Я мог бы использовать другую структуру данных, чем очередь для фильтрации элементов, в настоящее время находящихся в периоде отсрочки. Хотя я не уверен, какая это будет структура. PriorityQueue, похоже, не помогает.
Во-вторых, PriorityQueue всегда возвращает элемент, если он доступен. Я не могу установить период отсрочки, чтобы сказать «держать этот элемент в очереди до момента X, а затем вернуть его».
- Я мог бы использовать кучу вместо очереди для получения любого объекта. Но сложно поддерживать кучу в отсортированном порядке. По мере загрузки и срабатывания ограничений приоритет элемента меняется динамически. Использование кучи каждый раз, когда происходит откат, кажется неэффективным. А кучи Python не блокируют get(), они либо возвращают, либо выбрасывают. Все еще нужен опрос time.sleep.
Пока вариант 1 кажется лучшим решением, несмотря на его недостатки. Есть ли у кого-нибудь идеи получше?
Я по своей сути не привязан к модели потоков. Асинхронность тоже может работать. Я ненавижу засорять свой код мусором async/await и проблемой с красной/синей функцией. Но если есть более чистое решение, его стоит рассмотреть.
Подробнее здесь: https://stackoverflow.com/questions/792 ... tdlib-only