Как реализовать периодически выполняемую функцию в веб-приложении Python?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как реализовать периодически выполняемую функцию в веб-приложении Python?

Сообщение Anonymous »


Если аккаунт заблокирован более чем на 7 дней, в моем случае его необходимо удалить. На данный момент у меня есть три идеи:
[*]Создайте еще одну папку cronjob и напишите сценарий, который будет использовать cronjob для периодического выполнения этого файла Python. (Могу ли я использовать метод подключения MongoDB в base.py для совместного использования или мне нужно написать его отдельно?) [*]Реализуйте API для удаления учетных записей в authentication.py и настройте cronjob для периодического вызова этого API. [*]
Напишите функциональность в user.py и используйте if __name__ == "__main__": для вызова этой функции.

(Вы можете проверить функцию delete_suspended_accounts ниже.)

Как лучше всего реализовать эту функцию? Или у вас есть другие предложения? Спасибо.

Для получения дополнительной информации вы можете проверить структуру моего кода.
[*]Реализуйте API-интерфейсы учетной записи в authentication.py, такие как регистрация, сброс пароля, блокировка учетной записи, повторная активация учетной записи и блокировка учетной записи. [*]Реализовать соединение MongoDB в base.py. [*]Реализовать логические операции базы данных для API учетной записи в user.py. . └── fastapi-бэкэнд ├── активы ├── статический ├── источник │ ├── апи │ │ ├── api_router.py │ │ └── аутентификация.py │ ├── дб │ │ ├── base.py │ │ └── user.py │ ├── модели │ └── услуги └── утилиты base.py
из pymongo импортировать MongoClient импортировать URL-библиотеку из импорта fastapi Зависит из настройки импорта src.core.config от ввода import Callable, Type База классов: def __init__(self, Connection: MongoClient) -> Нет: self._conn = Connection.demo @свойство соединение по умолчанию (самостоятельное): вернуть self._conn защита _get_mongo_client(): вернуть MongoClient (setting.MONGO_URL, maxPoolSize = None) def get_repository(repo_type: Type[Base]) -> Callable: async def _get_repo(conn: MongoClient = Depends(_get_mongo_client)): выход repo_type (подключение) вернуть _get_repo user.py класс UserRepository(Base): защита __init__(self, conn): супер().__init__(подключение) def suspend_user(self, user_id: str): the_user = self.connection.user.find_one({'_id': ObjectId(user_id)}) if the_user и the_user['status'] == Status.ACTIVATED.value: self.connection.user.update(the_user, {"$set": {'status': Status.SUSPENDED.value, 'suspended_date': datetime.datetime.now()}}) еще: поднять HTTPException (status_code = 400, подробно = u «Не активировано») защита delete_suspended_accounts (сам): query = {"status": 'Status.SUSPENDED.value', "suspended_date": {"$lt": datetime.datetime.utcnow() - timedelta(days=7)}} результат = self.connection.user.delete_many(запрос) вернуть результат.deleted_count authentication.py @authentication.post('/suspend') async def suspend_account (запрос: запрос, токен: str, user_repository: UserRepository = Зависит(get_repository(UserRepository))): пытаться: # декодируем токен доступа access_token = jwt.decode(токен, ключ=setting.SECERT_KEY) # заблокировать пользователя user_id = access_token['sub'] user_repository.suspend_user(user_id=user_id) кроме DecodeError: return {'message': 'ошибка декодирования'} кроме ExpiredSignatureError: return {'message': 'ошибка с истекшим сроком действия подписи'} вернуть {'сообщение': 'ок'} Я попробовал первый метод следующим образом: написал сценарий, а затем выполнил его с помощью cronjob, и это сработало успешно. Однако мой технический руководитель предложил мне написать это в user.py. Но поскольку FastAPI использует внедрение зависимостей (Depends) для подключения к MongoDB, как мне лучше его написать? Или есть какие-то другие методы, которые вы бы порекомендовали? Спасибо.
из pymongo импортировать MongoClient из даты и времени импорта даты и времени, timedelta клиент = MongoClient("MONGO_URL") БД = клиент['демо'] коллекция = БД['пользователь'] защита delete_suspended_accounts(): запрос = {"status": 3, "suspended_date": {"$lt": datetime.utcnow() - timedelta(days=7)}} результат = коллекция.delete_many(запрос) print(f"Удалены заблокированные аккаунты: {result.deleted_count}.") если __name__ == "__main__": delete_suspended_accounts()
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»