Ограничение скорости одновременных веб-запросов несколькими доменами, только stdlibPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ограничение скорости одновременных веб-запросов несколькими доменами, только stdlib

Сообщение Anonymous »

Вопрос
Как управлять одновременными веб-запросами с ограничением скорости к нескольким доменам, используя только stdlib Python? Я не спрашиваю об алгоритмах, таких как дырявое ведро или решения для одного домена, а о том, как подходить к структурам данных и кода для одновременных запросов к нескольким доменам.
По теме
Следующие сообщения содержат полезную информацию, но не решают мою проблему. Они не учитывают одновременные запросы и ограничения скорости для каждого домена, и во многих ответах используются сторонние модули.
  • Как ограничить частоту запросов к веб-службам в Python?
  • Какой алгоритм ограничения скорости является хорошим?
  • Какую структуру данных использовать для «динамической» приоритетной очереди?
  • Очередь приоритетов с динамическими приоритетами элементов
  • Ограничение параллелизма и скорости для потоков Python
  • Распределенное ограничение скорости
  • Распределенное сканирование и ограничение скорости/управление потоком
Этот вопрос ближе всего, очень похожая установка (хотя я не возвращаюсь к одним и тем же ссылкам). Но подход использует внешнюю базу данных и PHP/Java, а не Python. Слишком тяжело, нужно решение на Python.
  • Распределенное сканирование и ограничение скорости/управление потоком
Настройка
Загрузка данных из нескольких доменов. Типичная схема:
  • добавьте несколько элементов загрузки из первого домена
  • добавьте несколько элементов загрузки из второго домена< /li>
    пока загружаются элементы первого домена, установите ограничение скорости и отложите на некоторое время
  • продолжайте загрузку элементов из второго домена, пока первый домен ожидает возобновления < -- КАК РЕАЛИЗОВАТЬ ЭТУ ЧАСТЬ?
Это немного упрощено. На самом деле дополнительные элементы загрузки могут быть добавлены в любой момент на протяжении продолжающегося процесса.
Не могу/не хочу устанавливать сторонние библиотеки, например, из pypi, чтобы минимизировать зависимости. и поддерживать безопасность. Код должен быть автономным, используя только stdlib Python.
Текущий подход
Я использую реализацию производителя/потребителя с одной очередью и несколькими рабочими потоками. для одновременной загрузки. Он использует дырявое ведро для ограничения скорости с помощью временных меток (без отбрасывания, элементы с ограниченной скоростью просто ждут). Прекрасно работает для загрузок с одного домена. Концептуально так:

Код: Выделить всё

def dl_worker (que) :
while true :
item = que.get ()
if paused (item.domain) :   # waiting due to rate limit
time.sleep (backofftime (item.domain))
# dl item...

# start pool of dl threads
dlque = queue.Queue ()
workers = multiprocessing.pool.ThreadPool (5)
with workers :
for x in workers._pool :
workers.apply_async (dl_worker, dlque)
Проблема: для всех элементов загрузки используется одна очередь. Когда потоки dl приостанавливаются из-за ограничения скорости, любые элементы в очереди для второго домена зависают во время паузы в ожидании элементов первого домена.
Отклоненные решения
  • Наивным решением было бы, если бы рабочие потоки циклически проходили всю очередь, когда они достигают отсрочки для домена1. Что-то вроде:

Код: Выделить всё

def dl_worker () :
while true :
item = dlque.get ()
if paused (item.domain) :   # waiting due to rate limit
dlque.put (item)        # move item to back of queue
continue

# process item...
Это расточительно. Если очередь содержит только элементы из домена1, то рабочие будут постоянно перемещать элементы спереди назад во время периода отсрочки. Хотите избежать решений, требующих ожидания.
  • Другой вариант — создать отдельный пул рабочих для каждого домена. Это изолировало бы паузы только для работников этого домена. Но домены могут быть довольно большими, что приводит к огромному количеству потоков и истощению ресурсов. И очередь становится более сложной - нужен диспетчер, который будет потреблять элементы из входящей очереди и распределять их по отдельным очередям для каждого пула потоков/домена.
  • Можно также использовать Сторонние библиотеки, такие какpyrate-limiter илиratelimit, как показано здесь. Но это нарушает мое требование только к stdlib и решает только проблему ограничения скорости, а не проблему нескольких доменов.
Другое Решения
Основная проблема текущего решения — единая очередь. Я придумал еще несколько подходов, но все они имеют недостатки:
  • Использовать отдельную очередь для каждого домена. Кажется, не существует способа ожидания () в нескольких очередях в Python (например, select в дескрипторах файлов). Мне приходится вручную опрашивать все очереди, чтобы узнать, какие из них готовы. Что-то вроде (представьте подходящие блокировки резьбы там, где это необходимо):

Код: Выделить всё

allques = {}

def put (item):
que = allques.setdefault (item.domain, queue.Queue ())
que.put (item)

def dl_worker (que):
while true :
# find which queues have items ready to process
active = [ x for x in allques if x not in paused ]
ready  = [ a for a in active if me.allques [a].qsize () ]

if ready :
for domain in ready :
try :
item = me.allques [domain].get (block = false)
# process item ...
break
except Empty :
# qsize was wrong, no biggie, move on to next queue
pass

else :
# wait and poll again
time.sleep (0.5)
Мне не нравится опрос о сне. Вместо этого я мог бы использовать threading.Semaphore для отслеживания количества текущих элементов во всех очередях (вызовите semaphore.release для каждого que.put и semaphore.acquire перед попыткой que.get). Но это не поможет, если все очереди в настоящее время находятся в режиме отсрочки ограничения скорости: dl_worker будет занят ожиданием выполнения цикла while, а Ready будет пустым списком на каждом проходе.
Использование отдельных семафоров для каждой очереди создает одну и ту же проблему опроса.
Этот подход также не сохраняет порядок элементов dl. Просто берет первый элемент из первой найденной очереди готовых. Я мог бы использовать random.shuffle, чтобы хотя бы рандомизировать, из какой очереди выбирается. Сохранение порядка элементов кажется трудным. Потребуется отдельная структура данных для отслеживания порядка вставки во всех очередях. Кажется, больше проблем, чем пользы.
  • Я мог бы использовать две очереди: активную и приостановленную. Элементы извлекаются из активной очереди. Если домен находится в периоде отсрочки, сохраните элемент в приостановленной очереди до истечения срока отсрочки. Я думаю, что здесь та же проблема. А именно, мне нужен поток диспетчера, который будет следить за приостановленной очередью и перемещать элементы обратно в активную очередь по истечении периода отсрочки.
Что происходит, когда второй элемент приостановлен очередь истекает раньше, чем первый элемент? Он будет зависать в очереди до тех пор, пока первый элемент не будет удален и снова не помещен в активную очередь.
Или мне нужна структура данных, не относящаяся к очереди, чтобы я мог выполнить любой предмет готов. Но тогда мне снова понадобится опрос сна (блокировка вызова get недоступна).
  • Я мог бы использовать другую структуру данных, чем очередь для фильтрации элементов, в настоящее время находящихся в периоде отсрочки. Хотя я не уверен, какая это будет структура. PriorityQueue, похоже, не помогает.
Во-первых, его приоритеты статичны. Мои приоритеты динамичны: достижение предела скорости запускает паузу, и пауза может увеличиваться, пока элементы ожидают (например, два работника захватывают элементы из одного и того же домена одновременно. Первый заканчивается и запускает паузу, второй заканчивается позже и удлиняет паузу).< /p>
Во-вторых, PriorityQueue всегда возвращает элемент, если он доступен. Я не могу установить период отсрочки, чтобы сказать «держать этот элемент в очереди до времени X, а затем вернуть его».
  • Я мог бы использовать кучу вместо очереди для получения любого объекта. Но сложно поддерживать кучу в отсортированном порядке. По мере того, как происходят загрузки и срабатывают ограничения скорости, приоритет элемента меняется динамически. Использование кучи каждый раз, когда происходит откат, кажется неэффективным. И кучи Python не блокируют get(), они либо возвращают, либо выбрасывают. Все еще нужен опрос time.sleep.
Вывод
Пока вариант 1 кажется лучшим решением, несмотря на его недостатки. Есть ли у кого-нибудь идеи получше?
Я по своей сути не привязан к модели потоков. Асинхронность тоже может работать. Я ненавижу засорять свой код мусором async/await и проблемой с красной/синей функцией. Но если есть более чистое решение, его стоит рассмотреть.

Подробнее здесь: https://stackoverflow.com/questions/792 ... tdlib-only
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»