Как я могу управлять несколькими устройствами при их подключении и отключении с помощью evdev?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу управлять несколькими устройствами при их подключении и отключении с помощью evdev?

Сообщение Anonymous »

Я пытаюсь использовать evdev для асинхронного чтения входных событий в Linux, но мне нужно обрабатывать подключаемые и отключаемые устройства. Чтобы сделать мою систему относительно общей и совместимой с несколькими устройствами, мне нужен поток управления, который может отправлять или отменять потоки для устройств при обнаружении изменений. Я написал для этого несколько разных прототипов, и у каждого из них есть свои проблемы:
Первый использует наиболее очевидный подход, но, похоже, поток изменения устройства не имеет разрешения на добавление другие вызывают сбой при подключении устройств.
import asyncio
import evdev
import pyudev

# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Perform any necessary actions for added devices
if "GuliKit" in device.name:
device.grab()
task = asyncio.create_task(print_events(device))
device_tasks[device] = task
for device in removed_devices:
print(f"Removed device: {device.name}")
# Perform any necessary actions for removed devices
if device in device_tasks:
task = device_tasks[device]
task.cancel()
del device_tasks[device]
return devices

# Task created for each grabbed device
async def print_events(device):
try:
async for event in device.async_read_loop():
print(device.path, evdev.categorize(event), sep=': ')
except OSError:
print(f"Device {device.name} disconnected.")
#handle_device_changes()

# Entry point of the program
async def main():
devices = []
device_tasks = {}
devices = handle_device_changes(devices, device_tasks)
queue = asyncio.Queue()

monitor = pyudev.Monitor.from_netlink(pyudev.Context())
monitor.filter_by(subsystem='input')
def log_event(action, device):
queue.put(handle_device_changes(devices, device_tasks))
observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()

try:
while True:
await queue.get()
except KeyboardInterrupt:
print("Keyboard interrupt received.")
finally:
observer.stop()
await asyncio.gather(*device_tasks.values(), return_exceptions=True)

if __name__ == '__main__':
asyncio.run(main(),debug=True)

Вторым была моя попытка переписать эту идею по-новому, но я понятия не имею, что в ней не так, и даже не могу интерпретировать ошибку.
import evdev
import asyncio
import pyudev

# Function to handle device events
async def handle_event(device):
async for event in device.async_read_loop():
# Process the event here
print("Event:", event)

# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices

for device in added_devices:
print(f"Added device: {device.name}")
# Create a new event listener task for the added device
event_loop = asyncio.get_event_loop()
event_task = event_loop.create_task(handle_event(device))
device_tasks[device.path] = event_task
if "GuliKit" in device.name:
device.grab()

for device in removed_devices:
print(f"Removed device: {device.name}")
# Cancel and clean up the event listener task associated with the removed device
event_task = device_tasks.pop(device.path)
event_task.cancel()

return devices

async def main():
devices = []
device_tasks = {}

# Initial setup: handle device changes to set up event listeners
devices = handle_device_changes(devices, device_tasks)

# Monitor for device changes
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')

loop = asyncio.get_event_loop()
while True:
try:
async for device_event in loop.run_in_executor(None, monitor.poll):
# Handle device changes
devices = handle_device_changes(devices, device_tasks)
except KeyboardInterrupt:
break

if __name__ == '__main__':
asyncio.run(main())

Третьим была моя попытка использовать чатгпт, чтобы сохранить эту штуку. Я не знаю, что он делает, но на самом деле он довольно близок к работе и может точно вести список устройств. Вот только они не сообщают о событиях.
import evdev
import selectors
import pyudev

# Function to handle device events
def handle_event(device, selector):
for event in device.read():
# Process the event here
print("event")

# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks, selector):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices

for device in added_devices:
print(f"Added device: {device.name}")
# Create a new event listener task for the added device
event_listener = selectors.EVENT_READ | selectors.EVENT_WRITE
event_task = selector.register(device.fd, event_listener, handle_event)
device_tasks[device.path] = event_task

for device in removed_devices:
print(f"Removed device: {device.name}")
# Destroy the event listener task associated with the removed device
event_task = device_tasks.pop(device.path)
selector.unregister(device.fd)

return devices

def main():
devices = []
device_tasks = {}
selector = selectors.DefaultSelector()

# Initial setup: handle device changes to set up event listeners
devices = handle_device_changes(devices, device_tasks, selector)

# Monitor for device changes
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')

for device_event in iter(monitor.poll, None):
# Handle device changes
devices = handle_device_changes(devices, device_tasks, selector)

if __name__ == '__main__':
main()

Четвертая и последняя версия взята из сообщения здесь, хотя, похоже, она использует старые устаревшие функции, поэтому больше не работает.
import functools
import pyudev

from evdev import InputDevice
from select import select

context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
monitor.start()

fds = {monitor.fileno(): monitor}
finalizers = []

while True:
r, w, x = select(fds, [], [])

if monitor.fileno() in r:
r.remove(monitor.fileno())

for udev in iter(functools.partial(monitor.poll, 0), None):
# we're only interested in devices that have a device node
# (e.g. /dev/input/eventX)
if not udev.device_node:
break

# find the device we're interested in and add it to fds
for name in (i['NAME'] for i in udev.ancestors if 'NAME' in i):
# I used a virtual input device for this test - you
# should adapt this to your needs
if u'GuliKit' in name:
if udev.action == u'add':
print('Device added: %s' % udev)
print(udev.device_node)
fds[dev.fd] = InputDevice(udev.device_node)
break
if udev.action == u'remove':
print('Device removed: %s' % udev)
def helper():
global fds
fds = {monitor.fileno(): monitor}
finalizers.append(helper)
break

for fd in r:
dev = fds[fd]
for event in dev.read():
print(event)

for i in range(len(finalizers)):
finalizers.pop()()


Подробнее здесь: https://stackoverflow.com/questions/764 ... with-evdev
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»