Я пытаюсь использовать evdev для асинхронного чтения входных событий в Linux, но мне нужно обрабатывать подключаемые и отключаемые устройства. Чтобы сделать мою систему относительно общей и совместимой с несколькими устройствами, мне нужен поток управления, который может отправлять или отменять потоки для устройств при обнаружении изменений. Я написал для этого несколько разных прототипов, и у каждого из них есть свои проблемы:
Первый использует наиболее очевидный подход, но, похоже, поток изменения устройства не имеет разрешения на добавление другие вызывают сбой при подключении устройств.
import asyncio
import evdev
import pyudev
# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Perform any necessary actions for added devices
if "GuliKit" in device.name:
device.grab()
task = asyncio.create_task(print_events(device))
device_tasks[device] = task
for device in removed_devices:
print(f"Removed device: {device.name}")
# Perform any necessary actions for removed devices
if device in device_tasks:
task = device_tasks[device]
task.cancel()
del device_tasks[device]
return devices
# Task created for each grabbed device
async def print_events(device):
try:
async for event in device.async_read_loop():
print(device.path, evdev.categorize(event), sep=': ')
except OSError:
print(f"Device {device.name} disconnected.")
#handle_device_changes()
# Entry point of the program
async def main():
devices = []
device_tasks = {}
devices = handle_device_changes(devices, device_tasks)
queue = asyncio.Queue()
monitor = pyudev.Monitor.from_netlink(pyudev.Context())
monitor.filter_by(subsystem='input')
def log_event(action, device):
queue.put(handle_device_changes(devices, device_tasks))
observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()
try:
while True:
await queue.get()
except KeyboardInterrupt:
print("Keyboard interrupt received.")
finally:
observer.stop()
await asyncio.gather(*device_tasks.values(), return_exceptions=True)
if __name__ == '__main__':
asyncio.run(main(),debug=True)
Вторым была моя попытка переписать эту идею по-новому, но я понятия не имею, что в ней не так, и даже не могу интерпретировать ошибку.
import evdev
import asyncio
import pyudev
# Function to handle device events
async def handle_event(device):
async for event in device.async_read_loop():
# Process the event here
print("Event:", event)
# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Create a new event listener task for the added device
event_loop = asyncio.get_event_loop()
event_task = event_loop.create_task(handle_event(device))
device_tasks[device.path] = event_task
if "GuliKit" in device.name:
device.grab()
for device in removed_devices:
print(f"Removed device: {device.name}")
# Cancel and clean up the event listener task associated with the removed device
event_task = device_tasks.pop(device.path)
event_task.cancel()
return devices
async def main():
devices = []
device_tasks = {}
# Initial setup: handle device changes to set up event listeners
devices = handle_device_changes(devices, device_tasks)
# Monitor for device changes
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
loop = asyncio.get_event_loop()
while True:
try:
async for device_event in loop.run_in_executor(None, monitor.poll):
# Handle device changes
devices = handle_device_changes(devices, device_tasks)
except KeyboardInterrupt:
break
if __name__ == '__main__':
asyncio.run(main())
Третьим была моя попытка использовать чатгпт, чтобы сохранить эту штуку. Я не знаю, что он делает, но на самом деле он довольно близок к работе и может точно вести список устройств. Вот только они не сообщают о событиях.
import evdev
import selectors
import pyudev
# Function to handle device events
def handle_event(device, selector):
for event in device.read():
# Process the event here
print("event")
# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks, selector):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Create a new event listener task for the added device
event_listener = selectors.EVENT_READ | selectors.EVENT_WRITE
event_task = selector.register(device.fd, event_listener, handle_event)
device_tasks[device.path] = event_task
for device in removed_devices:
print(f"Removed device: {device.name}")
# Destroy the event listener task associated with the removed device
event_task = device_tasks.pop(device.path)
selector.unregister(device.fd)
return devices
def main():
devices = []
device_tasks = {}
selector = selectors.DefaultSelector()
# Initial setup: handle device changes to set up event listeners
devices = handle_device_changes(devices, device_tasks, selector)
# Monitor for device changes
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
for device_event in iter(monitor.poll, None):
# Handle device changes
devices = handle_device_changes(devices, device_tasks, selector)
if __name__ == '__main__':
main()
Четвертая и последняя версия взята из сообщения здесь, хотя, похоже, она использует старые устаревшие функции, поэтому больше не работает.
import functools
import pyudev
from evdev import InputDevice
from select import select
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
monitor.start()
fds = {monitor.fileno(): monitor}
finalizers = []
while True:
r, w, x = select(fds, [], [])
if monitor.fileno() in r:
r.remove(monitor.fileno())
for udev in iter(functools.partial(monitor.poll, 0), None):
# we're only interested in devices that have a device node
# (e.g. /dev/input/eventX)
if not udev.device_node:
break
# find the device we're interested in and add it to fds
for name in (i['NAME'] for i in udev.ancestors if 'NAME' in i):
# I used a virtual input device for this test - you
# should adapt this to your needs
if u'GuliKit' in name:
if udev.action == u'add':
print('Device added: %s' % udev)
print(udev.device_node)
fds[dev.fd] = InputDevice(udev.device_node)
break
if udev.action == u'remove':
print('Device removed: %s' % udev)
def helper():
global fds
fds = {monitor.fileno(): monitor}
finalizers.append(helper)
break
for fd in r:
dev = fds[fd]
for event in dev.read():
print(event)
for i in range(len(finalizers)):
finalizers.pop()()
Подробнее здесь: https://stackoverflow.com/questions/764 ... with-evdev
Как я могу управлять несколькими устройствами при их подключении и отключении с помощью evdev? ⇐ Python
Программы на Python
1737133099
Anonymous
Я пытаюсь использовать evdev для асинхронного чтения входных событий в Linux, но мне нужно обрабатывать подключаемые и отключаемые устройства. Чтобы сделать мою систему относительно общей и совместимой с несколькими устройствами, мне нужен поток управления, который может отправлять или отменять потоки для устройств при обнаружении изменений. Я написал для этого несколько разных прототипов, и у каждого из них есть свои проблемы:
Первый использует наиболее очевидный подход, но, похоже, поток изменения устройства не имеет разрешения на добавление другие вызывают сбой при подключении устройств.
import asyncio
import evdev
import pyudev
# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Perform any necessary actions for added devices
if "GuliKit" in device.name:
device.grab()
task = asyncio.create_task(print_events(device))
device_tasks[device] = task
for device in removed_devices:
print(f"Removed device: {device.name}")
# Perform any necessary actions for removed devices
if device in device_tasks:
task = device_tasks[device]
task.cancel()
del device_tasks[device]
return devices
# Task created for each grabbed device
async def print_events(device):
try:
async for event in device.async_read_loop():
print(device.path, evdev.categorize(event), sep=': ')
except OSError:
print(f"Device {device.name} disconnected.")
#handle_device_changes()
# Entry point of the program
async def main():
devices = []
device_tasks = {}
devices = handle_device_changes(devices, device_tasks)
queue = asyncio.Queue()
monitor = pyudev.Monitor.from_netlink(pyudev.Context())
monitor.filter_by(subsystem='input')
def log_event(action, device):
queue.put(handle_device_changes(devices, device_tasks))
observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()
try:
while True:
await queue.get()
except KeyboardInterrupt:
print("Keyboard interrupt received.")
finally:
observer.stop()
await asyncio.gather(*device_tasks.values(), return_exceptions=True)
if __name__ == '__main__':
asyncio.run(main(),debug=True)
Вторым была моя попытка переписать эту идею по-новому, но я понятия не имею, что в ней не так, и даже не могу интерпретировать ошибку.
import evdev
import asyncio
import pyudev
# Function to handle device events
async def handle_event(device):
async for event in device.async_read_loop():
# Process the event here
print("Event:", event)
# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Create a new event listener task for the added device
event_loop = asyncio.get_event_loop()
event_task = event_loop.create_task(handle_event(device))
device_tasks[device.path] = event_task
if "GuliKit" in device.name:
device.grab()
for device in removed_devices:
print(f"Removed device: {device.name}")
# Cancel and clean up the event listener task associated with the removed device
event_task = device_tasks.pop(device.path)
event_task.cancel()
return devices
async def main():
devices = []
device_tasks = {}
# Initial setup: handle device changes to set up event listeners
devices = handle_device_changes(devices, device_tasks)
# Monitor for device changes
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
loop = asyncio.get_event_loop()
while True:
try:
async for device_event in loop.run_in_executor(None, monitor.poll):
# Handle device changes
devices = handle_device_changes(devices, device_tasks)
except KeyboardInterrupt:
break
if __name__ == '__main__':
asyncio.run(main())
Третьим была моя попытка использовать чатгпт, чтобы сохранить эту штуку. Я не знаю, что он делает, но на самом деле он довольно близок к работе и может точно вести список устройств. Вот только они не сообщают о событиях.
import evdev
import selectors
import pyudev
# Function to handle device events
def handle_event(device, selector):
for event in device.read():
# Process the event here
print("event")
# Function to handle device changes (connection/disconnection)
def handle_device_changes(devices, device_tasks, selector):
current_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
added_devices = [device for device in current_devices if device not in devices]
removed_devices = [device for device in devices if device not in current_devices]
devices = current_devices
for device in added_devices:
print(f"Added device: {device.name}")
# Create a new event listener task for the added device
event_listener = selectors.EVENT_READ | selectors.EVENT_WRITE
event_task = selector.register(device.fd, event_listener, handle_event)
device_tasks[device.path] = event_task
for device in removed_devices:
print(f"Removed device: {device.name}")
# Destroy the event listener task associated with the removed device
event_task = device_tasks.pop(device.path)
selector.unregister(device.fd)
return devices
def main():
devices = []
device_tasks = {}
selector = selectors.DefaultSelector()
# Initial setup: handle device changes to set up event listeners
devices = handle_device_changes(devices, device_tasks, selector)
# Monitor for device changes
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
for device_event in iter(monitor.poll, None):
# Handle device changes
devices = handle_device_changes(devices, device_tasks, selector)
if __name__ == '__main__':
main()
Четвертая и последняя версия взята из сообщения здесь, хотя, похоже, она использует старые устаревшие функции, поэтому больше не работает.
import functools
import pyudev
from evdev import InputDevice
from select import select
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
monitor.start()
fds = {monitor.fileno(): monitor}
finalizers = []
while True:
r, w, x = select(fds, [], [])
if monitor.fileno() in r:
r.remove(monitor.fileno())
for udev in iter(functools.partial(monitor.poll, 0), None):
# we're only interested in devices that have a device node
# (e.g. /dev/input/eventX)
if not udev.device_node:
break
# find the device we're interested in and add it to fds
for name in (i['NAME'] for i in udev.ancestors if 'NAME' in i):
# I used a virtual input device for this test - you
# should adapt this to your needs
if u'GuliKit' in name:
if udev.action == u'add':
print('Device added: %s' % udev)
print(udev.device_node)
fds[dev.fd] = InputDevice(udev.device_node)
break
if udev.action == u'remove':
print('Device removed: %s' % udev)
def helper():
global fds
fds = {monitor.fileno(): monitor}
finalizers.append(helper)
break
for fd in r:
dev = fds[fd]
for event in dev.read():
print(event)
for i in range(len(finalizers)):
finalizers.pop()()
Подробнее здесь: [url]https://stackoverflow.com/questions/76495215/how-can-i-handle-multiple-devices-as-they-connect-and-disconnect-with-evdev[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия