Обработка ошибок с помощью весеннего облачного потока KafkaJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Обработка ошибок с помощью весеннего облачного потока Kafka

Сообщение Anonymous »

Я использую Spring-cloud-stream-binder-kafka в своем приложении Spring Boot, имеющем функцию Spring Cloud, отвечающую за операции обновления. После успешного выполнения эта функция пересылает входное сообщение как выходное. Конфигурация этой функции описана ниже:
SPRING_CLOUD_FUNCTION_DEFINITION: upsertData

SPRING_CLOUD_STREAM_BINDINGS_upsertData-in-0_DESTINATION: UpsertDataEvent
spring.cloud.stream.bindings.upsertData-in-0.group: upsert-data-event-group
spring.cloud.stream.bindings.upsertDataGroup-in-0.content-type: application/json

SPRING_CLOUD_STREAM_BINDINGS_upsertData-out-0_DESTINATION: UpsertDataSuccessEvent
spring.cloud.stream.bindings.upsertData-out-0.producer.partitionCount: 1
spring.cloud.stream.bindings.upsertData-out-0.group: upsert-data-success-event-group
spring.cloud.stream.bindings.upsertData-out-0.content-type: application/json

Мне было поручено выполнить следующие требования клиента:
  • Развернуть 12 экземпляров этого приложения, каждому назначено 12 разделов Kafka, по одному разделу на каждый экземпляр потребителя. Это легко сделать.
  • В случае сбоя направьте входное сообщение в тему ошибки с сообщением об ошибке в качестве заголовка. Это можно легко реализовать.
  • Реализуйте механизм, при котором после отправки сообщения в тему ошибки (специфического для раздела потребителя, где произошла ошибка), этот конкретный потребитель останавливает потребление до тех пор, пока тема ошибок удалена. Однако конечная точка HTTP приложения должна оставаться работоспособной, хотя потребление событий из темы ввода приостановлено.
Ниже я представил свою текущую реализацию. Однако я не уверен, как выполнить требование 3:
@Component("upsertData")
@AllArgsConstructor
@Slf4j
public class MyCloudFunction implements Function {

private final StreamBridge streamBridge;

@Override
public Data apply(Message message) {
log.info("Upsert Data Message received: {}", message);
Data requestPayload = message.getPayload();

try {
var searchResult = // Get Data Logic

if (searchResult != null) {
// Update Data Logic
} else {
// Create Data Logic
}

log.info("Upsert Data Event processed successfully");

return requestPayload;
} catch (Exception e) {
log.error("Error processing message. Sending to error queue.", e);
sendMessageToErrorTopic(e, requestPayload);
return null;
}
}

private void sendMessageToErrorTopic(Exception e, Data requestPayload) {
Message errorMessage =
MessageBuilder.withPayload(requestPayload)
.setHeader("error-reason", e)
.build();
streamBridge.send("UpsertDataFailureEvent", errorMessage);
}

}



Подробнее здесь: https://stackoverflow.com/questions/784 ... ream-kafka
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Перенаправление весеннего облачного шлюза
    Anonymous » » в форуме JAVA
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Перенаправление весеннего облачного шлюза
    Гость » » в форуме JAVA
    0 Ответы
    0 Просмотры
    Последнее сообщение Гость
  • Требуется руководство: обнаружение голосовой активности и обработка тайм-аута с помощью облачного API преобразования реч
    Anonymous » » в форуме C++
    0 Ответы
    52 Просмотры
    Последнее сообщение Anonymous
  • Kafka-реактор для повторного чтения того же сообщения Kafka, если обработка сообщения не удалась
    Anonymous » » в форуме JAVA
    0 Ответы
    109 Просмотры
    Последнее сообщение Anonymous
  • Обработка ошибок при загрузке Spring Kafka в пакетном прослушивателе
    Anonymous » » в форуме JAVA
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»