Как отправить сообщение об исключении в виде пользовательского заголовка в Spring Boot Kafka?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как отправить сообщение об исключении в виде пользовательского заголовка в Spring Boot Kafka?

Сообщение Anonymous »

Я написал приложение весенней загрузки, которое создает и потребляет данные через Kafka. Вот код -

Код: Выделить всё

package com.sushant.kafka.learning.services;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.sushant.kafka.learning.adapter.ErrorMessageAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Service;

import com.sushant.kafka.learning.models.DTO.Location;

@Service
public class KafkaConsumerService {
private final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

@Autowired
private ErrorMessageAdapter errorMessageAdapter;

@Autowired
private GoogleChatService googleChatService;

@RetryableTopic(attempts = "2", backoff = @Backoff(value = 1000L, multiplier = 2, maxDelay = 10000L))
@KafkaListener(topics = "location", groupId = "myKafkaGroup", containerFactory = "containerFactory")
public void consumeLocationCoordinates(@Payload String locationData, Acknowledgment acknowledgment) throws Exception {
try {

Location location = mapLocation(locationData);
if (location.getSomeBoolean()) {
LOGGER.info("someBoolean is not null.");
}
acknowledgment.acknowledge();
} catch (Exception e) {
//            String errorMessage = errorMessageAdapter.buildErrorMessageForGoogleChat(locationData, e.getMessage(), "location");
//            googleChatService.sendMessageToGoogleChat(errorMessage);
LOGGER.error("Error while processing data: {}", e.getMessage());
throw e;
}
}

@DltHandler
private void consumeLocationCoordinatesErrorHandler(
@Payload String payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.EXCEPTION_FQCN) String exception
) throws Exception {
LOGGER.info("Sending to DLT on topic: {} and exception: {}", topic, exception);
String errorMessage = errorMessageAdapter.buildErrorMessageForGoogleChat(payload, exception, topic);
googleChatService.sendMessageToGoogleChat(errorMessage);
}

private Location mapLocation(final String locationData) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(locationData, Location.class);
}
}
Я также добавил аннотацию @RetryableTopic для повтора попытки в случае какого-либо исключения. Затем с помощью @DltHandler я хочу получить доступ к исходному исключению, которое было создано.
В настоящее время я получаю это исключение Отправка в DLT по теме: location-dlt и исключение: org.springframework .kafka.listener.ListenerExecutionFailedException, и когда я добавляю строковое исключение @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) в @DltHandler, оно начинает бесконечно писать сообщение в тему недоставленного письма и выдает эту ошибку Отсутствует заголовок 'kafka_dlt-Exception-fqcn' для типа параметра метода [класс java.lang.String]
Любой заголовок с префиксом DLT_ не работает. Я хочу получить доступ к исходному сообщению об исключении. В моем случае getSomeBoolean создаст NullPointerException

Подробнее здесь: https://stackoverflow.com/questions/791 ... boot-kafka
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»