Как я могу управлять смещениями в Кафке на уровне сообщенийPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как я могу управлять смещениями в Кафке на уровне сообщений

Сообщение Anonymous »

Это мой код: < /p>
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def kafka_consumer(self):
"""Consume messages from Kafka with retry logic."""
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "test_group",
"auto.offset.reset": "earliest",
"enable.auto.commit": False
})
consumer.subscribe(["test_topic"])

try:
logging.info("Kafka Consumer started.")
while True:
msg = consumer.poll(timeout=1.0)

if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
logging.error(f"Kafka error: {msg.error()}")
raise KafkaException(msg.error())

data = msg.value().decode()
logging.info(f"Received from Kafka: {data}")

# Check if event loop is running
if not self.loop.is_running():
logging.error("Event loop is not running!")
continue

# Make http call in the background
future = asyncio.run_coroutine_threadsafe(
self.send_post_request(data), self.loop
)

try:
success = future.result(timeout=5) #
except Exception as e:
logging.error(f"Error in send_post_request: {e}")
success = False

if success:
consumer.commit(message=msg)
else:
logging.warning(f"Message retained for retry: {data}")

except KafkaException as e:
logging.error(f"Kafka connection error: {e}")
raise
finally:
consumer.close()
< /code>
Когда я получаю успешный результат, я совершаю смещение в Kafka для этого сообщения,
работает ли эта реализация для сценария ниже? Успешно обработан, и я совершил эти сообщения < /p>
, затем, если мое приложение сбои или перезагрузки, получу ли я сообщение M1 снова для обработки?>

Подробнее здесь: https://stackoverflow.com/questions/794 ... sage-level
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как управлять выбранными форматами сообщений в произвольном типе сообщений
    Гость » » в форуме Php
    0 Ответы
    61 Просмотры
    Последнее сообщение Гость
  • Просроченные записи в Кафке
    Anonymous » » в форуме JAVA
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Невозможно подключиться к Кафке, попав в продюсер, в докере-композите
    Anonymous » » в форуме JAVA
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Как выполнять поиск по ключу содержит в Кафке-Уи?
    Anonymous » » в форуме JAVA
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Как выполнять поиск по ключу содержит в Кафке-Уи?
    Anonymous » » в форуме JAVA
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»