Почему отладчик не останавливается в асинхронных функциях, которые работают в очереди в Pycharm?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Почему отладчик не останавливается в асинхронных функциях, которые работают в очереди в Pycharm?

Сообщение Anonymous »

Использование Pycharm. Отладчик работает повсюду, за исключением функций, которые запущены в очереди. Пытался использовать pydevd-pycharm с ним также ничего не сработало.from bot_contacts.app import app

from bot_contacts.log import configure_logs
from bot_contacts.config import load_config

from bot_contacts.accounts_server.accounts_server import run_accounts_server

logger = configure_logs()
config = load_config()

if __name__ == '__main__':
run_accounts_server()

< /code>
запустить сервер aionttp < /p>
import asyncio

from aiohttp import web

from bot_contacts.accounts_server.accounts_observer import accounts_observer
from bot_contacts.accounts_server.fix_accounts_status import fix_accounts_status
from bot_contacts.accounts_server.handlers import auth, get_accounts, callback_2fa
from bot_contacts.aiohttp_app import AioHttpApplication
from bot_contacts.clean_background_tasks import cleanup_background_tasks
from bot_contacts.config import load_config
from bot_contacts.core.abstract import AioHttpApplicationAbstract
from bot_contacts.log import configure_logs
from bot_contacts.queue.queue import Queue
from bot_contacts.queue.result_handler.auth_result_handler import AuthResultHandler

logger = configure_logs()
config = load_config()

async def create_queues(app: AioHttpApplicationAbstract):
app.auth_queue = Queue(
max_workers=config.get("REQUESTS_MAX_WORKERS", 5),
default_max_retries=config.get("REQUESTS_SERVER_MAX_RETRIES", 3),
result_handler=AuthResultHandler(),
name="AuthQueue",
)

app.check_2fa_queue = Queue(
max_workers=config.get("REQUESTS_MAX_WORKERS", 5),
default_max_retries=config.get("REQUESTS_SERVER_MAX_RETRIES", 3),
name="2FACheckQueue",
)

await app.auth_queue.start()
await app.check_2fa_queue.start()

logger.info("Queues initialized")

async def cleanup_queues(app: AioHttpApplicationAbstract):
"""Корректно завершает работу очереди"""
if app.auth_queue:
await app.auth_queue.close()
logger.info("Auth queue shutdown")
if app.check_2fa_queue:
await app.check_2fa_queue.close()
logger.info("Check 2fa queue shutdown")

async def start_background_tasks(app: AioHttpApplicationAbstract):
tasks = [
asyncio.create_task(accounts_observer(app)),
]

app.background_tasks.extend(tasks)

def run_accounts_server():
app = AioHttpApplication()

app.on_startup.extend([
create_queues,
fix_accounts_status,
start_background_tasks
])

app.on_shutdown.extend([
cleanup_background_tasks,
cleanup_queues,
])

app.router.add_post("/auth", auth)
app.router.add_get("/get-accounts", get_accounts)
app.router.add_post("/2fa-callback", callback_2fa)

web.run_app(
app,
host=config["ACCOUNTS_SERVER_ADDR"],
port=config["ACCOUNTS_SERVER_PORT"],
)

< /code>
Обработчик сервера AIONTTP < /p>
from aiohttp import web
from playhouse.shortcuts import model_to_dict

from bot_contacts.accounts_server.authorization import authorize_account, code_2fa_found
from bot_contacts.api_server.api_responses import api_error, api_success
from bot_contacts.ati_accounts_methods import get_ati_account_by_id
from bot_contacts.core.abstract import QueueAbstract
from bot_contacts.log import configure_logs
from bot_contacts.models.AtiAccounts import ON_AUTH, WAIT_2FA, AtiAccounts, AUTH, NOT_AUTH
from bot_contacts.models.model_helpers import datetime_to_string
from bot_contacts.queue.task_context.auth_task_context import AuthTaskContext
from bot_contacts.queue.task_context.check_2fa_task_context import Check2faTaskContext

logger = configure_logs()

async def auth(request: web.Request) -> web.Response:
auth_queue: QueueAbstract = request.app.auth_queue

try:
request_json = await request.json()
except Exception as e:
return api_error(message="Ошибка парсинга JSON")

if "account_id" not in request_json:
return api_error(message="Account_id не найден в JSON")

account_id = request_json["account_id"]
account = await get_ati_account_by_id(account_id)

if not account:
return api_error(message="Аккаунт не найден")

if account.account_status == ON_AUTH or account.account_status == WAIT_2FA:
return api_error(message="Аккаунт уже на авторизации или ожидает кода 2fa")

account.account_status = ON_AUTH
await account.aio_save()

task_id = await auth_queue.add_task(func=authorize_account, kwargs={account: account}, max_retries=1,
task_context=AuthTaskContext(request.app, account))

return api_success({
"account_id": account_id,
"task_id": task_id,
}, "Задача поставлена в очередь")
< /code>
Функция, которая работает в очереди и в которой отладчик Pyharm не останавливается < /p>
async def authorize_account(account: AtiAccounts) -> bool:
ua = UserAgent()
user_agent = ua.random

headers_dict = create_headers_dict(user_agent)

if not await check_need_change_proxy(account):
raise NoNeedReauthorize("Переавторизовывать аккаунт не требуется, запросы проходят")

proxy = None
cookies_dict_did = None
proxy_string = None

if ACCOUNTS_SERVER_USE_PROXIES:
proxies = await get_proxies()

if not proxies or proxies and len(proxies) == 0:
raise NoProxies("Нет прокси")

for _proxy in proxies:
_proxy_string = f"{_proxy['proxy_proto']}://{_proxy['proxy_string']}"
result, cookies_dict_did = await get_did_cookie(headers_dict, _proxy_string)
if result:
proxy = _proxy
proxy_string = _proxy_string
break
else:
result, cookies_dict_did = await get_did_cookie(headers_dict)
if not result:
raise NoCookies("Не удалось получить did куки")

result, status, cookies_dict_sid = await get_sid_cookie(account, headers_dict, cookies_dict_did, proxy_string)

if not result and status == GET_SID_COOKIE_STATUS_ERROR:
raise NoCookies("Не удалось получить sid куки")
elif not result and status == GET_SID_COOKIE_STATUS_INVALID_AUTH_DATA:
raise InvalidAuthData("Неверный логин или пароль")
elif status == GET_SID_COOKIE_STATUS_2FA or status == GET_SID_COOKIE_STATUS_SUCCESS:
cookies_dict = {
"did": cookies_dict_did["did"],
"sid": cookies_dict_sid["sid"],
}

account.cookies = json.dumps(cookies_dict)
await account.aio_save()
await update_ati_account_data(account.id, "user_agent", user_agent)
if ACCOUNTS_SERVER_USE_PROXIES:
await update_ati_account_data(account.id, "proxy", proxy["proxy_string"])
await update_ati_account_data(account.id, "proxy_proto", proxy["proxy_proto"])
else:
await update_ati_account_data(account.id, "proxy", None)
await update_ati_account_data(account.id, "proxy_proto", None)
if not result:
raise Needed2FA("Нужен код 2FA")

return True
< /code>
очередь, в которой выполняются асинхронные функции < /p>
import asyncio
from uuid import uuid4
from typing import Callable, Optional

from bot_contacts.core.abstract import QueueAbstract, ResultHandlerAbstract, TaskAbstract, TaskContextAbstract
from bot_contacts.core.queue.task_status import TaskStatus
from bot_contacts.log import configure_logs
from bot_contacts.queue.task import Task

logger = configure_logs()

class Queue(QueueAbstract):
def __init__(
self,
max_workers: int = 5,
default_max_retries: int = 3,
retry_delay: Callable[[int], float] = lambda attempt: min(2**attempt, 10),
result_handler: Optional[ResultHandlerAbstract] = None,
name: str = "Queue",
):
self.queue = asyncio.Queue(maxsize=max_workers * 2)
self.active_tasks: dict[str, TaskAbstract] = {}
self.lock = asyncio.Lock()
self.default_max_retries = default_max_retries
self.retry_delay = retry_delay
self.max_workers = max_workers
self._workers = []
self._running = False
self._result_handler = result_handler
self.name = name

async def start(self):
"""Start workers."""
if self._running:
return

self._running = True
self._workers = [asyncio.create_task(self._worker()) for _ in range(self.max_workers)]
logger.info(f"Queue {self.name} started with {self.max_workers} workers")

async def _worker(self):
while self._running:
try:
task = await self.queue.get()
task_id = task.task_id

try:
async with self.lock:
task.status = TaskStatus.PROCESSING

result = await self._execute_with_retries(
func=task.func,
args=task.args,
kwargs=task.kwargs,
max_retries=task.max_retries,
task_id=task_id
)

async with self.lock:
task.status = TaskStatus.COMPLETED
task.result = result
task.retries_left = 0
if self._result_handler:
await self._result_handler.process_success(self, task_id, task)
del self.active_tasks[task_id]

except Exception as e:
async with self.lock:
if task_id in self.active_tasks:
task.status = TaskStatus.FAILED
task.error = e
task.retries_left = 0
if self._result_handler:
await self._result_handler.process_failure(self, task_id, task)
del self.active_tasks[task_id]
logger.error(f"Queue {self.name} Task {task_id} failed: {e}")
finally:
self.queue.task_done()

except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Queue {self.name} Worker error: {e}")

async def _execute_with_retries(self, func: Callable, args: tuple, kwargs: dict, max_retries: int, task_id: str):
last_exception = None

for attempt in range(max_retries + 1):
try:
if attempt > 0:
delay = self.retry_delay(attempt - 1)
logger.info(f"Queue {self.name} Retry {attempt}/{max_retries} after {delay:.1f}s")
await asyncio.sleep(delay)

if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = await asyncio.to_thread(func, *args, **kwargs)
return result

except (KeyboardInterrupt, asyncio.CancelledError, MemoryError) as e:
raise e
except Exception as e:
last_exception = e
async with self.lock:
if task_id in self.active_tasks:
self.active_tasks[task_id].retries_left = max_retries - attempt
logger.warning(f"Queue {self.name} Attempt {attempt + 1} failed: {str(e)}")

raise last_exception or Exception("All retries failed")

async def add_task(
self,
func: Callable,
args: Optional[tuple] = None,
kwargs: Optional[dict] = None,
max_retries: Optional[int] = None,
task_context: Optional[TaskContextAbstract] = None,
) -> str:
task_id = str(uuid4())
max_retries = max_retries if max_retries is not None else self.default_max_retries

async with self.lock:
position = self.queue.qsize() + len(
[t for t in self.active_tasks.values() if t.status == TaskStatus.PROCESSING]
)
task = Task(
task_id=task_id,
func=func,
args=args or (),
kwargs=kwargs or {},
max_retries=max_retries,
position=position,
task_context=task_context,
)
self.active_tasks[task_id] = task

await self.queue.put(task)
return task_id

async def get_task_status(self, task_id: str) -> Optional[TaskAbstract]:
async with self.lock:
if task_id not in self.active_tasks:
return None

task = self.active_tasks[task_id]
if task.status == TaskStatus.PENDING:
task.position = self._calculate_position(task_id)
return task

def _calculate_position(self, task_id: str) -> int:
pending_tasks = [
tid for tid, task in self.active_tasks.items()
if task.status == TaskStatus.PENDING and tid != task_id
]
return len(pending_tasks)

async def close(self):
"""Gracefully shut down the queue."""
self._running = False

while not self.queue.empty():
self.queue.get_nowait()
self.queue.task_done()

for worker in self._workers:
worker.cancel()

await asyncio.gather(*self._workers, return_exceptions=True)
logger.info(f"Queue {self.name} shutdown completed")

class QueueSingleton(Queue):
_instance = None

def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = Queue(*args, **kwargs)
return cls._instance

@classmethod
async def initialize(cls, *args, **kwargs):
"""Инициализация с запуском workers"""
instance = cls(*args, **kwargs)
await instance.start()
return instance



Подробнее здесь: https://stackoverflow.com/questions/796 ... in-pycharm
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • В асинхронных функциях строки выполняются в произвольном порядке и сумме OT? [закрыто]
    Anonymous » » в форуме Javascript
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Отладчик VS Code - Феликс Беккер - Отладчик ни к чему не приводит
    Anonymous » » в форуме Php
    0 Ответы
    48 Просмотры
    Последнее сообщение Anonymous
  • Отладчик Eclipse и удаленный отладчик не работают
    Anonymous » » в форуме JAVA
    0 Ответы
    31 Просмотры
    Последнее сообщение Anonymous
  • Отладчик Eclipse и удаленный отладчик не работают
    Anonymous » » в форуме JAVA
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Отладчик Eclipse и удаленный отладчик не работают
    Anonymous » » в форуме JAVA
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»