IdempotencyBreachExceptionHandling в службе, используемой в прослушивателе KafkaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 IdempotencyBreachExceptionHandling в службе, используемой в прослушивателе Kafka

Сообщение Anonymous »

Я столкнулся с проблемой возможного исключения idempotencyBreachException в службе, используемой прослушивателем Kafka.
Я не совсем уверен, как правильно справиться с этой ситуацией. У меня есть lockExecutor, который блокирует заданный идентификатор клиента и кеш, используемый в качестве идемпотентного репозитория. В 99,99% случаев все будет работать так, как ожидалось, но меня беспокоит сценарий, когда в конечном итоге возникает исключение IdempotencyBreachException и в базе данных сохраняются 2xdata. Должен ли я просто зарегистрировать ошибку и обработать ее вручную, когда она произойдет, или есть какой-то лучший способ справиться с ней?
Я рассматривал возможность использования @Retryable в методе обработки, но это создает проблему: ключ кэша уже использован, и я знаю, что не следует его удалять. Кроме того, меня беспокоит, как в этом контексте повторные попытки будут взаимодействовать с откатами, поскольку это будет параллельная обработка. На данный момент единственное решение, которое приходит на ум, — это зарегистрировать исключение и при необходимости обработать его вручную в будущем.
@Service
@RequiredArgsConstructor
public class ProcessingEventsServiceImpl {

private final IdempotencyRepository idempotencyRepository;
private final TransactionalService transactionalService;
private final LockExecutor lockExecutor;

public void process(InboundData inbound) {

final String key = inbound.getTransactionId();
final String customerId = inbound.getCustomerId();
final Provenance provenance = inbound.getDataProvenance();

lockExecutor.run(customerId, () -> {

if (idempotencyRepository.isKeyUsed(provenance, key)) {
return;
}

transactionalService.executeTransactionProcess(inbound);
try {
idempotencyRepository.markKeyAsUsed(provenance, key);

} catch (IdempotencyBreachException e) {
log.error(...);
}
});
}
}

@Component
@RequiredArgsConstructor
public class KafkaListener {

private final ProcessingService processingService;
private final DataMapper mapper;

private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaListener.class);

@KafkaListener(topics = "topic")
public void onEvent(Event event) {
var data = mapper.mapEventToModel(event);
processingService.process(data);
}
}

@Service
@RequiredArgsConstructor
public class TransactionalService {
private final InboundRepository repository;
private final InboundMapper mapper;

@Transactional
public void executeTransactionalProcess(TransferData inbound) {
var entity = repository.saveOrUpdate(inbound);
repository.save(entity);
}
}


Подробнее здесь: https://stackoverflow.com/questions/798 ... a-listener
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»