Kafka ProducerFencedException при использовании транзакций с Spring Kafka и синхронизации базы данныхJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Kafka ProducerFencedException при использовании транзакций с Spring Kafka и синхронизации базы данных

Сообщение Anonymous »

Я столкнулся с ProducerFencedException при попытке синхронизировать события между Kafka и базой данных PostgreSQL с помощью Spring Kafka. События отправляются в Kafka, и база данных успешно обновляется, и я могу видеть события в теме Kafka, когда использую уровень изоляции read_uncommited. Однако потребитель, похоже, не потребляет сообщения с уровнем изоляции read_commited.

Конфигурация производителя:

Код: Выделить всё

@Configuration
public class KafkaProducerConfig {
@Value("${bootstrap.servers}")
private String boostrapServers;

public Map producerConfig() {
Map producerConfig = new HashMap();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id");
return producerConfig;
}

@Bean
public  ProducerFactory producerFactory() {
DefaultKafkaProducerFactory defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory(producerConfig());
defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id");
return defaultKafkaProducerFactory;
}

@Bean
public  KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}

@Bean
public KafkaTransactionManager transactionEventProducerKafkaTransactionManager() {
return new KafkaTransactionManager(producerFactory());
}
}
Конфигурация потребителя:

Код: Выделить всё

isolation.level=read_committed
Логика событий производителя и обновления базы данных:

Код: Выделить всё

@Transactional
public void sendMessageToTopics(Map duplicateCampaignId,
List campaignSMSList, SmsType smsType) {

if (smsType == SmsType.BULK) {
campaignSMSList.forEach(campaignSMS -> {
sendEvent(smsBulkTopicName, campaignSMS);
});
}
if (smsType == SmsType.PROFILE) {
campaignSMSList.forEach(campaignSMS -> sendEvent(smsProfileTopicName, campaignSMS));
}

duplicateCampaignId.forEach((campaignId, errorMessage) -> customerCampaignRepository.
updateCampaignSmsStatusById(campaignId, CustomerCampaignStatus.ERROR, errorMessage));
customerCampaignRepository.batchUpdateCampaignSms(campaignSMSList);
}

private void sendEvent(String topicName, CampaignSMS campaignSMS) {
try {
kafkaTemplate.send(topicName, campaignSMS).get(3L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error(e.getMessage());
Thread.currentThread().interrupt();
throw new InternalException(e.getMessage());
} catch (ExecutionException | TimeoutException e) {
log.error(e.getMessage());
throw new InternalException(e.getMessage());
}
}
В журналах появляется следующая ошибка:

Код: Выделить всё

ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.  ERROR o.s.k.c.DefaultKafkaProducerFactory - commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@39ec10a3]
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalIdTransactionSynchronization.afterCompletion threw exception
org.apache.kafka.common.errors.ProducerFencedException: Producer with transactionalId 'transaction-event-id0' and ProducerIdAndEpoch(producerId=120332, epoch=0) has been fenced by another producer with the same transactionalId
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
Эта ошибка возникает при попытке зафиксировать транзакцию в моей запланированной задаче. Похоже, что несколько производителей используют один и тот же идентификатор транзакции, что приводит к этой проблеме с ограждением.Я также пробовал динамически генерировать файл транзакции, используя UUID.randomUUID() для каждого производителя, но этот подход тоже не сработал, и появляется та же ошибка.(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-event-id"+UUID.randomUUID() и defaultKafkaProducerFactory.setTransactionIdPrefix("transaction-event-id"+UUID.randomUUID());)
Вот мои вопросы:
  • Как я могу гарантировать, что каждый экземпляр производителя имеет уникальный
    transactionalId?
  • Каковы наилучшие методы управления транзакциями Kafka при
    синхронизации с базой данных?
  • Есть ли способ корректно обработать ProducerFencedException в таком
    сценарии?
Будем очень признательны за любую помощь! Спасибо!

Подробнее здесь: https://stackoverflow.com/questions/791 ... a-and-data
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»