I'm using PyCharm. The debugger works everywhere except in the functions that are run through the queue. I also tried pydevd-pycharm, and that did not work either.

The entry point:

```python
from bot_contacts.app import app
from bot_contacts.log import configure_logs
from bot_contacts.config import load_config
from bot_contacts.accounts_server.accounts_server import run_accounts_server
logger = configure_logs()
config = load_config()
if __name__ == '__main__':
    run_accounts_server()
```
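Since pydevd-pycharm is mentioned as one of the attempts, here is a minimal sketch of how it is usually attached manually. The host and port are assumptions and must match a "Python Debug Server" run configuration in PyCharm.

```python
# Minimal sketch (assumed host/port): attach pydevd-pycharm explicitly from the
# code path the debugger never reaches, e.g. at the top of the queued function.
import pydevd_pycharm

# Blocks until PyCharm's "Python Debug Server" accepts the connection,
# then behaves like a breakpoint at this line.
pydevd_pycharm.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True)
```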
Starting the aiohttp server:

```python
import asyncio
from aiohttp import web
from bot_contacts.accounts_server.accounts_observer import accounts_observer
from bot_contacts.accounts_server.fix_accounts_status import fix_accounts_status
from bot_contacts.accounts_server.handlers import auth, get_accounts, callback_2fa
from bot_contacts.aiohttp_app import AioHttpApplication
from bot_contacts.clean_background_tasks import cleanup_background_tasks
from bot_contacts.config import load_config
from bot_contacts.core.abstract import AioHttpApplicationAbstract
from bot_contacts.log import configure_logs
from bot_contacts.queue.queue import Queue
from bot_contacts.queue.result_handler.auth_result_handler import AuthResultHandler
logger = configure_logs()
config = load_config()
async def create_queues(app: AioHttpApplicationAbstract):
    app.auth_queue = Queue(
        max_workers=config.get("REQUESTS_MAX_WORKERS", 5),
        default_max_retries=config.get("REQUESTS_SERVER_MAX_RETRIES", 3),
        result_handler=AuthResultHandler(),
        name="AuthQueue",
    )
    app.check_2fa_queue = Queue(
        max_workers=config.get("REQUESTS_MAX_WORKERS", 5),
        default_max_retries=config.get("REQUESTS_SERVER_MAX_RETRIES", 3),
        name="2FACheckQueue",
    )
    await app.auth_queue.start()
    await app.check_2fa_queue.start()
    logger.info("Queues initialized")


async def cleanup_queues(app: AioHttpApplicationAbstract):
    """Gracefully shuts down the queues."""
    if app.auth_queue:
        await app.auth_queue.close()
        logger.info("Auth queue shutdown")
    if app.check_2fa_queue:
        await app.check_2fa_queue.close()
        logger.info("Check 2fa queue shutdown")


async def start_background_tasks(app: AioHttpApplicationAbstract):
    tasks = [
        asyncio.create_task(accounts_observer(app)),
    ]
    app.background_tasks.extend(tasks)


def run_accounts_server():
    app = AioHttpApplication()
    app.on_startup.extend([
        create_queues,
        fix_accounts_status,
        start_background_tasks
    ])
    app.on_shutdown.extend([
        cleanup_background_tasks,
        cleanup_queues,
    ])
    app.router.add_post("/auth", auth)
    app.router.add_get("/get-accounts", get_accounts)
    app.router.add_post("/2fa-callback", callback_2fa)
    web.run_app(
        app,
        host=config["ACCOUNTS_SERVER_ADDR"],
        port=config["ACCOUNTS_SERVER_PORT"],
    )
```
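As a way to isolate the problem from the project code, a stripped-down app with the same shape (an on_startup hook that spawns a worker task consuming an asyncio.Queue) can be run under the PyCharm debugger with breakpoints inside `handle` and `worker`. This is a self-contained sketch, not part of the project; the host, port, and route are placeholders.

```python
# Self-contained repro sketch: same structure (on_startup -> worker task -> queue),
# useful for checking whether breakpoints inside worker-executed coroutines fire at all.
import asyncio
from aiohttp import web


async def handle(item):
    print("handled", item)  # put a breakpoint here


async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        await handle(item)
        queue.task_done()


async def start_worker(app: web.Application):
    app["queue"] = asyncio.Queue()
    app["worker"] = asyncio.create_task(worker(app["queue"]))


async def enqueue(request: web.Request) -> web.Response:
    await request.app["queue"].put("test")
    return web.Response(text="queued")


app = web.Application()
app.on_startup.append(start_worker)
app.router.add_post("/enqueue", enqueue)

if __name__ == "__main__":
    web.run_app(app, host="127.0.0.1", port=8080)
```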
The aiohttp server handler:

```python
from aiohttp import web
from playhouse.shortcuts import model_to_dict
from bot_contacts.accounts_server.authorization import authorize_account, code_2fa_found
from bot_contacts.api_server.api_responses import api_error, api_success
from bot_contacts.ati_accounts_methods import get_ati_account_by_id
from bot_contacts.core.abstract import QueueAbstract
from bot_contacts.log import configure_logs
from bot_contacts.models.AtiAccounts import ON_AUTH, WAIT_2FA, AtiAccounts, AUTH, NOT_AUTH
from bot_contacts.models.model_helpers import datetime_to_string
from bot_contacts.queue.task_context.auth_task_context import AuthTaskContext
from bot_contacts.queue.task_context.check_2fa_task_context import Check2faTaskContext
logger = configure_logs()
async def auth(request: web.Request) -> web.Response:
    auth_queue: QueueAbstract = request.app.auth_queue
    try:
        request_json = await request.json()
    except Exception:
        return api_error(message="Failed to parse JSON")
    if "account_id" not in request_json:
        return api_error(message="account_id not found in JSON")
    account_id = request_json["account_id"]
    account = await get_ati_account_by_id(account_id)
    if not account:
        return api_error(message="Account not found")
    if account.account_status == ON_AUTH or account.account_status == WAIT_2FA:
        return api_error(message="Account is already being authorized or is waiting for a 2FA code")
    account.account_status = ON_AUTH
    await account.aio_save()
    # Note: the original snippet passed kwargs={account: account}, i.e. the model
    # instance as the dict key; keyword names must be strings, so it is "account" here.
    task_id = await auth_queue.add_task(
        func=authorize_account,
        kwargs={"account": account},
        max_retries=1,
        task_context=AuthTaskContext(request.app, account),
    )
    return api_success({
        "account_id": account_id,
        "task_id": task_id,
    }, "Task has been queued")
```
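For completeness, the handler can be exercised like this; the address and the account id are placeholders and should correspond to ACCOUNTS_SERVER_ADDR / ACCOUNTS_SERVER_PORT from the config and a real account.

```python
# Usage sketch with a placeholder address: POST an account_id to /auth
# and print the queued task id returned by the handler.
import asyncio
import aiohttp


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.post("http://127.0.0.1:8080/auth", json={"account_id": 1}) as resp:
            print(resp.status, await resp.json())


asyncio.run(main())
```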
The function that runs in the queue and in which the PyCharm debugger does not stop:

```python
# Module-level imports (json, UserAgent, the cookie/proxy helpers, status
# constants and the custom exceptions) are omitted from this excerpt.
async def authorize_account(account: AtiAccounts) -> bool:
    ua = UserAgent()
    user_agent = ua.random
    headers_dict = create_headers_dict(user_agent)
    if not await check_need_change_proxy(account):
        raise NoNeedReauthorize("Re-authorization is not required, requests are going through")
    proxy = None
    cookies_dict_did = None
    proxy_string = None
    if ACCOUNTS_SERVER_USE_PROXIES:
        proxies = await get_proxies()
        if not proxies:
            raise NoProxies("No proxies available")
        for _proxy in proxies:
            _proxy_string = f"{_proxy['proxy_proto']}://{_proxy['proxy_string']}"
            result, cookies_dict_did = await get_did_cookie(headers_dict, _proxy_string)
            if result:
                proxy = _proxy
                proxy_string = _proxy_string
                break
    else:
        result, cookies_dict_did = await get_did_cookie(headers_dict)
    if not result:
        raise NoCookies("Failed to obtain the did cookie")
    result, status, cookies_dict_sid = await get_sid_cookie(account, headers_dict, cookies_dict_did, proxy_string)
    if not result and status == GET_SID_COOKIE_STATUS_ERROR:
        raise NoCookies("Failed to obtain the sid cookie")
    elif not result and status == GET_SID_COOKIE_STATUS_INVALID_AUTH_DATA:
        raise InvalidAuthData("Invalid login or password")
    elif status == GET_SID_COOKIE_STATUS_2FA or status == GET_SID_COOKIE_STATUS_SUCCESS:
        cookies_dict = {
            "did": cookies_dict_did["did"],
            "sid": cookies_dict_sid["sid"],
        }
        account.cookies = json.dumps(cookies_dict)
        await account.aio_save()
        await update_ati_account_data(account.id, "user_agent", user_agent)
        if ACCOUNTS_SERVER_USE_PROXIES:
            await update_ati_account_data(account.id, "proxy", proxy["proxy_string"])
            await update_ati_account_data(account.id, "proxy_proto", proxy["proxy_proto"])
        else:
            await update_ati_account_data(account.id, "proxy", None)
            await update_ati_account_data(account.id, "proxy_proto", None)
    if not result:
        raise Needed2FA("A 2FA code is required")
    return True
```
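One way to narrow the issue down is to await authorize_account directly, outside the queue worker, and check whether a breakpoint inside it is hit there. This is a rough debugging sketch, not production code; it assumes get_ati_account_by_id works outside the web app, and the account id is a placeholder.

```python
# Debugging sketch: call authorize_account directly, bypassing the queue, to see
# whether the breakpoint fires when the coroutine is awaited outside a worker task.
import asyncio

from bot_contacts.accounts_server.authorization import authorize_account
from bot_contacts.ati_accounts_methods import get_ati_account_by_id


async def main():
    account = await get_ati_account_by_id(1)  # placeholder id
    result = await authorize_account(account=account)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```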
The queue in which the asynchronous functions are executed:

```python
import asyncio
from uuid import uuid4
from typing import Callable, Optional
from bot_contacts.core.abstract import QueueAbstract, ResultHandlerAbstract, TaskAbstract, TaskContextAbstract
from bot_contacts.core.queue.task_status import TaskStatus
from bot_contacts.log import configure_logs
from bot_contacts.queue.task import Task
logger = configure_logs()
class Queue(QueueAbstract):
    def __init__(
        self,
        max_workers: int = 5,
        default_max_retries: int = 3,
        retry_delay: Callable[[int], float] = lambda attempt: min(2**attempt, 10),
        result_handler: Optional[ResultHandlerAbstract] = None,
        name: str = "Queue",
    ):
        self.queue = asyncio.Queue(maxsize=max_workers * 2)
        self.active_tasks: dict[str, TaskAbstract] = {}
        self.lock = asyncio.Lock()
        self.default_max_retries = default_max_retries
        self.retry_delay = retry_delay
        self.max_workers = max_workers
        self._workers = []
        self._running = False
        self._result_handler = result_handler
        self.name = name

    async def start(self):
        """Start workers."""
        if self._running:
            return
        self._running = True
        self._workers = [asyncio.create_task(self._worker()) for _ in range(self.max_workers)]
        logger.info(f"Queue {self.name} started with {self.max_workers} workers")
    async def _worker(self):
        while self._running:
            try:
                task = await self.queue.get()
                task_id = task.task_id
                try:
                    async with self.lock:
                        task.status = TaskStatus.PROCESSING
                    result = await self._execute_with_retries(
                        func=task.func,
                        args=task.args,
                        kwargs=task.kwargs,
                        max_retries=task.max_retries,
                        task_id=task_id
                    )
                    async with self.lock:
                        task.status = TaskStatus.COMPLETED
                        task.result = result
                        task.retries_left = 0
                        if self._result_handler:
                            await self._result_handler.process_success(self, task_id, task)
                        del self.active_tasks[task_id]
                except Exception as e:
                    async with self.lock:
                        if task_id in self.active_tasks:
                            task.status = TaskStatus.FAILED
                            task.error = e
                            task.retries_left = 0
                            if self._result_handler:
                                await self._result_handler.process_failure(self, task_id, task)
                            del self.active_tasks[task_id]
                    logger.error(f"Queue {self.name} Task {task_id} failed: {e}")
                finally:
                    self.queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Queue {self.name} Worker error: {e}")

    async def _execute_with_retries(self, func: Callable, args: tuple, kwargs: dict, max_retries: int, task_id: str):
        last_exception = None
        for attempt in range(max_retries + 1):
            try:
                if attempt > 0:
                    delay = self.retry_delay(attempt - 1)
                    logger.info(f"Queue {self.name} Retry {attempt}/{max_retries} after {delay:.1f}s")
                    await asyncio.sleep(delay)
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    result = await asyncio.to_thread(func, *args, **kwargs)
                return result
            except (KeyboardInterrupt, asyncio.CancelledError, MemoryError) as e:
                raise e
            except Exception as e:
                last_exception = e
                async with self.lock:
                    if task_id in self.active_tasks:
                        self.active_tasks[task_id].retries_left = max_retries - attempt
                logger.warning(f"Queue {self.name} Attempt {attempt + 1} failed: {str(e)}")
        raise last_exception or Exception("All retries failed")
    async def add_task(
        self,
        func: Callable,
        args: Optional[tuple] = None,
        kwargs: Optional[dict] = None,
        max_retries: Optional[int] = None,
        task_context: Optional[TaskContextAbstract] = None,
    ) -> str:
        task_id = str(uuid4())
        max_retries = max_retries if max_retries is not None else self.default_max_retries
        async with self.lock:
            position = self.queue.qsize() + len(
                [t for t in self.active_tasks.values() if t.status == TaskStatus.PROCESSING]
            )
            task = Task(
                task_id=task_id,
                func=func,
                args=args or (),
                kwargs=kwargs or {},
                max_retries=max_retries,
                position=position,
                task_context=task_context,
            )
            self.active_tasks[task_id] = task
        await self.queue.put(task)
        return task_id

    async def get_task_status(self, task_id: str) -> Optional[TaskAbstract]:
        async with self.lock:
            if task_id not in self.active_tasks:
                return None
            task = self.active_tasks[task_id]
            if task.status == TaskStatus.PENDING:
                task.position = self._calculate_position(task_id)
            return task

    def _calculate_position(self, task_id: str) -> int:
        pending_tasks = [
            tid for tid, task in self.active_tasks.items()
            if task.status == TaskStatus.PENDING and tid != task_id
        ]
        return len(pending_tasks)

    async def close(self):
        """Gracefully shut down the queue."""
        self._running = False
        while not self.queue.empty():
            self.queue.get_nowait()
            self.queue.task_done()
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        logger.info(f"Queue {self.name} shutdown completed")


class QueueSingleton(Queue):
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = Queue(*args, **kwargs)
        return cls._instance

    @classmethod
    async def initialize(cls, *args, **kwargs):
        """Initialize the queue and start its workers."""
        instance = cls(*args, **kwargs)
        await instance.start()
        return instance
```
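A minimal standalone usage of the Queue class above, which can also be run under the debugger with a breakpoint inside `sample_job` to check whether stepping into coroutines awaited by the worker tasks works outside of aiohttp. The job, its argument, and the sleep durations are illustrative only.

```python
# Standalone usage sketch for the Queue above; a breakpoint inside sample_job
# shows whether the debugger stops in coroutines executed by the worker tasks.
import asyncio
from bot_contacts.queue.queue import Queue


async def sample_job(x: int) -> int:
    await asyncio.sleep(0.1)
    return x * 2  # put a breakpoint here


async def main():
    queue = Queue(max_workers=2, name="DemoQueue")
    await queue.start()
    task_id = await queue.add_task(func=sample_job, kwargs={"x": 21})
    await asyncio.sleep(1)  # give the workers time to process the task
    task = await queue.get_task_status(task_id)
    # On success the queue deletes the task from active_tasks, so None is expected here.
    print(task.result if task else "task completed and removed from active_tasks")
    await queue.close()


asyncio.run(main())
```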
More details here: https://stackoverflow.com/questions/796 ... in-pycharm
Why doesn't the PyCharm debugger stop in asynchronous functions that run in a queue?