class GameMatchesWorker:
def __init__(
self,
consumer: consumers.AbstractConsumer,
producer: producers.AbstractProducer,
provider_client: fast_data.FastDataClient,
) -> None:
self._consumer = consumer
self._producer = producer
self._provider_client = provider_client
self._matches = set()
self._input_buffer = []
self._input_event = asyncio.Event()
self._match_schedular = {}
self._lock = asyncio.Lock()
async def _receive(self) -> dict:
while not self._input_buffer:
await asyncio.wait_for(self._input_event.wait(), timeout=10)
self._input_event.clear()
received_data = self._input_buffer.pop(0)
if received_data.get("code") == fast_data.StatusCodes.SUCCESS:
return {game_match_meta["game_id"]: game_match_meta for game_match_meta in received_data["list"]}
async def _get_update_schedule(self):
current_date = datetime.now()
for source in range(1,6):
await self._provider_client.emit_games_list(
date_from=current_date, date_to=current_date, source=source
)
matches_schedule = await self._receive()
self._match_schedular[source] = matches_schedule
def _check_match_in_schedule(self, match_id: int):
for source in self._match_schedular.values():
if match_id in self._match_schedular[source]:
return True
async def _consume_callback(self, game_match_message: consumers.GameMatchMessage) -> None:
with TRACER.start_as_current_span("fastdata_workerhost_consume_message"):
game_match_id = game_match_message.game_id
if not self._check_match_in_schedule(game_match_id):
await self._get_update_schedule()
Когда выполнение кода переходит к методу self._get_update_schedule() в _consume_callback, контекст выполнения переключается на следующее сообщение, и в этом сценарии мне нужно добиться синхронного выполнения. Используется библиотека aio-pika
Я пробовал установить асинхронные блокировки. Lock() на разных уровнях программы
У меня есть этот код [code]class GameMatchesWorker: def __init__( self, consumer: consumers.AbstractConsumer, producer: producers.AbstractProducer, provider_client: fast_data.FastDataClient, ) -> None: self._consumer = consumer self._producer = producer self._provider_client = provider_client self._matches = set() self._input_buffer = [] self._input_event = asyncio.Event() self._match_schedular = {} self._lock = asyncio.Lock()
async def _receive(self) -> dict: while not self._input_buffer: await asyncio.wait_for(self._input_event.wait(), timeout=10) self._input_event.clear() received_data = self._input_buffer.pop(0)
if received_data.get("code") == fast_data.StatusCodes.SUCCESS: return {game_match_meta["game_id"]: game_match_meta for game_match_meta in received_data["list"]}
def _check_match_in_schedule(self, match_id: int): for source in self._match_schedular.values(): if match_id in self._match_schedular[source]: return True
async def _consume_callback(self, game_match_message: consumers.GameMatchMessage) -> None: with TRACER.start_as_current_span("fastdata_workerhost_consume_message"): game_match_id = game_match_message.game_id if not self._check_match_in_schedule(game_match_id): await self._get_update_schedule() [/code] Когда выполнение кода переходит к методу self._get_update_schedule() в _consume_callback, контекст выполнения переключается на следующее сообщение, и в этом сценарии мне нужно добиться синхронного выполнения. Используется библиотека aio-pika Я пробовал установить асинхронные блокировки. Lock() на разных уровнях программы