Это мой код: < /p>
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def kafka_consumer(self):
"""Consume messages from Kafka with retry logic."""
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "test_group",
"auto.offset.reset": "earliest",
"enable.auto.commit": False
})
consumer.subscribe(["test_topic"])
try:
logging.info("Kafka Consumer started.")
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
logging.error(f"Kafka error: {msg.error()}")
raise KafkaException(msg.error())
data = msg.value().decode()
logging.info(f"Received from Kafka: {data}")
# Check if event loop is running
if not self.loop.is_running():
logging.error("Event loop is not running!")
continue
# Make http call in the background
future = asyncio.run_coroutine_threadsafe(
self.send_post_request(data), self.loop
)
try:
success = future.result(timeout=5) #
except Exception as e:
logging.error(f"Error in send_post_request: {e}")
success = False
if success:
consumer.commit(message=msg)
else:
logging.warning(f"Message retained for retry: {data}")
except KafkaException as e:
logging.error(f"Kafka connection error: {e}")
raise
finally:
consumer.close()
< /code>
Когда я получаю успешный результат, я совершаю смещение в Kafka для этого сообщения,
работает ли эта реализация для сценария ниже? Успешно обработан, и я совершил эти сообщения < /p>
, затем, если мое приложение сбои или перезагрузки, получу ли я сообщение M1 снова для обработки?>
Подробнее здесь: https://stackoverflow.com/questions/794 ... sage-level
Как я могу управлять смещениями в Кафке на уровне сообщений ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение