Как запустить отправку сообщения через веб-сокет Fastapi за пределами приложения Fastapi ⇐ Python
-
Гость
Как запустить отправку сообщения через веб-сокет Fastapi за пределами приложения Fastapi
У меня есть такой менеджер соединений WebSocket:
класс ConnectionManager: def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json(сообщение и маршрут WebSocket:
@router.websocket("/ws/{token}") async def ws(websocket: WebSocket, token: str, redis: Annotated [Redis, Depends(get_redis)]): user_id = redis.get(токен) если user_id: redis.expire(user_id) еще: поднять redis_error пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) Я хочу сохранить соединения пользователей, и когда придет сообщение Redis pubsub, обработать это сообщение, а затем отправить сообщение WebSocket некоторым пользователям. модуль, обрабатывающий сообщение, не является частью приложения Fastapi.
Я пытался реализовать это внутри приложения Fastapi, реализовав threading и asyncio, но они прервали работу Fastapi само приложение.
как я могу инициировать отправку сообщения объектов WebSocket за пределами приложения Fastapi?
Что я пробовал:
redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") @router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) но я получаю сообщение об ошибке pubsub от Redis и говорит, что я еще не подписался, если не сделаю это следующим образом:
@router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id)
но при этом создается соединение Redis для каждого пользователя, подключающегося к веб-сокету, можно ли как-нибудь глобально определить соединение Redis для всех пользователей?
Обновление 1:
На ответ VonC я написал следующее:
из fastapi import FastAPI, WebSocket, WebSocketException из v1.endpoints.user.auth импортируйте маршрутизатор как auth_router из v1.endpoints.signals импортируйте маршрутизатор как signal_router из configs.connection_config импортируйте redis_host, redis_port импортировать redis.asyncio как aioredis импорт потоков импортировать асинхронный код импортировать UUID приложение = ФастAPI() app.include_router(auth_router, prefix="/users/auth", tags = ["auth"]) app.include_router(signals_router, prefix="/signals", tags = ["signals"]) класс ConnectionManager: последнее_сообщение = "" def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json (сообщение) менеджер = Менеджер соединений() @app.websocket("/ws") асинхронная защита ws (websocket: WebSocket): пытаться: ждут менеджер.connect(str(uuid.uuid4()), вебсокет) кроме WebSocketException: ожидайте менеджера.disconnect(str(uuid.uuid4())) redis_client = Нет @app.on_event("запуск") асинхронная защита start_event_connect_redis(): глобальный redis_client redis_client = aioredis.Redis(хост=redis_host, порт=redis_port) защита Listen_to_redis(): pubsub = redis_client.pubsub() pubsub.subscribe("channel_signal") пока правда: сообщение = pubsub.get_message(ignore_subscribe_messages=True) если сообщение: печать(сообщение["данные"]) @app.on_event("запуск") асинхронная защита start_event_listen_redis(): # Запускаем отдельный поток для прослушивания сообщений Redis Pub/Sub threading.Thread(target=listen_to_redis, daemon=True).start() поскольку aioredis устарел, я использую версию redispy aioredis.
У меня проблема с этой частью:
пока True: сообщение = pubsub.get_message(ignore_subscribe_messages=True) если сообщение: напечатать("привет") if message всегда выполняется и постоянно печатает hi, поэтому любое событие будет запускаться бесконечно.
Обновление 2:
Хотя я не смог полностью проверить ответ из-за другой проблемы с fastapi (для которой я открою новую ветку и ссылку здесь), в ответ добавлено обновление, определяющее глобальное соединение Redis для всех пользователей и прослушивания его отдельно от жизненного цикла fastapi, мне пришлось использовать реальную библиотеку aioredis вместо версии redispy.
У меня есть такой менеджер соединений WebSocket:
класс ConnectionManager: def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json(сообщение и маршрут WebSocket:
@router.websocket("/ws/{token}") async def ws(websocket: WebSocket, token: str, redis: Annotated [Redis, Depends(get_redis)]): user_id = redis.get(токен) если user_id: redis.expire(user_id) еще: поднять redis_error пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) Я хочу сохранить соединения пользователей, и когда придет сообщение Redis pubsub, обработать это сообщение, а затем отправить сообщение WebSocket некоторым пользователям. модуль, обрабатывающий сообщение, не является частью приложения Fastapi.
Я пытался реализовать это внутри приложения Fastapi, реализовав threading и asyncio, но они прервали работу Fastapi само приложение.
как я могу инициировать отправку сообщения объектов WebSocket за пределами приложения Fastapi?
Что я пробовал:
redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") @router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) но я получаю сообщение об ошибке pubsub от Redis и говорит, что я еще не подписался, если не сделаю это следующим образом:
@router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id)
но при этом создается соединение Redis для каждого пользователя, подключающегося к веб-сокету, можно ли как-нибудь глобально определить соединение Redis для всех пользователей?
Обновление 1:
На ответ VonC я написал следующее:
из fastapi import FastAPI, WebSocket, WebSocketException из v1.endpoints.user.auth импортируйте маршрутизатор как auth_router из v1.endpoints.signals импортируйте маршрутизатор как signal_router из configs.connection_config импортируйте redis_host, redis_port импортировать redis.asyncio как aioredis импорт потоков импортировать асинхронный код импортировать UUID приложение = ФастAPI() app.include_router(auth_router, prefix="/users/auth", tags = ["auth"]) app.include_router(signals_router, prefix="/signals", tags = ["signals"]) класс ConnectionManager: последнее_сообщение = "" def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json (сообщение) менеджер = Менеджер соединений() @app.websocket("/ws") асинхронная защита ws (websocket: WebSocket): пытаться: ждут менеджер.connect(str(uuid.uuid4()), вебсокет) кроме WebSocketException: ожидайте менеджера.disconnect(str(uuid.uuid4())) redis_client = Нет @app.on_event("запуск") асинхронная защита start_event_connect_redis(): глобальный redis_client redis_client = aioredis.Redis(хост=redis_host, порт=redis_port) защита Listen_to_redis(): pubsub = redis_client.pubsub() pubsub.subscribe("channel_signal") пока правда: сообщение = pubsub.get_message(ignore_subscribe_messages=True) если сообщение: печать(сообщение["данные"]) @app.on_event("запуск") асинхронная защита start_event_listen_redis(): # Запускаем отдельный поток для прослушивания сообщений Redis Pub/Sub threading.Thread(target=listen_to_redis, daemon=True).start() поскольку aioredis устарел, я использую версию redispy aioredis.
У меня проблема с этой частью:
пока True: сообщение = pubsub.get_message(ignore_subscribe_messages=True) если сообщение: напечатать("привет") if message всегда выполняется и постоянно печатает hi, поэтому любое событие будет запускаться бесконечно.
Обновление 2:
Хотя я не смог полностью проверить ответ из-за другой проблемы с fastapi (для которой я открою новую ветку и ссылку здесь), в ответ добавлено обновление, определяющее глобальное соединение Redis для всех пользователей и прослушивания его отдельно от жизненного цикла fastapi, мне пришлось использовать реальную библиотеку aioredis вместо версии redispy.
Мобильная версия