Состояние гонки в Confluent Kafka Consumer с Asyncio и ThreadPoolExecutor в PythonPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Состояние гонки в Confluent Kafka Consumer с Asyncio и ThreadPoolExecutor в Python

Сообщение Anonymous »

Я работаю над приложением Python, в котором мне нужно получать сообщения из темы Kafka, обрабатывать их, создавая асинхронный запрос API, и формировать ответ на исходящую тему Kafka. Поскольку клиент Kafka, который я использую, является синхронным (confluent-kafka), я решил использовать ThreadPoolExecutor для запуска потребителя в отдельном потоке и запуска асинхронных задач в основном цикле событий для операций ввода-вывода.
Код работает, но я сталкиваюсь с состоянием гонки, когда два запроса поступают одновременно. Подтверждение отправляется для обоих запросов, но фактический запрос API (внутри fetch_response_from_rest_service) отправляется только для одного из запросов, причем дважды. Эта проблема возникает в том разделе кода, где я отметил комментарий.
Вот соответствующий код:

Код: Выделить всё

import json
import confluent_kafka as kafka
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging as logger

async def run_prediction(inbound_topics, outbound_topics):
consumer_args = {'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.APPLICATION_ID,
'default.topic.config': {'auto.offset.reset': config.AUTO_OFFSET_RESET},
'enable.auto.commit': config.ENABLE_AUTO_COMMIT,
'max.poll.interval.ms': config.MAX_POLL_INTERVAL_MS}
training_consumer = kafka.Consumer(consumer_args)
training_consumer.subscribe(inbound_topics)
outbound_producer = kafka.Producer({'bootstrap.servers': config.BOOTSTRAP_SERVERS})
logger.info(f"Listening to inbound topic {inbound_topics}")

while True:
msg = training_consumer.poll(timeout=config.KAFKA_POLL_TIMEOUT)
if not msg:
continue
if msg.error():
logger.info(f"Consumer error: {str(msg.error())}")
continue
try:
send_ack(msg.value(), "MESSAGE_RECEIVED")
loop = asyncio.get_event_loop()
loop.run_in_executor(executor, lambda: asyncio.run(
fetch_response_from_rest_service(message=msg.value().decode(),
callback=kafka_status_callback(msg, outbound_producer,
outbound_topics))))
except Exception as ex:
logger.exception(ex)

async def fetch_response_from_rest_service(message, callback):
# race condition happens at this point message variable when two requests come at same time
message = json.loads(message)
url = "SOME_ENDPOINT"
headers = {
"Content-Type": "application/json"
}
response = None
try:
logger.info(f"sending request to {url} for payload {message}")
response = await async_request("POST", url, headers, data=json.dumps(message),
timeout=10)

response = json.loads(response)
except Exception as ex:
logger.exception(f"All retries failed. Error: {ex}")
finally:
callback(response)

asyncio.run(run_prediction(["INBOUND_TOPIC"], ["OUTBOUND_TOPIC"]))
Метод send_ack просто отправляет сообщение в теме Kafka о подтверждении обработки.
Я определил состояние гонки, когда несколько сообщений обрабатываются одновременно время. Функция fetch_response_from_rest_service, похоже, использует неправильное сообщение для одного из запросов, из-за чего она дважды повторно использует одно сообщение, а другое удаляется.
Я пытался решить эту проблему, блокировка участка кода, обрабатывающего переменную сообщения:

Код: Выделить всё

async def fetch_response_from_rest_service(message, callback):
message_copy_lock = asyncio.Lock()
async with message_copy_lock:
logger.info(f"Got message : {json.loads(message)['conversationRequest']['requestId']}")
message = json.loads(message)
url = "SOME_ENDPOINT"
headers = {
"Content-Type": "application/json"
}
response = None
try:
logger.info(f"sending request to {url} for payload {message}")
response = await async_request("POST", url, headers, data=json.dumps(message),
timeout=10)
response = json.loads(response)
except Exception as ex:
logger.exception(f"All retries failed.  Error: {ex}")
finally:
callback(response)
Однако это не устранило проблему.
Мои ограничения:
  • < li>Я хочу продолжать использовать синхронный клиент Kafka (Confluent Kafka)
    из-за ограничений проекта. Я не могу переключиться на асинхронный клиент Kafka
    например, AIOKafka. Я рассматривал возможность использования asyncio.create_task(), если я
    использовал асинхронный клиент Kafka, где я мог бы дождаться ответа от вызова API и при этом продолжить обработку запросов на опрос, но я хочу избежать этого пути из-за
    ограничения проекта.
Вопросы:
  • Почему это происходит?
  • Как мне избежать состояния гонки в моей текущей настройке?
  • Правильно ли я использую asyncio.run_in_executor() в этом контексте?
  • Должен ли я делать что-то по-другому для безопасной обработки параллельных запросов? Будет ли какой-либо другой метод синхронизации работать лучше, чем asyncio.Lock()?
Будем благодарны за любую помощь или предложения!

Подробнее здесь: https://stackoverflow.com/questions/789 ... executor-i
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Состояние гонки в Confluent Kafka Consumer с Asyncio и ThreadPoolExecutor в Python
    Anonymous » » в форуме Python
    0 Ответы
    25 Просмотры
    Последнее сообщение Anonymous
  • Состояние гонки в Confluent Kafka Consumer с Asyncio и ThreadPoolExecutor в Python
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Состояние гонки в Confluent Kafka Consumer с Asyncio и ThreadPoolExecutor в Python
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Состояние гонки в Confluent Kafka Consumer с Asyncio и ThreadPoolExecutor в Python
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Zookeeper Confluent Kafka «Ошибка открытия файла Zookeeper-gc.log» при запуске Confluent
    Anonymous » » в форуме JAVA
    0 Ответы
    26 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»