Регулирование асинхронных функций в Python AsyncioPython

Программы на Python
Ответить
Anonymous
 Регулирование асинхронных функций в Python Asyncio

Сообщение Anonymous »

У меня есть список ожидающих, который я хочу передать в asyncio.AbstractEventLoop, но мне нужно ограничить запросы к стороннему API.

Мне бы хотелось избежать чего-то, что ожидает передачи будущего в цикл, потому что тем временем я блокирую ожидание цикла. Какие у меня есть варианты? Семафоры и ThreadPools будут ограничивать количество одновременно работающих, но это не моя проблема. Мне нужно ограничить скорость моих запросов до 100 в секунду, но не имеет значения, сколько времени потребуется для выполнения запроса.

Это очень краткий (не)рабочий вариант. пример использования стандартной библиотеки, демонстрирующий проблему. Предполагается, что скорость будет дросселироваться со скоростью 100/сек, но она дросселируется со скоростью 116,651/сек. Как лучше всего ограничить планирование асинхронного запроса в asyncio?

Рабочий код:

import asyncio
from threading import Lock

class PTBNL:

def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.token_bucket = TokenBucket()
self.token_bucket.set_rate(100)

def run(self, *awaitables):

loop = asyncio.get_event_loop()

if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)

def sleep(self, secs) -> True:

self.run(asyncio.sleep(secs))
return True

def get_req_id(self) -> int:

new_id = self._req_id_seq
self._req_id_seq += 1
return new_id

def start_req(self, key):

loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future

def end_req(self, key, result=None):

future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)

def req_data(self, req_id, obj):
# Do Some Work Here
self.req_data_end(req_id)
pass

def req_data_end(self, req_id):
print(req_id, " has ended")
self.end_req(req_id)

async def req_data_async(self, obj):

req_id = self.get_req_id()
future = self.start_req(req_id)

self.req_data(req_id, obj)

await future
return future.result()

async def req_data_batch_async(self, contracts):

futures = []
FLAG = False

for contract in contracts:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)

nap = self.token_bucket.consume(1)

if FLAG is False:
FLAG = True
start = asyncio.get_event_loop().time()

asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract)

await asyncio.gather(*futures)
elapsed = asyncio.get_event_loop().time() - start

return futures, len(contracts)/elapsed

class TokenBucket:

def __init__(self):
self.tokens = 0
self.rate = 0
self.last = asyncio.get_event_loop().time()
self.lock = Lock()

def set_rate(self, rate):
with self.lock:
self.rate = rate
self.tokens = self.rate

def consume(self, tokens):
with self.lock:
if not self.rate:
return 0

now = asyncio.get_event_loop().time()
lapse = now - self.last
self.last = now
self.tokens += lapse * self.rate

if self.tokens > self.rate:
self.tokens = self.rate

self.tokens -= tokens

if self.tokens >= 0:
return 0
else:
return -self.tokens / self.rate

if __name__ == '__main__':

asyncio.get_event_loop().set_debug(True)
app = PTBNL()

objs = [obj for obj in range(500)]

l,t = app.run(app.req_data_batch_async(objs))

print(l)
print(t)


Изменить: я добавил сюда простой пример TrottleTestApp с использованием семафоров, но все равно не могу ограничить выполнение:

import asyncio
import time

class ThrottleTestApp:

def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.sem = asyncio.Semaphore()

async def allow_requests(self, sem):
"""Permit 100 requests per second; call
loop.create_task(allow_requests())
at the beginning of the program to start this routine. That call returns
a task handle that can be canceled to end this routine.

asyncio.Semaphore doesn't give us a great way to get at the value other
than accessing sem._value. We do that here, but creating a wrapper that
adds a current_value method would make this cleaner"""

while True:
while sem._value < 100: sem.release()
await asyncio.sleep(1) # Or spread more evenly
# with a shorter sleep and
# increasing the value less

async def do_request(self, req_id, obj):
await self.sem.acquire()

# this is the work for the request
self.req_data(req_id, obj)

def run(self, *awaitables):

loop = asyncio.get_event_loop()

if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)

def sleep(self, secs: [float]=0.02) -> True:

self.run(asyncio.sleep(secs))
return True

def get_req_id(self) -> int:

new_id = self._req_id_seq
self._req_id_seq += 1
return new_id

def start_req(self, key):

loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future

def end_req(self, key, result=None):

future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)

def req_data(self, req_id, obj):
# This is the method that "does" something
self.req_data_end(req_id)
pass

def req_data_end(self, req_id):

print(req_id, " has ended")
self.end_req(req_id)

async def req_data_batch_async(self, objs):

futures = []
FLAG = False

for obj in objs:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)

if FLAG is False:
FLAG = True
start = time.time()

self.do_request(req_id, obj)

await asyncio.gather(*futures)
elapsed = time.time() - start
print("Roughly %s per second" % (len(objs)/elapsed))

return futures

if __name__ == '__main__':

asyncio.get_event_loop().set_debug(True)
app = ThrottleTestApp()

objs = [obj for obj in range(10000)]

app.run(app.req_data_batch_async(objs))


Подробнее здесь: https://stackoverflow.com/questions/454 ... on-asyncio
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»