Начните использовать сообщения Kafka после того, как Envoy Sidecar будет готов.JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Начните использовать сообщения Kafka после того, как Envoy Sidecar будет готов.

Сообщение Anonymous »

У меня работает приложение Prod в Kubernetes, и во время развертывания я получаю кучу ошибок.

Код: Выделить всё

15:12:03.495 [xx-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1925)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1348)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition xxxxxxxx-15 at offset xxxxx. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283)
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168)
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1625)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1600)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 19222
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:345)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:53)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321)
...  14 common frames omitted
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:547)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:602)
at java.base/java.net.Socket.connect(Socket.java:639)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:178)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:533)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:638)
at java.base/sun.net.www.http.HttpClient.(HttpClient.java:281)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:386)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:408)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1312)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1245)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1131)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1060)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1690)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1614)
at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:281)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:840)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:813)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:417)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:342)
... 19 common frames omitted
Я проверил журналы вспомогательной машины посланника, потребители Kafka начинают использовать сообщения еще до того, как вспомогательная машина посланника будет готова, и, поскольку потребитель зависит от реестра схемы kafka, kafka не может десериализовать полезную нагрузку в нужную схему.< /p>
Можно ли как-нибудь отложить получение сообщений до тех пор, пока посланник не будет готов? Здесь мы не можем использовать проверку готовности из Kubernetes, поскольку потребители Kafka начинают получать сообщения раньше.
Может ли кто-нибудь поделиться своим опытом, что можно сделать в этой ситуации, чтобы эффективно решить проблему.

Подробнее здесь: https://stackoverflow.com/questions/786 ... omes-ready
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»