Асинхронные функции с дросселем в Python AsyncioPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Асинхронные функции с дросселем в Python Asyncio

Сообщение Anonymous »

У меня есть список awaitables , который я хочу передать в Asyncio.abstractEventLoop , но мне нужно привести к углублению запросов сторонним API. Какие варианты у меня есть? Semaphores и Threadpools ограничат, сколько запускается одновременно, но это не моя проблема. Мне нужно запустить мои запросы до 100 /с, но не имеет значения, сколько времени требуется для выполнения запроса. Предполагается, что это должно дроссель на 100/с, но дроссели в 116,651/с. Как лучше всего запустить планирование асинхронного запроса в Asyncio < /strong>?import asyncio
from threading import Lock

class PTBNL:

def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.token_bucket = TokenBucket()
self.token_bucket.set_rate(100)

def run(self, *awaitables):

loop = asyncio.get_event_loop()

if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)

def sleep(self, secs) -> True:

self.run(asyncio.sleep(secs))
return True

def get_req_id(self) -> int:

new_id = self._req_id_seq
self._req_id_seq += 1
return new_id

def start_req(self, key):

loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future

def end_req(self, key, result=None):

future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)

def req_data(self, req_id, obj):
# Do Some Work Here
self.req_data_end(req_id)
pass

def req_data_end(self, req_id):
print(req_id, " has ended")
self.end_req(req_id)

async def req_data_async(self, obj):

req_id = self.get_req_id()
future = self.start_req(req_id)

self.req_data(req_id, obj)

await future
return future.result()

async def req_data_batch_async(self, contracts):

futures = []
FLAG = False

for contract in contracts:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)

nap = self.token_bucket.consume(1)

if FLAG is False:
FLAG = True
start = asyncio.get_event_loop().time()

asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract)

await asyncio.gather(*futures)
elapsed = asyncio.get_event_loop().time() - start

return futures, len(contracts)/elapsed

class TokenBucket:

def __init__(self):
self.tokens = 0
self.rate = 0
self.last = asyncio.get_event_loop().time()
self.lock = Lock()

def set_rate(self, rate):
with self.lock:
self.rate = rate
self.tokens = self.rate

def consume(self, tokens):
with self.lock:
if not self.rate:
return 0

now = asyncio.get_event_loop().time()
lapse = now - self.last
self.last = now
self.tokens += lapse * self.rate

if self.tokens > self.rate:
self.tokens = self.rate

self.tokens -= tokens

if self.tokens >= 0:
return 0
else:
return -self.tokens / self.rate

if __name__ == '__main__':

asyncio.get_event_loop().set_debug(True)
app = PTBNL()

objs = [obj for obj in range(500)]

l,t = app.run(app.req_data_batch_async(objs))

print(l)
print(t)
< /code>

Редактировать: я добавил простой пример Trottletestapp < ​​/code> здесь, используя семафоры, но все еще не могу запустить выполнение: < /p>

import asyncio
import time

class ThrottleTestApp:

def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.sem = asyncio.Semaphore()

async def allow_requests(self, sem):
"""Permit 100 requests per second; call
loop.create_task(allow_requests())
at the beginning of the program to start this routine. That call returns
a task handle that can be canceled to end this routine.

asyncio.Semaphore doesn't give us a great way to get at the value other
than accessing sem._value. We do that here, but creating a wrapper that
adds a current_value method would make this cleaner"""

while True:
while sem._value < 100: sem.release()
await asyncio.sleep(1) # Or spread more evenly
# with a shorter sleep and
# increasing the value less

async def do_request(self, req_id, obj):
await self.sem.acquire()

# this is the work for the request
self.req_data(req_id, obj)

def run(self, *awaitables):

loop = asyncio.get_event_loop()

if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)

def sleep(self, secs: [float]=0.02) -> True:

self.run(asyncio.sleep(secs))
return True

def get_req_id(self) -> int:

new_id = self._req_id_seq
self._req_id_seq += 1
return new_id

def start_req(self, key):

loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future

def end_req(self, key, result=None):

future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)

def req_data(self, req_id, obj):
# This is the method that "does" something
self.req_data_end(req_id)
pass

def req_data_end(self, req_id):

print(req_id, " has ended")
self.end_req(req_id)

async def req_data_batch_async(self, objs):

futures = []
FLAG = False

for obj in objs:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)

if FLAG is False:
FLAG = True
start = time.time()

self.do_request(req_id, obj)

await asyncio.gather(*futures)
elapsed = time.time() - start
print("Roughly %s per second" % (len(objs)/elapsed))

return futures

if __name__ == '__main__':

asyncio.get_event_loop().set_debug(True)
app = ThrottleTestApp()

objs = [obj for obj in range(10000)]

app.run(app.req_data_batch_async(objs))


Подробнее здесь: https://stackoverflow.com/questions/454 ... on-asyncio
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Асинхронные функции с дросселем в Python Asyncio
    Anonymous » » в форуме Python
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous
  • Чего ожидать при использовании CombineLatestStream с дросселем для потоков с разными частотами?
    Anonymous » » в форуме Android
    0 Ответы
    12 Просмотры
    Последнее сообщение Anonymous
  • Asyncio Async Funcitons вешает с Asyncio.gather. (Код работает без Asyncio.gather)
    Anonymous » » в форуме Python
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous
  • Как эффективно обрабатывать гнездовые асинхронные операции с помощью Python asyncio lib?
    Anonymous » » в форуме Python
    0 Ответы
    22 Просмотры
    Последнее сообщение Anonymous
  • Все асинхронные работники ушли. Когда asyncio.run не возвращается?
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»