Как правильно использовать asyncio.run_coroutine_threadsafe?Python

Программы на Python
Ответить
Anonymous
 Как правильно использовать asyncio.run_coroutine_threadsafe?

Сообщение Anonymous »

Коротко говоря, проблема в том, что будущее, возвращаемое asyncio.run_coroutine_threadsafe, блокируется, когда я вызываю Future.result()
Проблема также заключается в том, что задокументировано в следующем вопросе (на данный момент) без удовлетворительного ответа: Будущее из asyncio.run_coroutine_threadsafe зависает навсегда?
Я пытаюсь вызвать асинхронный код из кода синхронизации, где код синхронизации на самом деле сам заключен в асинхронный код с существующим циклом выполняемых событий (чтобы быть более конкретным: это блокнот Jupyter).
Я хотел бы отправлять асинхронные задачи из вложенной синхронизации код в существующий «внешний» цикл событий и «ожидайте» его результатов во вложенном коде синхронизации. Подразумеваемое ограничение: я не хочу запускать эти задачи в новом цикле событий (несколько причин).
Поскольку невозможно просто «ждать» асинхронного выполнения результат кода синхронизации без блокировки и без использования asyncio.run, который создает новый цикл событий, я подумал, что использование отдельного потока как-то поможет.
Из описания документации: asyncio.run_coroutine_threadsafe кажется идеальным кандидатом.
Но он все равно блокируется...
Ниже полный фрагмент с тайм-аутом, когда вызов будущего результата.
Как заставить этот код работать правильно?
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def gather_coroutines(*coroutines):
return await asyncio.gather(*coroutines)

def run_th_safe(loop, coroutines):
future = asyncio.run_coroutine_threadsafe(gather_coroutines(*coroutines), loop)
res = future.result(timeout=3) # **** BLOCKING *****
return res

def async2sync(*coroutines):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(gather_coroutines(*coroutines))

# BLOW DOESN'T WORK BECAUSE run_th_safe IS BLOCKING
with ThreadPoolExecutor(max_workers=1) as ex:
thread_future = ex.submit(run_th_safe, loop, coroutines)
return thread_future.result()

# Testing
async def some_async_task(n):
"""Some async function to test"""
print('Task running with n =', n)
await asyncio.sleep(n/10)
print('Inside coro', n)
return list(range(n))

async def main_async():
coro3 = some_async_task(30)
coro1 = some_async_task(10)
coro2 = some_async_task(20)
results = async2sync(coro3, coro1, coro2)
return results

def main_sync():
coro3 = some_async_task(30)
coro1 = some_async_task(10)
coro2 = some_async_task(20)
results = async2sync(coro3, coro1, coro2)
return results

if __name__ == '__main__':
# Testing functionality with asyncio.run()
# This works
print(main_sync())

# Testing functionality with outer-loop (asyncio.run) and nested asyncio.run_coroutine_threadsafe
# **DOESN'T WORK**
print(asyncio.run(main_async()))



Подробнее здесь: https://stackoverflow.com/questions/659 ... -correctly
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»