Код: Выделить всё
15:12:03.495 [xx-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1925)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1348)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition xxxxxxxx-15 at offset xxxxx. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283)
at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168)
at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134)
at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617)
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1625)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1600)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 19222
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:345)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:53)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321)
... 14 common frames omitted
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:547)
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:602)
at java.base/java.net.Socket.connect(Socket.java:639)
at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:178)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:533)
at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:638)
at java.base/sun.net.www.http.HttpClient.(HttpClient.java:281)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:386)
at java.base/sun.net.www.http.HttpClient.New(HttpClient.java:408)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1312)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1245)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1131)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:1060)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1690)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1614)
at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:281)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:840)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:813)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:417)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:342)
... 19 common frames omitted
Можно ли как-нибудь отложить получение сообщений до тех пор, пока посланник не будет готов? Здесь мы не можем использовать проверку готовности из Kubernetes, поскольку потребители Kafka начинают получать сообщения раньше.
Может ли кто-нибудь поделиться своим опытом, что можно сделать в этой ситуации, чтобы эффективно решить проблему.
Подробнее здесь: https://stackoverflow.com/questions/786 ... omes-ready