Как интегрировать пользовательскую задачу в исполнитель айограммы?Python

Программы на Python
Ответить
Anonymous
 Как интегрировать пользовательскую задачу в исполнитель айограммы?

Сообщение Anonymous »


import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
# old style:
# await bot.send_message(message.chat.id, message.text)

chat_ids[message.message_id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)

async def periodic(sleep_for, queue):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
queue.put_nowait((id, f"{now}"))
# await bot.send_message(id, f"{now}", disable_notification=True)

def run_tick(queue):
newloop = asyncio.new_event_loop()
asyncio.set_event_loop(newloop)
asyncio.run(periodic(3, queue))

if __name__ == '__main__':
queue = asyncio.Queue()
Thread(target=run_tick, args=(queue,), daemon=True).start()
executor.start_polling(dp, skip_updates=True)


Я хочу отправлять сообщения зарегистрированным пользователям с помощью bot.send_message, когда происходит событие, но на данный момент это не удалось.
Вот что я попробовал.
  • bot.send_message аварийно завершает работу, поскольку он вызывается из другого потока. (Менеджер контекста тайм-аута должен использоваться внутри задачи)
  • Итак, я попытался обойти эту проблему с помощью очереди, но нет возможности добавить свою собственную задачу в исполнитель.
Есть ли простой способ сделать это?



Редактировать: 3 января 2020 г.

Вот рабочий пример согласно @user4815162342.

import asyncio
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = ''

bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)

chat_ids = {}

@dp.message_handler()
async def echo(message: types.Message):
chat_ids[message.from_user.id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)

async def periodic(sleep_for):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
await bot.send_message(id, f"{now}", disable_notification=True)

if __name__ == '__main__':
dp.loop.create_task(periodic(10))
executor.start_polling(dp)



Подробнее здесь: https://stackoverflow.com/questions/595 ... m-executor
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»