Блокировка Python Asyncio при использовании очередейPython

Программы на Python
Ответить
Anonymous
 Блокировка Python Asyncio при использовании очередей

Сообщение Anonymous »

Я пытаюсь использовать asyncio как с синхронизацией (это будет остальная часть программы Python), так и с асинхронным блоком, и блок синхронизации отправляет данные через asyncio.queues.
Без очереди все работает нормально.
но когда я отправляю данные в очередь, кажется, что они блокируются.
Я пробую разные способы с get_nowait и т. д... но пока безуспешно .
import asyncio
import time

queue = asyncio.Queue()

async def processor() -> None:
print("Started proc")
while True:
print("waiting for quee")
msg = await queue.get()
print(f"Got command from queue: {msg}")
# do something
await asyncio.sleep(5)

def run_sync(url: str) -> int:
while 1:
print("Sending HTTP request")
input("enter to send message to queue\n")
queue.put_nowait(url)

#do other work
time.sleep(10)

async def run_sync_threaded( url: str) -> int:
return await asyncio.to_thread(run_sync, url)

async def main() -> None:

await asyncio.gather(
processor(),
run_sync_threaded("https://www.example.com"),
)

asyncio.run(main())

РЕДАКТИРОВАТЬ:
Это работает, но выглядит как обходной путь, а не как правильное решение. Не знаю, ощущения не очень стабильные
import asyncio
import time

queue = asyncio.Queue()
async def processor() -> None:
print("Started proc")
while True:
print("waiting for quee")
msg = await queue.get()
print(f"Got command from queue: {msg}")
# do something
await asyncio.sleep(5)

async def async_send(url):
print(f'Adding {url} to queue')
queue.put_nowait(url)

def send(url, loop):
asyncio.run_coroutine_threadsafe(async_send(url), loop)

def run_sync(url: str, loop) -> int:
while 1:
input("enter to send message to queue\n")
send(url, loop)
#do other work
time.sleep(3)

async def run_sync_threaded( url: str, loop) -> int:
return await asyncio.to_thread(run_sync, url, loop)

async def main() -> None:
loop = asyncio.get_event_loop()
t = asyncio.create_task( processor())
t2 = asyncio.create_task(run_sync_threaded("https://www.example.com", loop))

asyncio.gather(
await t,
await t2
)

# This does not work
# asyncio.gather(
# await processor(),
# await run_sync_threaded("https://www.example.com", loop)
# )

asyncio.run(main())


Подробнее здесь: https://stackoverflow.com/questions/733 ... ing-queues
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»