Сброс смещения для раздела не работаетJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Сброс смещения для раздела не работает

Сообщение Anonymous »

У меня есть тема «апельсины» с 10 разделами, 2 потребителями в 1 группе потребителей. Я использую Spring Kafka.
Мне нужно время от времени перечитывать данные и сбрасывать смещения. Мой слушатель реализует ConsumerSeekAware, и в onPartitionsAssigned() я просто вызываю callback#seekToBeginning. Это работает нормально, поскольку в журнале я вижу сообщения от Kafka Client API (2.3.1) следующего содержания:
Сброс смещения для раздела Oranges-X до смещения 0. Это нормально происходит для всех разделов.
Однако фактически сбрасывается только последний раздел (9), а время от времени, если мне повезет, и второй (1). Все остальные вообще не сбрасываются.
Что вызывает у меня настоящую головную боль: если я исключаю раздел 9 из списка разделов, подлежащих сбросу, все остальные разделы сбрасываются нормально, и все работает так, как ожидалось.
Код очень прост:
class ... implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {
...
callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());

}
...

Журналы:
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.


Подробнее здесь: https://stackoverflow.com/questions/624 ... s-not-work
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»