Мне нужно собрать все чаты в моей учетной записи Telegram и проверить, читает ли кто -нибудь мои сообщения и войти в консоль. Я попробовал Messageread Event и некоторые типы низкого уровня, такие как GetPeerDialogSrequest, GetDialogSrequest и т. Д., Но они странно работают или вообще не работают. Я видел много примеров, но только для 1 чата, но мне нужно прочитать все свои чаты с асинхроком.events.MessageRead() handler (Telethon).
Subscribed to raw updates (events.Raw) to see UpdateReadHistoryOutbox/Inbox.
GetPeerDialogsRequest / GetDialogsRequest to check read_outbox_max_id as a poll fallback.
< /code>
Кэширование недавних идентификаторов исходящих сообщений при запуске для сопоставления. /> Минимальные фрагменты телетона (полезные части) < /p>
Кэш, исходящий при запуске < /li>
< /ol>
async def build_outgoing_cache(client, per_chat=200):
pending = defaultdict(dict) # pending[chat_id][msg_id] = {'date': dt, 'reported': False}
async for dialog in client.iter_dialogs(limit=None):
if not dialog.is_user:
continue
chat = dialog.entity
msgs = await client.get_messages(chat, limit=per_chat)
for m in msgs:
if m and getattr(m, 'out', False):
pending[dialog.id][m.id] = {'date': m.date, 'reported': False}
return pending
< /code>
Handler Messageread (примечание: event.get_sender () не существует для Messageread - используйте event.chat_id и client.get_entity): < /li>
< /ol>
@client.on(events.MessageRead())
async def on_message_read(event):
# ignore if this event is about you reading inbound messages
if getattr(event, 'inbox', False):
return
chat_id = event.chat_id
if chat_id is None:
return
entity = await client.get_entity(chat_id)
read_up_to = getattr(event, 'max_message_id_read', None) or (max(event.message_ids) if event.message_ids else None)
# now match read_up_to with your cached outgoing IDs
< /code>
Обработчик с обновлением RAW (обнаружение UpdateReadHistoryOutbox): < /li>
< /ol>
from telethon.tl import types as tl_types
@client.on(events.Raw)
async def raw_update(update):
if isinstance(update, tl_types.UpdateReadHistoryOutbox):
peer = update.peer
max_id = update.max_id
# resolve peer -> entity and map max_id to cached outgoing message ids
< /code>
Ответственный запас с использованием getpeerdialogsrequest: < /li>
< /ol>
res = await client(functions.messages.GetPeerDialogsRequest(peers=[entity]))
ro = getattr(res.dialogs[0], 'read_outbox_max_id', 0)
# if ro > last_seen -> process outgoing ids
Подробнее здесь: https://stackoverflow.com/questions/797 ... -telethonh
Как я могу обнаружить, если мое сообщение было прочитано на Telegram с помощью телетона ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как обеспечить получение кода проверки SMS от Telegram с помощью телетона?
Anonymous » » в форуме Python - 0 Ответы
- 12 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Свойство LOCATION не может быть прочитано из целевой библиотеки libprotobuf.
Anonymous » » в форуме C++ - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Anonymous
-