Ниже приведены мои настройки повторных попыток (retry) и конфигурация DLQ.
Код: Выделить всё
@Configuration
public class KafkaRetryConfig {
@Bean
public ListenerContainerCustomizer customizer(KafkaOperations bytesTemplate) {
return (container, destinationName, group) -> {
container.setCommonErrorHandler(new DefaultErrorHandler(new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L)));
};
}
}
Код: Выделить всё
/**
 * Batch consumer that simulates a processing failure for the record whose id is
 * {@code "abc123"}.
 *
 * <p>If the batch contains such a record, a {@link BatchListenerFailedException} carrying that
 * record's position in the batch is thrown. Otherwise the batch is processed and acknowledged
 * through the {@link Acknowledgment} found in the message headers.
 *
 * <p>Any other exception raised while processing is propagated unchanged. The previous
 * implementation wrapped it using {@code records.get(index)} with {@code index == -1}, which
 * replaced the real failure with an {@code IndexOutOfBoundsException}.
 *
 * <p>NOTE(review): the generic type arguments of {@code Consumer} and {@code List} appear to
 * have been lost when this snippet was pasted. The element type is not visible here, so they
 * are left as found. Restore them before compiling.
 *
 * @return the consumer bound to the {@code recordsConsumer-in-0} input
 * @throws BatchListenerFailedException (from the returned consumer) when the batch contains the
 *         record with id {@code "abc123"}
 * @throws IllegalStateException (from the returned consumer) when the acknowledgment header is
 *         missing
 */
@Bean
public Consumer recordsConsumer() {
    return message -> {
        List records = message.getPayload();

        // Position of the first record that must fail, or -1 when there is none.
        // Constant-first equals keeps a record with a null id from raising a NullPointerException.
        int index = IntStream.range(0, records.size())
                .filter(streamIndex -> "abc123".equals(records.get(streamIndex).getId()))
                .findFirst()
                .orElse(-1);

        if (index > -1) {
            // Thrown directly instead of throw-then-catch; the simulated failure is kept as the cause.
            throw new BatchListenerFailedException(
                    records.get(index).toString(), new RuntimeException("runtime exception"), index);
        }

        Acknowledgment acknowledgment =
                message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        // Explicit check: an assert is skipped when the JVM runs without -ea.
        if (acknowledgment == null) {
            throw new IllegalStateException(
                    "Missing " + KafkaHeaders.ACKNOWLEDGMENT + " header; manual acknowledgment is not available");
        }

        //message processing logic
        acknowledgment.acknowledge();
    };
}
Код: Выделить всё
# Spring Cloud Stream configuration for the Kafka binder and the recordsConsumer-in-0 binding.
# NOTE(review): all indentation was lost in this paste, so the key nesting is not recoverable
# with certainty. The structural remarks below are assumptions; restore the indentation
# before using this as YAML.
spring:
cloud:
stream:
default-binder: kafka
# Defaults presumably applied to every binding (native Avro encoding/decoding, no auto start).
default:
contentType: application/*+avro
consumer:
useNativeDecoding: true
autoStartup: false
producer:
useNativeEncoding: true
# Kafka binder section (presumably spring.cloud.stream.kafka).
kafka:
binder:
autoCreateTopics: false
brokers: broker
# Kafka client properties; presumably shared by the binder's consumers and producers.
configuration:
# Presumably flattens to enable.auto.commit=false and enable.idempotence=true.
enable:
auto.commit: false
idempotence: true
max.in.flight.requests.per.connection: 1
request.timeout.ms: 5000
security.protocol: SASL_SSL
sasl:
kerberos:
service:
name: service-name
# The JAAS entry below is one multi-line value ending at the ';' line; do not insert
# comments between its lines.
# NOTE(review): the principal reads "[email protected]", which looks like an
# e-mail-obfuscation artifact of the page this was copied from. Supply the real principal.
jaas:
config: com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
useTicketCache=false
storeKey=true
keyTab="xyz.keytab"
principal="[email protected]";
ssl:
# NOTE(review): the value is empty; for Kafka clients this presumably disables broker
# hostname verification. Confirm this is intended.
endpoint.identification.algorithm:
truststore:
type: JKS
location: /config/global/payx-cacerts/cacerts
# NOTE(review): truststore password is hard-coded in the file; consider externalizing it.
password: changeit
consumer-properties:
client.id: hrs-productsubscription-consumer-test-9
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
schema.registry.url: schema-registry-url
#fetch.max.wait.ms: 60000
max.poll.records: 200
# NOTE(review): requiredAcks is presumably a binder-level property rather than a consumer
# property; its parent cannot be determined from this flattened paste.
requiredAcks: -1
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
# Kafka-specific settings for the recordsConsumer-in-0 binding
# (presumably spring.cloud.stream.kafka.bindings).
bindings:
recordsConsumer-in-0:
consumer:
# ackMode is left commented out; autoCommitOffset: false is set below instead.
#ackMode: MANUAL
startOffset: earliest
resetOffsets: false
autoCommitOffset: false
# Dead-letter settings for this binding.
enableDlq: true
dlqName: dlq-topic-name
dlqPartitions: 1
dlqProducerProperties:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: schema-registry-url
# Per-binding Kafka consumer properties. These name ErrorHandlingDeserializer for key and value,
# with the delegate classes given under spring.deserializer.*.delegate.class further down.
configuration:
group.id: group-id
schema.registry.url: schema-registry-url
# NOTE(review): autoStartup appears among Kafka client properties here; it is presumably
# a binding-level consumer property. Verify its intended parent.
autoStartup: true
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring:
deserializer:
key:
delegate.class: org.apache.kafka.common.serialization.StringDeserializer
value:
delegate.class: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
# Binder-agnostic binding settings (presumably spring.cloud.stream.bindings).
bindings:
recordsConsumer-in-0:
consumer:
batch-mode: true
# NOTE(review): max-attempts is combined with batch-mode here; the Kafka binder documentation
# should be checked for whether binder-level retry applies to batch consumers.
max-attempts: 2
destination: topic-name
group: group-name
# NOTE(review): partitioned and concurrency are presumably consumer properties; their
# parent cannot be determined from this flattened paste.
partitioned: true
concurrency: 8
Подробнее здесь: https://stackoverflow.com/questions/786 ... t-retrying