Отправка пользовательского события в тему DLQ с использованием @retryabletopicJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Отправка пользовательского события в тему DLQ с использованием @retryabletopic

Сообщение Anonymous »

Я использую Spring Boot с Kafka и @retryabletopic аннотация.
Чего я хочу достичь:
Сообщения, которые в конечном итоге в DLQ должны быть обернуты в пользовательский тип (DlQeventWrapper), который включает в себя оригинальную загрузку плюс некоторые ошибки MetAdata. Пользовательский DeadtletterPublishingRecoverer (как как @Bean, так и как @Component) и переопределение CreateProducerRecord (...), чтобы я мог обернуть исходное событие, прежде чем оно будет отправлено в тему DLQ. Вместо этого @retryabletopic продолжает использовать восстановитель по умолчанию, который просто переиздает оригинальный Originalevent в DLQ. Потому что мой @dlthandler написан, чтобы принять DlqeventWrapper, Spring бросает ошибку преобразования до того, как мой @dlthandler даже будет вызван.

Код: Выделить всё

@KafkaListener(topics = "my-topic", groupId = "my-group")
@RetryableTopic(
attempts = "2",
backoff = @Backoff(delay = 1000),
dltStrategy = DltStrategy.FAIL_ON_ERROR,
autoCreateTopics = "false",
topicSuffixingStrategy = SuffixingWithIndexTopicNamingStrategy.SUFFIX_WITH_INDEX_VALUE,
sameIntervalTopicReuseStrategy = SameIntervalReuseTopicReuseStrategy.MULTIPLE_TOPICS,
retryTopicSuffix = ".RETRY",
dltSuffix = ".DLQ"
)
public void listen(OriginalEvent event) {
throw new RuntimeException("Failing for test");
}

@DltHandler
public void handleDlq(DlqEventWrapper event) {
log.error("Received DLQ event: {}", event);
}

< /code>
Я попытался настроить DeadletterPublishingRecoverer, чтобы обернуть события: < /p>

@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate template) {
return new DeadLetterPublishingRecoverer(template,
(record, exception) -> new TopicPartition(record.topic() + ".DLQ", record.partition())) {

@Override
protected ProducerRecord createProducerRecord(
ConsumerRecord record,
TopicPartition topicPartition,
Headers headers,
byte[] key,
byte[] value,
Exception exception) {

DlqEventWrapper wrapper = new DlqEventWrapper();
wrapper.setOriginalEvent(record.value());
wrapper.setErrorMessage(exception != null ? exception.getMessage() : "unknown");
wrapper.setTimestamp(System.currentTimeMillis());

return new ProducerRecord(topicPartition.topic(), null, wrapper);
}
};
}
К вашему сведению, с помощью Spring Kafka версии 3.3.9.


Подробнее здесь: https://stackoverflow.com/questions/797 ... yabletopic
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»