Потребитель Spring Cloud Stream Kafka в пакетном режиме не повторяет попыткуJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Потребитель Spring Cloud Stream Kafka в пакетном режиме не повторяет попытку

Сообщение Anonymous »

Я использую Spring Cloud Stream (версия 4.0.3) и Kafka Binder в приложении Spring Boot для пакетного получения сообщений из темы Kafka. При возникновении исключения весь пакет отправляется в тему DLQ без повторной попытки. Пожалуйста, помогите мне найти проблему.
Ниже приведены мои повторы и конфигурация dlq

Код: Выделить всё

@Configuration
public class KafkaRetryConfig {

@Bean
public ListenerContainerCustomizer customizer(KafkaOperations bytesTemplate) {
return (container, destinationName, group) -> {
container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L)));
};
}
}
Ниже приведен потребительский код Kafka

Код: Выделить всё

@Bean
public Consumer recordsConsumer() {
return message -> {
List records= message.getPayload();
int index = IntStream.range(0, records.size())
.filter(streamIndex -> records.get(streamIndex).getId().equals("abc123"))
.findFirst()
.orElse(-1);
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
assert acknowledgment != null;
try {
if (index >  -1) {
throw new RuntimeException("runtime exception");
}
//message processing logic
acknowledgment.acknowledge();
} catch (Exception e) {
throw new BatchListenerFailedException(records.get(index).toString(),index);
}
};
}
Ниже приведены свойства моего приложения

Код: Выделить всё

spring:
cloud:
stream:
default-binder: kafka
default:
contentType: application/*+avro
consumer:
useNativeDecoding: true
autoStartup: false
producer:
useNativeEncoding: true
kafka:
binder:
autoCreateTopics: false
brokers: broker
configuration:
enable:
auto.commit: false
idempotence: true
max.in.flight.requests.per.connection: 1
request.timeout.ms: 5000
security.protocol: SASL_SSL
sasl:
kerberos:
service:
name: service-name
jaas:
config: com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
useTicketCache=false
storeKey=true
keyTab="xyz.keytab"
principal="[email protected]";
ssl:
endpoint.identification.algorithm:
truststore:
type: JKS
location: /config/global/payx-cacerts/cacerts
password: changeit
consumer-properties:
client.id: hrs-productsubscription-consumer-test-9
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
schema.registry.url: schema-registry-url
#fetch.max.wait.ms: 60000
max.poll.records: 200
requiredAcks: -1
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
bindings:
recordsConsumer-in-0:
consumer:
#ackMode: MANUAL
startOffset: earliest
resetOffsets: false
autoCommitOffset: false
enableDlq: true
dlqName: dlq-topic-name
dlqPartitions: 1
dlqProducerProperties:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: schema-registry-url
configuration:
group.id: group-id
schema.registry.url: schema-registry-url
autoStartup: true
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring:
deserializer:
key:
delegate.class: org.apache.kafka.common.serialization.StringDeserializer
value:
delegate.class: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
bindings:
recordsConsumer-in-0:
consumer:
batch-mode: true
max-attempts: 2
destination: topic-name
group: group-name
partitioned: true
concurrency: 8
Похоже, конфигурация повтора в моем приложении не работает.

Подробнее здесь: https://stackoverflow.com/questions/786 ... t-retrying
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»