Код: Выделить всё
package com.sushant.kafka.learning.services;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sushant.kafka.learning.adapter.ErrorMessageAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;
import com.sushant.kafka.learning.models.DTO.Location;
@Service
public class KafkaConsumerService {
private final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
@Autowired
private ErrorMessageAdapter errorMessageAdapter;
@Autowired
private GoogleChatService googleChatService;
@RetryableTopic(attempts = "2", backoff = @Backoff(value = 1000L, multiplier = 2, maxDelay = 10000L))
@KafkaListener(topics = "location", groupId = "myKafkaGroup", containerFactory = "containerFactory")
public void consumeLocationCoordinates(@Payload String locationData, Acknowledgment acknowledgment) throws Exception {
try {
Location location = mapLocation(locationData);
if (location.getSomeBoolean()) {
LOGGER.info("someBoolean is not null.");
}
acknowledgment.acknowledge();
} catch (Exception e) {
// String errorMessage = errorMessageAdapter.buildErrorMessageForGoogleChat(locationData, e.getMessage(), "location");
// googleChatService.sendMessageToGoogleChat(errorMessage);
LOGGER.error("Error while processing data: {}", e.getMessage());
throw e;
}
}
@DltHandler
private void consumeLocationCoordinatesErrorHandler(
@Payload String payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.EXCEPTION_FQCN) String exception
) throws Exception {
LOGGER.info("Sending to DLT on topic: {} and exception: {}", topic, exception);
String errorMessage = errorMessageAdapter.buildErrorMessageForGoogleChat(payload, exception, topic);
googleChatService.sendMessageToGoogleChat(errorMessage);
}
private Location mapLocation(final String locationData) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(locationData, Location.class);
}
}
В настоящее время я получаю это исключение Отправка в DLT по теме: location-dlt и исключение: org.springframework .kafka.listener.ListenerExecutionFailedException, и когда я добавляю строковое исключение @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) в @DltHandler, оно начинает бесконечно писать сообщение в тему недоставленного письма и выдает эту ошибку Отсутствует заголовок 'kafka_dlt-Exception-fqcn' для типа параметра метода [класс java.lang.String]
Любой заголовок с префиксом DLT_ не работает. Я хочу получить доступ к исходному сообщению об исключении. В моем случае getSomeBoolean создаст NullPointerException
Подробнее здесь: https://stackoverflow.com/questions/791 ... boot-kafka
Мобильная версия