Как запустить отправку сообщения через веб-сокет Fastapi за пределами приложения FastapiPython

Программы на Python
Ответить
Гость
 Как запустить отправку сообщения через веб-сокет Fastapi за пределами приложения Fastapi

Сообщение Гость »


У меня есть такой менеджер соединений WebSocket:

класс ConnectionManager: def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json(сообщение и маршрут WebSocket:

@router.websocket("/ws/{token}") async def ws(websocket: WebSocket, token: str, redis: Annotated [Redis, Depends(get_redis)]): user_id = redis.get(токен) если user_id: redis.expire(user_id) еще: поднять redis_error пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) Я хочу сохранить соединения пользователей, и когда придет сообщение Redis pubsub, обработать это сообщение, а затем отправить сообщение WebSocket некоторым пользователям. модуль, обрабатывающий сообщение, не является частью приложения Fastapi.

Я пытался реализовать это внутри приложения Fastapi, реализовав threading и asyncio, но они прервали работу Fastapi само приложение.

как я могу инициировать отправку сообщения объектов WebSocket за пределами приложения Fastapi?

Что я пробовал:

redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") @router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id) но я получаю сообщение об ошибке pubsub от Redis и говорит, что я еще не подписался, если не сделаю это следующим образом:
@router.websocket("/ws/{токен}") async def ws(websocket: WebSocket, token: str): redis = Redis(redis_host, redis_port) pubsub = redis.pubsub() pubsub.subscribe("channel_signal") сообщение = ожидание pubsub.get_message(ignore_subscribe_messages=True) если сообщение не «Нет»: # сделай что-нибудь пытаться: менеджер.connect(user_id, WebSocket) кроме WebSocketException: менеджер.disconnect(user_id)
но при этом создается соединение Redis для каждого пользователя, подключающегося к веб-сокету, можно ли как-нибудь глобально определить соединение Redis для всех пользователей?

Обновление 1:

На ответ VonC я написал следующее:

из fastapi import FastAPI, WebSocket, WebSocketException из v1.endpoints.user.auth импортируйте маршрутизатор как auth_router из v1.endpoints.signals импортируйте маршрутизатор как signal_router из configs.connection_config импортируйте redis_host, redis_port импортировать redis.asyncio как aioredis импорт потоков импортировать асинхронный код импортировать UUID приложение = ФастAPI() app.include_router(auth_router, prefix="/users/auth", tags = ["auth"]) app.include_router(signals_router, prefix="/signals", tags = ["signals"]) класс ConnectionManager: последнее_сообщение = "" def __init__(self) -> Нет: self.connections = {} асинхронное определение подключения (self, user_id: str, websocket: WebSocket): ожидайте websocket.accept() self.connections[user_id] = веб-сокет асинхронное отключение по определению (self, user_id): веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.close() del self.connections[user_id] асинхронная защита send_messages(self, user_ids, message): для user_id в user_ids: веб-сокет: WebSocket = self.connections[user_id] ожидайте websocket.send_json (сообщение) менеджер = Менеджер соединений() @app.websocket("/ws") асинхронная защита ws (websocket: WebSocket): пытаться: ждут менеджер.connect(str(uuid.uuid4()), вебсокет) кроме WebSocketException: ожидайте менеджера.disconnect(str(uuid.uuid4())) redis_client = Нет @app.on_event("запуск") асинхронная защита start_event_connect_redis(): глобальный redis_client redis_client = aioredis.Redis(хост=redis_host, порт=redis_port) защита Listen_to_redis(): pubsub = redis_client.pubsub() pubsub.subscribe("channel_signal") пока правда: сообщение = pubsub.get_message(ignore_subscribe_messages=True) если сообщение: печать(сообщение["данные"]) @app.on_event("запуск") асинхронная защита start_event_listen_redis(): # Запускаем отдельный поток для прослушивания сообщений Redis Pub/Sub threading.Thread(target=listen_to_redis, daemon=True).start() поскольку aioredis устарел, я использую версию redispy aioredis.

У меня проблема с этой частью:

пока True: сообщение = pubsub.get_message(ignore_subscribe_messages=True) если сообщение: напечатать("привет") if message всегда выполняется и постоянно печатает hi, поэтому любое событие будет запускаться бесконечно.

Обновление 2:

Хотя я не смог полностью проверить ответ из-за другой проблемы с fastapi (для которой я открою новую ветку и ссылку здесь), в ответ добавлено обновление, определяющее глобальное соединение Redis для всех пользователей и прослушивания его отдельно от жизненного цикла fastapi, мне пришлось использовать реальную библиотеку aioredis вместо версии redispy.
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»