Параллельный (прокси) запрос и получение самого быстрого результатаPython

Программы на Python
Ответить
Anonymous
 Параллельный (прокси) запрос и получение самого быстрого результата

Сообщение Anonymous »

Я пытаюсь оптимизировать запросы через внешний прокси (ротатор). Иногда реакция быстрая, иногда очень медленная. Таким образом, идея состоит в том, чтобы отправить несколько запросов параллельно на один и тот же URL-запрос, получить самый быстрый ответ, вернуть данные, закрыть функцию, не дожидаясь другого, более медленного ответа(ов).
В Интернете есть много учебных пособий и вопросов SO, касающихся параллельных запросов в Python, но все они предназначены для параллельных запросов разных запросов, а не для дублирующего запроса. Кроме того, код ожидает завершения всех запросов. Я хочу отключить логику параллельных запросов (желательно чистым способом), как только ответит самый быстрый ответ.
Мое приложение работает в Python Flask и работает с Gunicorn + Eventlet. Я пробовал зеленые пулы Eventlet и Python Concurrent Futures, но использование Eventlet Greenpool кажется более подходящим, поскольку код будет работать в рабочих Gunicorn + Eventlet и Celery с рабочими Eventlet.
В настоящее время я использую Luminati Proxy Manager (LPM) для повторения неудачных запросов. Старая версия вроде бы по умолчанию поддерживала параллельные запросы, но текущие версии больше не поддерживают эту функцию. Поэтому я либо пытаюсь решить эту проблему с помощью кода в своем приложении Python, либо добавляю другой сервис/инструмент (например, LPM), который обрабатывает параллельные запросы и выбирает самый быстрый.
Прокси-сервис Luminati.io предоставляет пример кода «высокопроизводительного параллельного запроса» (на основе Eventlet Greenpool). См. «исходный пример».
Я отредактировал код без прокси-сервера и входа в систему, чтобы сделать его более повторяемым и избежать непредсказуемого времени ответа прокси-сервера. Я не получаю никакой поддержки от Luminati, поэтому пытаюсь выяснить это на SO.
Для этого теста я использую имитацию медленного 5-секундного ответа и быстрого ответа от httpstat.us:
['http://httpstat.us/200?sleep=5000','htt ... tat.us/200']

В отредактированный код я добавил операторы печати с указанием времени, чтобы увидеть, какой ответ придет первым.
У меня возникли две проблемы с этим кодом. Иногда я вижу, что сначала возвращается быстрый ответ, и он печатает данные ответа («ОК»), а медленный ответ - через 5 секунд. Однако часто кажется, что код ждет, пока не вернутся оба ответа (оба времени одинаковы).
Другая проблема заключается в том, что, хотя я могу сразу же распечатать и просмотреть данные «быстрого» ответа, логика все равно ждет, пока все ответы не будут завершены. Я хотел бы вернуть данные и закрыть функцию, как только придет первый ответ. В моем отредактированном коде вы можете увидеть некоторый код (закомментированные строки), в котором я безуспешно пытался завершить процесс (однако это просто перезапускает процесс события).
Исходный пример
import eventlet
from eventlet.green.urllib import request
import random
import socket

super_proxy = socket.gethostbyname('zproxy.lum-superproxy.io')

class SingleSessionRetriever:

url = "http://%s-session-%s:%s@"+super_proxy+":%d"
port = 22225

def __init__(self, username, password, requests_limit, failures_limit):
self._username = username
self._password = password
self._requests_limit = requests_limit
self._failures_limit = failures_limit
self._reset_session()

def _reset_session(self):
session_id = random.random()
proxy = SingleSessionRetriever.url % (self._username, session_id, self._password,
SingleSessionRetriever.port)
proxy_handler = request.ProxyHandler({'http': proxy, 'https': proxy})
self._opener = request.build_opener(proxy_handler)
self._requests = 0
self._failures = 0

def retrieve(self, url, timeout):
while True:
if self._requests == self._requests_limit:
self._reset_session()
self._requests += 1
try:
timer = eventlet.Timeout(timeout)
result = self._opener.open(url).read()
timer.cancel()
return result
except:
timer.cancel()
self._failures += 1
if self._failures == self._failures_limit:
self._reset_session()

class MultiSessionRetriever:

def __init__(self, username, password, session_requests_limit, session_failures_limit):
self._username = username
self._password = password
self._sessions_stack = []
self._session_requests_limit = session_requests_limit
self._session_failures_limit = session_failures_limit

def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
pool = eventlet.GreenPool(parallel_sessions_limit)
for url, body in pool.imap(lambda url: self._retrieve_single(url, timeout), urls):
callback(url, body)

def _retrieve_single(self, url, timeout):
if self._sessions_stack:
session = self._sessions_stack.pop()
else:
session = SingleSessionRetriever(self._username, self._password,
self._session_requests_limit, self._session_failures_limit)
body = session.retrieve(url, timeout)
self._sessions_stack.append(session)
return url, body

def output(url, body):
print(body)

n_total_req = 100
req_timeout = 10
n_parallel_exit_nodes = 10
switch_ip_every_n_req = 10
max_failures = 2

MultiSessionRetriever('lum-customer-c_ba028d72-zone-static', 'akssw3iy6h3y', switch_ip_every_n_req, max_failures).retrieve(
["http://lumtest.com/myip.json"] * n_total_req, req_timeout, n_parallel_exit_nodes, output)

Отредактированный код (без логинов и прокси)
def high_perf_parallel_requests(search_url):

try:
import datetime
from eventlet.green.urllib import request

results2 = []
results1 = []

class SingleSessionRetriever:

def __init__(self, username, password, requests_limit, failures_limit):
self._username = username
self._password = password
self._requests_limit = requests_limit
self._failures_limit = failures_limit
self._reset_session()

def _reset_session(self):

self._requests = 0
self._failures = 0

def retrieve(self, url, timeout):

print("\n SingleSessionRetriever.retrieve init")
print(url)
print(datetime.datetime.now())

while True:

if self._requests == self._requests_limit:
self._reset_session()
self._requests += 1
try:
timer = eventlet.Timeout(timeout)

result = request.urlopen(url).read()
print("\n SingleSessionRetriever.retrieve result")
print(url)
print(result)
print(datetime.datetime.now())

results1.append(result)

timer.cancel()
# eventlet.kill(pool)
# raise Exception("Got fastest result. Kill eventlet")
#eventlet.kill(self)
#pool.kill()
return result

except:
timer.cancel()
self._failures += 1
if self._failures == self._failures_limit:
self._reset_session()

class MultiSessionRetriever:

def __init__(self, username, password, session_requests_limit, session_failures_limit):
self._returned = False
self._username = username
self._password = password
self._sessions_stack = []
self._session_requests_limit = session_requests_limit
self._session_failures_limit = session_failures_limit

def retrieve(self, urls, timeout, parallel_sessions_limit, callback):
pool = eventlet.GreenPool(parallel_sessions_limit)
try:
# for url in urls:
# print("spawn {}".format(url))
# pool.spawn_n(self._retrieve_single(url, timeout))
#pool.waitall()
for url, body in pool.imap(lambda url: self._retrieve_single(url, timeout), urls):

if body:
print("\n MultiSessionRetriever.retrieve: Body received")
print(datetime.datetime.now())
# eventlet.Event.send_exception
#return body
#eventlet.kill(self)
# pool.kill()

print("\n MultiSessionRetriever.retrieve: in for loop")
print(url)
print(body)
print(datetime.datetime.now())
callback(url, body)

except Exception as e:
# eventlet.kill(pool)
# eventlet.kill(self)
print(e)

print("\n MultiSessionRetriever.retrieve: after loop")
print(datetime.datetime.now())
# eventlet.kill(self)

def _retrieve_single(self, url, timeout):
print("\n MultiSessionRetriever._retrieve_single url:")
print(url)
print(datetime.datetime.now())
if self._sessions_stack:
session = self._sessions_stack.pop()
else:
session = SingleSessionRetriever(self._username, self._password,
self._session_requests_limit, self._session_failures_limit)
body = session.retrieve(url, timeout)
print("\n MultiSessionRetriever._retrieve_single body:")
print(body)
print(datetime.datetime.now())
self._sessions_stack.append(session)
return url, body

def output(url, body):
print("\n MultiSessionRetriever.output:")
print(url)
print(body)
print(datetime.datetime.now())
results2.append(body)

# n_total_req = 2
req_timeout = 10
n_parallel_exit_nodes = 2
switch_ip_every_n_req = 1
max_failures = 2

urls = ['http://httpstat.us/200?sleep=5000','htt ... tat.us/200']

print("start")
print(datetime.datetime.now())

x = MultiSessionRetriever('', '', switch_ip_every_n_req, max_failures).retrieve(
urls, req_timeout, n_parallel_exit_nodes, output)

print("result1:")
print(results1)

print("result2:")
print(results2)

return results2

Вывод в консоль (в качестве текста ответа я использовал два других URL-адреса, которые отвечают быстро и медленно).
web_1 | high_perf_parallel_requests: start
web_1 | start
web_1 | 2021-02-04 02:28:17.503574
web_1 |
web_1 | MultiSessionRetriever._retrieve_single url:
web_1 | http://httpstat.us/200?sleep=5000
web_1 | 2021-02-04 02:28:17.503903
web_1 |
web_1 | SingleSessionRetriever.retrieve init
web_1 | http://httpstat.us/200?sleep=5000
web_1 | 2021-02-04 02:28:17.503948
web_1 |
web_1 | MultiSessionRetriever._retrieve_single url:
web_1 | http://httpstat.us/200
web_1 | 2021-02-04 02:28:17.511720
web_1 |
web_1 | SingleSessionRetriever.retrieve init
web_1 | http://httpstat.us/200
web_1 | 2021-02-04 02:28:17.511783
web_1 |
web_1 | SingleSessionRetriever.retrieve result
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:18.269042
web_1 |
web_1 | MultiSessionRetriever._retrieve_single body:
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:18.269220
web_1 |
web_1 | SingleSessionRetriever.retrieve result
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458372
web_1 |
web_1 | MultiSessionRetriever._retrieve_single body:
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458499
web_1 |
web_1 | MultiSessionRetriever.retrieve: Body received
web_1 | 2021-02-04 02:28:24.458814
web_1 |
web_1 | MultiSessionRetriever.retrieve: in for loop
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458857
web_1 |
web_1 | MultiSessionRetriever.output:
web_1 | http://httpstat.us/200?sleep=5000
web_1 | b'"slow response result"\n'
web_1 | 2021-02-04 02:28:24.458918
web_1 |
web_1 | MultiSessionRetriever.retrieve: Body received
web_1 | 2021-02-04 02:28:24.459057
web_1 |
web_1 | MultiSessionRetriever.retrieve: in for loop
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:24.459158
web_1 |
web_1 | MultiSessionRetriever.output:
web_1 | http://httpstat.us/200
web_1 | b'"fast response result"\n'
web_1 | 2021-02-04 02:28:24.459206
web_1 |
web_1 | MultiSessionRetriever.retrieve: after loop
web_1 | 2021-02-04 02:28:24.459482
web_1 | result1
web_1 | [b'"fast response result"\n', b'"slow response result"\n']
web_1 | result2
web_1 | [b'"slow response result"\n', b'"fast response result"\n']
web_1 | Parallel resp = [b'"slow response result"\n', b'"fast response result"\n']

Другие попытки с Eventlet и Concurrent Futures
def parallel_request(url):

fastest_result = None

try:
import datetime
import eventlet
from eventlet.green.urllib.request import urlopen

# urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
# "https://www.python.org/static/img/python-logo.png",
# "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

urls = ['http://httpstat.us/200?sleep=5000','htt ... tat.us/200']

def fetch(url):
print("\n Fetch start")
print(url)
print(datetime.datetime.now())
result = urlopen(url).read()
print("\n Fetch result")
print(result)
print(datetime.datetime.now())

return result

pool = eventlet.GreenPool()
print("\n Parallel start")
print(datetime.datetime.now())
for body in pool.imap(fetch, urls):
print("\n Pool result")
print(body)
print(datetime.datetime.now())

print("\n Parallel end")
print(datetime.datetime.now())

except Exception as e:
print(e)

print("Fastest result= {}".format(fastest_result))

Фьючерсы
def request_futures(url):

try:
import datetime
import concurrent.futures
import urllib.request

urls = ['http://httpstat.us/200?sleep=5000','htt ... tat.us/200']

print("\n Start Futures")
print(datetime.datetime.now())

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
print("\n load url")
print(datetime.datetime.now())
result = conn.read()
print(result)
print(datetime.datetime.now())

return result

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
print("\n Iterate future")
print(datetime.datetime.now())

url = future_to_url[future]
try:
print("\n Try future")
print(url)
print(datetime.datetime.now())
data = future.result()
print("\n Data future")
print(data)
print(datetime.datetime.now())

except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))

print("\n End Futures")
print(datetime.datetime.now())

except Exception as e:
print(e)


Подробнее здесь: https://stackoverflow.com/questions/660 ... est-result
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»