принимает информацию о платеже < /li>
генерирует уникальный идентификатор платежа (PUID) < /li>
Поддает информацию о платеже в тему < /li>
То же, что и то же самое, что -то вроде. Потребление статуса завершения платежей из другой темы, где в ключе является PUID, а стоимость - это объект статуса оплаты. План состоит в том, чтобы < /p>
запустить событие, как только глобальная таблица K получает статус платежа < /li>
Это событие должно проверить, получен ли PUID в таблице Global K, та же PUID, который мы ждем на шаге 4. Puid < /li>
вернуть статус платежа пользователю < /li>
< /ol>
Вопрос в том, как запустить этот обратный вызов на шаге 5? Это приложение для Spring Boot. < /P>
Я определил свои свойства кафки, подобные этому < /p>
# Kafka common
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Function definition
spring.cloud.function.definition=paymentStatus
# Kafka Streams binder config
spring.cloud.stream.bindings.paymentStatus-in-0.destination=payment-topic
spring.cloud.stream.kafka.streams.binder.functions.paymentStatus.applicationId=payment-submission-global-ktable-app
spring.cloud.stream.kafka.streams.binder.functions.paymentStatus.configuration.state.dir=C:/kafka-streams/paymentStatus
# Binding-specific Kafka Streams consumer materialization
spring.cloud.stream.bindings.paymentStatus-in-0.consumer.materializedAs=payment-status-global-state-store
spring.cloud.stream.bindings.paymentStatus-in-0.consumer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde
< /code>
Это то, о чем я думал < /p>
@Component
public class PaymentStatusKafkaConsumer {
private final ApplicationEventPublisher publisher;
private final PuidWaitRegistry registry;
public PaymentStatusKafkaConsumer(ApplicationEventPublisher publisher, PuidWaitRegistry registry) {
this.publisher = publisher;
this.registry = registry;
}
@Bean
public Function stream() {
return input -> {
input.peek((key, value) -> {
if (value != null && value.getPuid() != null) {
if (registry.isWaitingFor(value.getPuid())) {
//
publisher.publishEvent(new PaymentStatusEvent(value.getPuid()));
}
}
});
return input;
};
}
}
@Component
public class PuidWaitRegistry {
private final ConcurrentMap waitingMap = new ConcurrentHashMap();
public CompletableFuture register(String puid) {
CompletableFuture future = new CompletableFuture();
waitingMap.put(puid, future);
return future;
}
public boolean isWaitingFor(String puid) {
return waitingMap.containsKey(puid);
}
public void complete(String puid, PasResponse response) {
CompletableFuture future = waitingMap.remove(puid);
if (future != null && !future.isDone()) {
future.complete(response);
}
}
public void fail(String puid, Throwable t) {
CompletableFuture future = waitingMap.remove(puid);
if (future != null && !future.isDone()) {
future.completeExceptionally(t);
}
}
}
< /code>
Только проблема с этим, которую я вижу в том, что Kstream будет привязан к разделу и не получит всех Puids. Что, если мы ждем PUID - 1234, и таблица Global K получила статус платежа за это, но Kstream в нашем экземпляре приложения, который ожидает PUID 1234, не получает этот PUID, и событие приложения никогда не запускается, хотя GlobalkTable получила данные. В конечном итоге это будет время ожидания после 12 секунд, и пользователь не получит правильный ответ < /p>
Есть ли способ достичь этого с помощью подхода, управляемого событием? Мы не хотим опросить Globalktable через каждые 100 мс или 200 мс, чтобы проверить, получили ли мы данные в таблице Global K.
Подробнее здесь: https://stackoverflow.com/questions/797 ... eives-data
Мобильная версия