Producer configuration:
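import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;

// Generic parameters were lost in the post; <String, Object> is assumed.
@Configuration
public class KafkaProducerConfig {

    @Value("${bootstrap.servers}")
    private String bootstrapServers;

    // Producer properties shared by every producer the factory creates.
    public Map<String, Object> producerConfig() {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id");
        return producerConfig;
    }

    // Transactional factory; producer ids are built from this prefix plus a numeric suffix.
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfig());
        defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id");
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, Object> transactionEventProducerKafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}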
Consumer configuration:
isolation.level=read_committed
Code that sends the messages and updates the database:
// Sends every SMS to the topic for its type, then updates the database in the same @Transactional method.
@Transactional
public void sendMessageToTopics(Map<Long, String> duplicateCampaignId, // key/value types assumed; generics were lost in the post
                                List<CampaignSMS> campaignSMSList, SmsType smsType) {
    if (smsType == SmsType.BULK) {
        campaignSMSList.forEach(campaignSMS -> sendEvent(smsBulkTopicName, campaignSMS));
    }
    if (smsType == SmsType.PROFILE) {
        campaignSMSList.forEach(campaignSMS -> sendEvent(smsProfileTopicName, campaignSMS));
    }
    // Mark duplicate campaigns as failed, then persist the SMS batch.
    duplicateCampaignId.forEach((campaignId, errorMessage) ->
            customerCampaignRepository.updateCampaignSmsStatusById(
                    campaignId, CustomerCampaignStatus.ERROR, errorMessage));
    customerCampaignRepository.batchUpdateCampaignSms(campaignSMSList);
}

// Sends one message and blocks for up to 3 seconds waiting for the send result.
private void sendEvent(String topicName, CampaignSMS campaignSMS) {
    try {
        kafkaTemplate.send(topicName, campaignSMS).get(3L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        log.error(e.getMessage());
        Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
        throw new InternalException(e.getMessage());
    } catch (ExecutionException | TimeoutException e) {
        log.error(e.getMessage());
        throw new InternalException(e.getMessage());
    }
}
Error log:
ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
TransactionSynchronization.afterCompletion threw exception
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
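As background for the log above, here is a toy model of fencing. It assumes the commonly described behaviour that registering a second producer under an existing transactionalId bumps the epoch and invalidates the older producer. The `Coordinator` class and its `register` and `commit` methods are hypothetical stand-ins for illustration only and are not part of the Kafka client API.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of transactional-id fencing; not the Kafka client API. */
public class FencingSketch {

    /** Hypothetical stand-in for the broker-side transaction coordinator. */
    static class Coordinator {
        private final Map<String, Integer> latestEpoch = new HashMap<>();

        /** First registration yields epoch 0; each later one bumps the epoch by 1. */
        int register(String transactionalId) {
            return latestEpoch.merge(transactionalId, 0, (old, zero) -> old + 1);
        }

        /** A commit is accepted only from the most recently registered epoch. */
        boolean commit(String transactionalId, int epoch) {
            return latestEpoch.getOrDefault(transactionalId, -1) == epoch;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        // Two application instances register with the same id, as in the log.
        int instanceA = coordinator.register("transaction-event-id0");
        int instanceB = coordinator.register("transaction-event-id0");
        // Instance A still holds epoch 0, so its commit is rejected (fenced).
        System.out.println("A commits: " + coordinator.commit("transaction-event-id0", instanceA));
        System.out.println("B commits: " + coordinator.commit("transaction-event-id0", instanceB));
    }
}
```

In this model the producer holding epoch 0 is the one rejected, which matches the `epoch=0` reported in the exception message.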
Here are my questions:
- How can I ensure that each producer instance has a unique transactionalId?
- What are the best practices for managing Kafka transactions when synchronizing with a database?
- Is there a way to handle ProducerFencedException gracefully in such a scenario?
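To make the first question concrete, one possible direction is to derive the transaction id prefix from something that differs per application instance. The sketch below is a minimal illustration under that assumption, not a confirmed fix. The `uniquePrefix` helper and the choice of a random UUID as the instance token are hypothetical; a stable token such as a pod or host name could be substituted.

```java
import java.util.UUID;

/** Sketch: build a transaction id prefix that differs between application instances. */
public class TransactionIdPrefixSketch {

    /** Hypothetical helper: appends a per-instance token to the shared base prefix. */
    static String uniquePrefix(String base, String instanceToken) {
        return base + "-" + instanceToken + "-";
    }

    public static void main(String[] args) {
        // Assumption: a random UUID generated at startup identifies this instance.
        String instanceToken = UUID.randomUUID().toString();
        String prefix = uniquePrefix("transaction-event-id", instanceToken);
        // The result would be passed to setTransactionIdPrefix(...) in producerFactory().
        System.out.println(prefix);
    }
}
```

A random token changes on every restart, so a restarted instance would not reuse its previous ids. Whether that is acceptable depends on how unfinished transactions should be handled, which is why a stable per-instance token may be preferable.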
More details here: https://stackoverflow.com/questions/791 ... a-and-data