Код работает, но я сталкиваюсь с состоянием гонки, когда два запроса поступают одновременно. Подтверждение отправляется для обоих запросов, но фактический запрос API (внутри fetch_response_from_rest_service) отправляется только для одного из запросов, причем дважды. Эта проблема возникает в том разделе кода, где я отметил комментарий.
Вот соответствующий код:
Код: Выделить всё
import json
import confluent_kafka as kafka
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging as logger
async def run_prediction(inbound_topics, outbound_topics):
consumer_args = {'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.APPLICATION_ID,
'default.topic.config': {'auto.offset.reset': config.AUTO_OFFSET_RESET},
'enable.auto.commit': config.ENABLE_AUTO_COMMIT,
'max.poll.interval.ms': config.MAX_POLL_INTERVAL_MS}
training_consumer = kafka.Consumer(consumer_args)
training_consumer.subscribe(inbound_topics)
outbound_producer = kafka.Producer({'bootstrap.servers': config.BOOTSTRAP_SERVERS})
logger.info(f"Listening to inbound topic {inbound_topics}")
while True:
msg = training_consumer.poll(timeout=config.KAFKA_POLL_TIMEOUT)
if not msg:
continue
if msg.error():
logger.info(f"Consumer error: {str(msg.error())}")
continue
try:
send_ack(msg.value(), "MESSAGE_RECEIVED")
loop = asyncio.get_event_loop()
loop.run_in_executor(executor, lambda: asyncio.run(
fetch_response_from_rest_service(message=msg.value().decode(),
callback=kafka_status_callback(msg, outbound_producer,
outbound_topics))))
except Exception as ex:
logger.exception(ex)
async def fetch_response_from_rest_service(message, callback):
# race condition happens at this point message variable when two requests come at same time
message = json.loads(message)
url = "SOME_ENDPOINT"
headers = {
"Content-Type": "application/json"
}
response = None
try:
logger.info(f"sending request to {url} for payload {message}")
response = await async_request("POST", url, headers, data=json.dumps(message),
timeout=10)
response = json.loads(response)
except Exception as ex:
logger.exception(f"All retries failed. Error: {ex}")
finally:
callback(response)
asyncio.run(run_prediction(["INBOUND_TOPIC"], ["OUTBOUND_TOPIC"]))
Я определил состояние гонки, когда несколько сообщений обрабатываются одновременно время. Функция fetch_response_from_rest_service, похоже, использует неправильное сообщение для одного из запросов, из-за чего она дважды повторно использует одно сообщение, а другое удаляется.
Я пытался решить эту проблему, блокировка участка кода, обрабатывающего переменную сообщения:
Код: Выделить всё
async def fetch_response_from_rest_service(message, callback):
message_copy_lock = asyncio.Lock()
async with message_copy_lock:
logger.info(f"Got message : {json.loads(message)['conversationRequest']['requestId']}")
message = json.loads(message)
url = "SOME_ENDPOINT"
headers = {
"Content-Type": "application/json"
}
response = None
try:
logger.info(f"sending request to {url} for payload {message}")
response = await async_request("POST", url, headers, data=json.dumps(message),
timeout=10)
response = json.loads(response)
except Exception as ex:
logger.exception(f"All retries failed. Error: {ex}")
finally:
callback(response)
Мои ограничения:
- < li>Я хочу продолжать использовать синхронный клиент Kafka (Confluent Kafka)
из-за ограничений проекта. Я не могу переключиться на асинхронный клиент Kafka
например, AIOKafka. Я рассматривал возможность использования asyncio.create_task(), если я
использовал асинхронный клиент Kafka, где я мог бы дождаться ответа от вызова API и при этом продолжить обработку запросов на опрос, но я хочу избежать этого пути из-за
ограничения проекта.
- Почему это происходит?
- Как я могу избежать состояния гонки в моей текущей настройке?
- Как работают несколько циклов событий в одном потоке по сравнению с работающими циклами событий в разных потоках работают?
- Правильно ли я использую asyncio.run_in_executor() в этом контексте?
- Должен ли я делать что-то по-другому для обработки параллельного процесса запросы безопасно? Будет ли какой-либо другой метод синхронизации работать лучше, чем asyncio.Lock()?
Подробнее здесь: https://stackoverflow.com/questions/789 ... executor-i