/**
 * Synchronizes every product of the given shop and marks the shop as synced.
 *
 * <p>For each product the seller's legal details are fetched through {@code restClient},
 * a {@link ProductDto} is assembled, and the product is flagged as synced and saved.
 * The Kafka messages are published only after the surrounding transaction has committed,
 * so a failed save or commit does not leave already-published messages behind.
 *
 * <p>Products are processed sequentially on the calling thread. Spring binds the
 * transaction to the current thread, so work handed to a parallel stream's worker
 * threads would run outside of it and would not be rolled back together with the rest.
 *
 * <p>NOTE(review): the REST lookups run while the shop row lock is held; if they are slow,
 * consider bounding them with client timeouts.
 * NOTE(review): {@code shop.setSynced(true)} is only persisted if {@code shop} is a managed
 * entity in this transaction - confirm against the callers.
 *
 * @param shop the shop whose products are synchronized; its id is used to lock the shop row
 */
@Transactional
@Override
public void syncShop(Shop shop) {
    log.info("Syncing shop");
    shopRepository.lockShop(shop.getId()); // @Query(native = true, value = "SELECT * FROM shop WHERE id = ? FOR UPDATE")

    // Fully qualified java.util names: the file's import block is not visible from this method.
    var pendingMessages = new java.util.ArrayList<ProductDto>();
    for (var product : shop.getProducts()) {
        log.info("Syncing product");
        var seller = product.getSeller();
        var sellerDetails = restClient.getLegalDetails(seller.getInn());

        var dto = new ProductDto();
        dto.setProduct(product);
        dto.setSellerDetails(sellerDetails);
        // Deferred: the message must not leave before the database state is committed.
        // NOTE(review): the DTO presumably keeps a reference to the product, so the payload
        // is serialized with the product's state at publish time (synced flag already set).
        pendingMessages.add(dto);

        product.setSynced(true);
        productRepository.save(product);
    }
    shop.setSynced(true);

    publishAfterCommit(pendingMessages);
}

/**
 * Publishes the given product messages once the current transaction has committed.
 *
 * <p>If no transaction synchronization is active on this thread (for example, the method
 * was invoked without the transactional proxy), the messages are sent immediately.
 *
 * <p>NOTE(review): a publish failure after commit cannot roll the database back; if
 * delivery must be guaranteed, a transactional outbox is the stronger option.
 *
 * @param messages the DTOs to publish; nothing is registered or sent when empty
 */
private void publishAfterCommit(java.util.List<ProductDto> messages) {
    if (messages.isEmpty()) {
        return;
    }
    // NOTE(review): the constant key "product" routes every message to the same partition;
    // kept unchanged because consumers may depend on it.
    Runnable publish = () -> messages.forEach(
            dto -> kafkaTemplate.send("product_details", "product", dto)); // topic, key, value

    if (org.springframework.transaction.support.TransactionSynchronizationManager.isSynchronizationActive()) {
        org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
                new org.springframework.transaction.support.TransactionSynchronization() {
                    @Override
                    public void afterCommit() {
                        publish.run();
                    }
                });
    } else {
        publish.run();
    }
}
} [/code] нужен restClient, сообщения Kafka не отправляются в случае сбоя сохранения БД