SPRING_CLOUD_FUNCTION_DEFINITION: upsertData
SPRING_CLOUD_STREAM_BINDINGS_upsertData-in-0_DESTINATION: UpsertDataEvent
spring.cloud.stream.bindings.upsertData-in-0.group: upsert-data-event-group
spring.cloud.stream.bindings.upsertDataGroup-in-0.content-type: application/json
SPRING_CLOUD_STREAM_BINDINGS_upsertData-out-0_DESTINATION: UpsertDataSuccessEvent
spring.cloud.stream.bindings.upsertData-out-0.producer.partitionCount: 1
spring.cloud.stream.bindings.upsertData-out-0.group: upsert-data-success-event-group
spring.cloud.stream.bindings.upsertData-out-0.content-type: application/json
Мне было поручено выполнить следующие требования клиента:
- Развернуть 12 экземпляров этого приложения, каждому назначено 12 разделов Kafka, по одному разделу на каждый экземпляр потребителя. Это легко сделать.
- В случае сбоя направьте входное сообщение в тему ошибки с сообщением об ошибке в качестве заголовка. Это можно легко реализовать.
- Реализуйте механизм, при котором после отправки сообщения в тему ошибки (специфического для раздела потребителя, где произошла ошибка), этот конкретный потребитель останавливает потребление до тех пор, пока тема ошибок удалена. Однако конечная точка HTTP приложения должна оставаться работоспособной, хотя потребление событий из темы ввода приостановлено.
@Component("upsertData")
@AllArgsConstructor
@Slf4j
public class MyCloudFunction implements Function {
private final StreamBridge streamBridge;
@Override
public Data apply(Message message) {
log.info("Upsert Data Message received: {}", message);
Data requestPayload = message.getPayload();
try {
var searchResult = // Get Data Logic
if (searchResult != null) {
// Update Data Logic
} else {
// Create Data Logic
}
log.info("Upsert Data Event processed successfully");
return requestPayload;
} catch (Exception e) {
log.error("Error processing message. Sending to error queue.", e);
sendMessageToErrorTopic(e, requestPayload);
return null;
}
}
private void sendMessageToErrorTopic(Exception e, Data requestPayload) {
Message errorMessage =
MessageBuilder.withPayload(requestPayload)
.setHeader("error-reason", e)
.build();
streamBridge.send("UpsertDataFailureEvent", errorMessage);
}
}
Подробнее здесь: https://stackoverflow.com/questions/784 ... ream-kafka