Springboot Kafka Producer и InvalidpidmappingExceptionJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Springboot Kafka Producer и InvalidpidmappingException

Сообщение Anonymous »

Использование Spring Boot 3.4.3 и Spring Kafka 3.3.3 Если производители не используются в течение более недели, я получаю следующие ошибки < /p>
org.springframework.kafka.core.KafkaProducerException: Failed to send
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$9(KafkaTemplate.java:891) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1111) ~[spring-kafka-3.3.1.jar!/:3.3.1]
at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1568) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1155) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:473) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:336) ~[kafka-clients-7.8.0-ce.jar!/:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) ~[kafka-clients-7.8.0-ce.jar!/:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) ~[kafka-clients-7.8.0-ce.jar!/:na]
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
< /code>
Код шаблона: < /p>
@Configuration
@Slf4j
public class KafkaMigrationsProducerConfig {
final KafkaProperties kafkaProperties;
private final MeterRegistry meterRegistry;

public KafkaMigrationsProducerConfig(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
this.kafkaProperties = kafkaProperties;
this.meterRegistry = meterRegistry;
}

//https://docs.spring.io/spring-kafka/doc ... ansactions
// Starting with version 2.5.8, you can now configure the maxAge property on the producer factory. This is useful when using transactional
// producers that might lay idle for the broker’s transactional.id.expiration.ms. With current kafka-clients, this can cause a ProducerFencedException
// without a re-balance. By setting the maxAge to less than transactional.id.expiration.ms, the factory will refresh the producer if it is past it’s max age.
@Bean
public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
return producerFactory -> producerFactory.setMaxAge(Duration.ofDays(1));
}

@Bean
public ProducerFactory migrationsProducerFactory() {
Map configProps = KafkaPropertiesHelper.buildRequiredPropsOnly(kafkaProperties.buildProducerProperties(null));
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSerializer.class);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "migr-core-tx- " + UUID.randomUUID().toString());
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
configProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList(KafkaProducerMigrationsInterceptor.class));
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
//retry
configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
//timeouts
configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000");//5 minutes
configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");//5 minutes
ProducerFactory result = new DefaultKafkaProducerFactory(configProps);
result.addListener(new MicrometerProducerListener(meterRegistry));

return result;
}

@Bean
public KafkaTransactionManager kafkaMigrationsTransactionManager(final ProducerFactory migrationsFactoryTransactional) {
return new KafkaTransactionManager(migrationsFactoryTransactional);
}

@Bean
public KafkaTemplate kafkaMigrationsTransactionalTemplate(final ProducerFactory migrationsFactoryTransactional) {
return new KafkaTemplate(migrationsFactoryTransactional);
}
}
< /code>
И это код для производителя Kafka < /p>
@Component
@Slf4j
public class MigrationKafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplateTransactional;

@Transactional
public void send(String topic, List payload) throws IAMigrationsException {
CountDownLatch latch = new CountDownLatch(payload.size());

payload.forEach(item -> {
CompletableFuture future = kafkaTemplateTransactional.send(topic, item);
future.whenComplete((result, ex) -> {
if (ex == null) {
assert result != null;
log.info("sent the message correctly with offset=[{}]", result.getRecordMetadata().offset());
} else {
log.error("producer could not send the message {}", ex.getMessage(), ex);
}
latch.countDown();
});
});

try {
if (latch.await(5, TimeUnit.MINUTES)) {
log.info("{} events (including start and stop) sent ok", payload.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Timeout - data could not be sent to Kafka {}", e.getMessage());
}
}

public void close() {
kafkaTemplateTransactional.getProducerFactory().reset();
}
}
< /code>
Производство Interceptor < /p>
@Override
public void configure(Map configs) {
}

@Override
public ProducerRecord onSend(ProducerRecord record) {
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (Objects.nonNull(exception)) {
log.error("Error on acknowledgement " + exception.getMessage() + " topic: " + metadata.topic() + " offset: " + metadata.offset());
} else {
log.info("Server acknowledge event on topic: " + metadata.topic() + " offset: " + metadata.offset());
}
}

@Override
public void close() {
//do nothing at this moment
}

}
< /code>
Значения производительности: < /p>
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [PLAINTEXT://localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = ia-ds-deploy-test-producer-2
compression.gzip.level = -1
compression.lz4.level = 9
compression.type = none
compression.zstd.level = 3
confluent.lkc.id = null
confluent.proxy.protocol.client.address = null
confluent.proxy.protocol.client.mode = PROXY
confluent.proxy.protocol.client.port = null
confluent.proxy.protocol.client.version = NONE
connections.max.idle.ms = 540000
delivery.timeout.ms = 300000
enable.idempotence = true
enable.metrics.push = true
interceptor.classes = [class KafkaProducerMigrationsInterceptor]
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metadata.recovery.strategy = none
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.adaptive.partitioning.enable = true
partitioner.availability.timeout.ms = 0
partitioner.class = null
partitioner.ignore.keys = false
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 300000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 1000
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.header.urlencode = false
sasl.oauthbearer.iat.validation.enabled = false
sasl.oauthbearer.jti.validation.enabled = false
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = migr-core-tx- 67ba5de5-1f21-478c-a223-7072e4ce6de11
value.serializer = class KafkaSerializer
< /code>
Я прочитал, что проблема связана с Transactional.id.expiration.ms настройкой из свойств сервера Kafka. Я пытаюсь воспроизвести ошибку локально, поэтому я запустил локального брокера Kafka и установил Transactional.id.expiration.ms = 5000 (5 секунд). Как я могу создать модульный тест для воспроизведения этой ошибки на моем локальном брокере Kafka? https://medium.com/@micaelaturrin/inval ... 1a6503bf31, похоже, не решает проблему.

Подробнее здесь: https://stackoverflow.com/questions/794 ... gexception
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Повышение производительности Kafka Producer: использование нескольких шаблонов Kafka вместо одного шаблона с динамическо
    Anonymous » » в форуме JAVA
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Kafka Producer не повторяет попытку в приложении Spring Boot
    Гость » » в форуме JAVA
    0 Ответы
    22 Просмотры
    Последнее сообщение Гость
  • Почему обратный вызов в методе Producer.send не выполняется при невозможности отправки сообщения?
    Гость » » в форуме JAVA
    0 Ответы
    52 Просмотры
    Последнее сообщение Гость
  • Видеопотоки AWS Kinesis — Producer SDK Java — исключение JNI
    Anonymous » » в форуме JAVA
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Определите проверенное сообщение с Press Cafka Producer
    Anonymous » » в форуме JAVA
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»