Как запустить событие или обратный вызов каждый раз, когда Globalktable получает данныеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как запустить событие или обратный вызов каждый раз, когда Globalktable получает данные

Сообщение Anonymous »

У меня есть услуга, которая < /p>

принимает информацию о платеже < /li>
генерирует уникальный идентификатор платежа (PUID) < /li>
Поддает информацию о платеже в тему < /li>
То же, что и то же самое, что -то вроде. Потребление статуса завершения платежей из другой темы, где в ключе является PUID, а стоимость - это объект статуса оплаты. План состоит в том, чтобы < /p>

запустить событие, как только глобальная таблица K получает статус платежа < /li>
Это событие должно проверить, получен ли PUID в таблице Global K, та же PUID, который мы ждем на шаге 4. Puid < /li>
вернуть статус платежа пользователю < /li>
< /ol>
Вопрос в том, как запустить этот обратный вызов на шаге 5? Это приложение для Spring Boot. < /P>
Я определил свои свойства кафки, подобные этому < /p>
# Kafka common
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Function definition
spring.cloud.function.definition=paymentStatus
# Kafka Streams binder config
spring.cloud.stream.bindings.paymentStatus-in-0.destination=payment-topic
spring.cloud.stream.kafka.streams.binder.functions.paymentStatus.applicationId=payment-submission-global-ktable-app
spring.cloud.stream.kafka.streams.binder.functions.paymentStatus.configuration.state.dir=C:/kafka-streams/paymentStatus
# Binding-specific Kafka Streams consumer materialization
spring.cloud.stream.bindings.paymentStatus-in-0.consumer.materializedAs=payment-status-global-state-store
spring.cloud.stream.bindings.paymentStatus-in-0.consumer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde
< /code>
Это то, о чем я думал < /p>
@Component
public class PaymentStatusKafkaConsumer {
private final ApplicationEventPublisher publisher;
private final PuidWaitRegistry registry;
public PaymentStatusKafkaConsumer(ApplicationEventPublisher publisher, PuidWaitRegistry registry) {
this.publisher = publisher;
this.registry = registry;
}
@Bean
public Function stream() {
return input -> {
input.peek((key, value) -> {
if (value != null && value.getPuid() != null) {
if (registry.isWaitingFor(value.getPuid())) {
// 🔥 Only publish event if someone is waiting for this PUID
publisher.publishEvent(new PaymentStatusEvent(value.getPuid()));
}
}
});
return input;
};
}
}

@Component
public class PuidWaitRegistry {
private final ConcurrentMap waitingMap = new ConcurrentHashMap();
public CompletableFuture register(String puid) {
CompletableFuture future = new CompletableFuture();
waitingMap.put(puid, future);
return future;
}
public boolean isWaitingFor(String puid) {
return waitingMap.containsKey(puid);
}
public void complete(String puid, PasResponse response) {
CompletableFuture future = waitingMap.remove(puid);
if (future != null && !future.isDone()) {
future.complete(response);
}
}
public void fail(String puid, Throwable t) {
CompletableFuture future = waitingMap.remove(puid);
if (future != null && !future.isDone()) {
future.completeExceptionally(t);
}
}
}
< /code>
Только проблема с этим, которую я вижу в том, что Kstream будет привязан к разделу и не получит всех Puids. Что, если мы ждем PUID - 1234, и таблица Global K получила статус платежа за это, но Kstream в нашем экземпляре приложения, который ожидает PUID 1234, не получает этот PUID, и событие приложения никогда не запускается, хотя GlobalkTable получила данные. В конечном итоге это будет время ожидания после 12 секунд, и пользователь не получит правильный ответ < /p>
Есть ли способ достичь этого с помощью подхода, управляемого событием? Мы не хотим опросить Globalktable через каждые 100 мс или 200 мс, чтобы проверить, получили ли мы данные в таблице Global K.


Подробнее здесь: https://stackoverflow.com/questions/797 ... eives-data
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»