Как настроить повторяющиеся исключения для потребителей, когда транзакции Kafka включены?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как настроить повторяющиеся исключения для потребителей, когда транзакции Kafka включены?

Сообщение Anonymous »

Мы пытаемся настроить повторяющиеся исключения вместе с транзакциями Kafka и очередями недоставленных писем в Spring Cloud Stream. То есть мы хотим указать, какие исключения следует повторить.
Однако кажется, что как только вы включите транзакции (установив Spring.cloud.stream.kafka.binder.transaction .transaction-id-prefix), повторные попытки по умолчанию и повторные исключения игнорируются. Нужно ли их настроить по-другому?
Вот минимальный пример, воспроизводящий проблему. Просто определите @SpringBootApplication с потребителем, который всегда выдает IllegalArgumentException:

Код: Выделить всё

@Configuration
@Slf4j
public class ConsumerConfig {
@Bean
public Consumer consumeMessage() {
return s -> {
log.info("Consuming {}", s);
throw new IllegalArgumentException(s);
};
}
}
и application.yml:

Код: Выделить всё

spring:
cloud:
function:
definition: consumeMessage
stream:
kafka:
binder:
transaction:
transaction-id-prefix: transaction-
required-acks: all
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
bindings:
consumeMessage-in-0:
consumer:
enable-dlq: true
bindings:
consumeMessage-in-0:
group: my-group
destination: my-topic
consumer:
default-retryable: false
max-attempts: 5
back-off-initial-interval: 100
retryable-exceptions:
java.lang.UnsupportedOperationException: true
java.lang.IllegalArgumentException: false
Это касается Spring Boot 3.4.0 и Spring Cloud 2024.0.0 и следующих зависимостей:

Код: Выделить всё

        
org.apache.kafka
kafka-streams


org.springframework.cloud
spring-cloud-stream


org.springframework.cloud
spring-cloud-starter-stream-kafka


org.springframework.cloud
spring-cloud-stream-binder-kafka-streams


org.projectlombok
lombok
provided

Я также запустил проект GitHub с тем же кодом.
Когда я отправляю сообщение в своей теме, оно будет повторите попытку 5 раз, несмотря на повторную попытку по умолчанию: false и java.lang.IllegalArgumentException: false. Если я отключу префикс идентификатора транзакции, он будет работать так, как задумано.
Погружаясь в код Spring Cloud Stream, я обнаружил, что KafkaMessageChannelBinder установит RetryTemplate< /code> настраивается с помощью buildRetryTemplate(properties), если TransactionManager отсутствует, но если он есть, вместо этого будет настроен AfterRollbackProcessor, передавая ему только BackOff без использования конфигурации повторяющихся исключений.

Подробнее здесь: https://stackoverflow.com/questions/793 ... ctions-are
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»