Я пытаюсь создать в Python крючок мыши для обработки событий мыши. Моя цель — реализовать механизм, в котором события мыши обрабатываются асинхронно с тайм-аутом, позволяя приложению передавать событие следующему перехватчику, если обработка занимает слишком много времени.
В настоящее время перехватчик мыши Сначала работает, но после перемещения мыши в течение нескольких секунд приложение перестает обрабатывать сообщения. В результате события мыши блокируются до тех пор, пока я не закрою приложение Python. Я подозреваю, что проблема связана с тем, как я управляю циклом сообщений или обрабатываю события в обратном вызове asyncio. Кроме того, когда перехватчик перестает обрабатывать сообщения, журнал ошибок не регистрируется, указывающий на возникновение какой-либо проблемы.
Это код:
Я пытаюсь создать в Python крючок мыши для обработки событий мыши. Моя цель — реализовать механизм, в котором события мыши обрабатываются асинхронно с тайм-аутом, позволяя приложению передавать событие следующему перехватчику, если обработка занимает слишком много времени. В настоящее время перехватчик мыши Сначала работает, но после перемещения мыши в течение нескольких секунд приложение перестает обрабатывать сообщения. В результате события мыши блокируются до тех пор, пока я не закрою приложение Python. Я подозреваю, что проблема связана с тем, как я управляю циклом сообщений или обрабатываю события в обратном вызове asyncio. Кроме того, когда перехватчик перестает обрабатывать сообщения, журнал ошибок не регистрируется, указывающий на возникновение какой-либо проблемы. Это код: [code]import asyncio import ctypes import logging import multiprocessing from ctypes import wintypes from dataclasses import dataclass
from colorama import Fore, Style
logger = logging.getLogger(__name__)
@dataclass class MouseEvent: x: int y: int dx: int = 0 dy: int = 0
# Main application logic async def main(): # Create a pipe for IPC parent_conn, child_conn = multiprocessing.Pipe() mouse_hook = MouseHook(child_conn)
# Start the hook manager process hook_process = multiprocessing.Process(target=mouse_hook.register_hook) hook_process.start()
try: while True: # Listen for events from the hook manager if parent_conn.poll(): event = parent_conn.recv() try: # Run the async callback with a timeout result = await asyncio.wait_for(callback(event), timeout=0.01) if result: parent_conn.send("block") else: parent_conn.send("continue") except asyncio.TimeoutError: print("Callback exceeded time limit; continuing without waiting.") finally: # Send "continue" to allow the OS to process the event parent_conn.send("continue") except KeyboardInterrupt: pass finally: # Clean up hook_process.terminate() hook_process.join()
if __name__ == "__main__": asyncio.run(main()) [/code] Вот что я сделал на данный момент: [list] [*]Использовал многопроцессорность. < li>Канал для межпроцессного взаимодействия между основным приложением и процессом-перехватчиком. [*]Реализована асинхронная система обработки сообщений с тайм-аутом, чтобы гарантировать, что события мыши не блокируются на неопределенный срок. li> [*]Убедился, что CallNextHookEx вызывается в случае истечения времени ожидания или неожиданных ответов. [/list] Несмотря на эти усилия, перехватчик блокируется после непрерывного движения мыши.