Я пытаюсь получить максимальное значение в теме Kafka, которая имеет несколько разделов. Для этого я беру значение максимального смещения каждого раздела и оставляю самое высокое. Если я запускаю следующий код только после выбора одного раздела, все работает нормально:
c.assign([tp_offset]) c.seek(tp_offset) сообщение = c.poll(1.0) номер_блока = json.loads(msg.value().decode('utf-8'))["номер_блока"] печать (номер_блока) Однако, когда я пытаюсь запустить его в цикле для прохождения всех разделов, я получаю следующую ошибку во второй раз, когда он выполняется:
cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Не удалось найти смещение 29: локально: ошибочное состояние"
Я думаю, что это связано с перемещением потребителя в другой раздел, но я не совсем уверен, как это исправить, любая помощь будет приветствоваться. Заранее большое спасибо
