CommitFailedException Фиксация не может быть завершена, поскольку группа уже выполнила повторную балансировку и назначил ⇐ JAVA
CommitFailedException Фиксация не может быть завершена, поскольку группа уже выполнила повторную балансировку и назначил
Я использовал Kafka 0.10.2 и теперь столкнулся с CommitFailedException. нравится:
Коммит не может быть завершен, так как группа уже прошла ребалансировку и назначил разделы другому участнику. Это означает, что время между последующими вызовами poll() было дольше настроенного max.poll.interval.ms, что обычно подразумевает, что цикл опроса тратить слишком много времени на обработку сообщений. Вы можете решить эту проблему либо увеличив тайм-аут сеанса или уменьшив максимальный размер пакеты, возвращаемые в poll() с max.poll.records.
Я установил для max.poll.interval.ms значение Integer.MAX_VALUE. так может ли кто-нибудь сказать мне, почему это все еще происходит, даже если я установил значение?
Еще один вопрос: Я делаю, как описано, чтобы установить для session.timeout.ms значение 60000, и это все равно происходит. Я пытаюсь воспроизвести простым кодом
public static void main(String[] args) выдает InterruptedException { Регистратор logger = Logger.getLogger(KafkaConsumer10.class); logger.info("ХХ"); Свойства props = новые свойства(); props.put("bootstrap.servers", "kafka-broker:9098"); props.put("group.id", "тест"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("max.poll.interval.ms", "300000"); props.put("session.timeout.ms", "10000"); props.put("max.poll.records", "2"); KafkaConsumer потребитель = новый KafkaConsumer(реквизит); потребитель.подписка(Arrays.asList("t1")); в то время как (истина) { Thread.sleep(11000); ConsumerRecords записи = Consumer.poll(100); //Thread.sleep(11000); Thread.sleep(11000); для (ConsumerRecord запись: записи) System.out.printf("смещение = %d, ключ = %s, значение = %s%n", запись.смещение(), запись.ключ(), запись.значение()); } Когда я устанавливаю для session.timeout.ms значение 10000, я пытаюсь спать более 10000 мс в цикле опроса, но, похоже, это работает, и никаких исключений не возникает. так что я в замешательстве по этому поводу. если Heartbeat запускается Consumer.poll и Consumer.commit, кажется, что в моем коде Heartbeat истек по тайм-ауту сеанса. почему бы не выбросить CommitFailedException?
Я использовал Kafka 0.10.2 и теперь столкнулся с CommitFailedException. нравится:
Коммит не может быть завершен, так как группа уже прошла ребалансировку и назначил разделы другому участнику. Это означает, что время между последующими вызовами poll() было дольше настроенного max.poll.interval.ms, что обычно подразумевает, что цикл опроса тратить слишком много времени на обработку сообщений. Вы можете решить эту проблему либо увеличив тайм-аут сеанса или уменьшив максимальный размер пакеты, возвращаемые в poll() с max.poll.records.
Я установил для max.poll.interval.ms значение Integer.MAX_VALUE. так может ли кто-нибудь сказать мне, почему это все еще происходит, даже если я установил значение?
Еще один вопрос: Я делаю, как описано, чтобы установить для session.timeout.ms значение 60000, и это все равно происходит. Я пытаюсь воспроизвести простым кодом
public static void main(String[] args) выдает InterruptedException { Регистратор logger = Logger.getLogger(KafkaConsumer10.class); logger.info("ХХ"); Свойства props = новые свойства(); props.put("bootstrap.servers", "kafka-broker:9098"); props.put("group.id", "тест"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("max.poll.interval.ms", "300000"); props.put("session.timeout.ms", "10000"); props.put("max.poll.records", "2"); KafkaConsumer потребитель = новый KafkaConsumer(реквизит); потребитель.подписка(Arrays.asList("t1")); в то время как (истина) { Thread.sleep(11000); ConsumerRecords записи = Consumer.poll(100); //Thread.sleep(11000); Thread.sleep(11000); для (ConsumerRecord запись: записи) System.out.printf("смещение = %d, ключ = %s, значение = %s%n", запись.смещение(), запись.ключ(), запись.значение()); } Когда я устанавливаю для session.timeout.ms значение 10000, я пытаюсь спать более 10000 мс в цикле опроса, но, похоже, это работает, и никаких исключений не возникает. так что я в замешательстве по этому поводу. если Heartbeat запускается Consumer.poll и Consumer.commit, кажется, что в моем коде Heartbeat истек по тайм-ауту сеанса. почему бы не выбросить CommitFailedException?
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение