KafkaAvroSerializer UnresolvedUnionException с полем timestamp-millis, допускающим значение NULLJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 KafkaAvroSerializer UnresolvedUnionException с полем timestamp-millis, допускающим значение NULL

Сообщение Anonymous »

Я использую KafkaAvroSerializer от confluent со схемой, которая включает поле timestamp-millis, допускающее значение NULL. Поле отображается в Instant в Java, но когда я отправляю ненулевое значение, я получаю исключение UnresolvedUnionException. Почему Instant не разрешается должным образом. Вот соответствующая часть моей схемы Avro:

Код: Выделить всё

       {
"name": "source_created_ts",
"doc": "The created TS of the record.",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "source_deleted_ts",
"doc": "The deleted TS of the record.",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
]
},
Я использую плагин avro-maven-plugin для создания классов Java, и оба поля сопоставляются с java.time.Instant.
Вот пример того, как я создаю записи

Код: Выделить всё

Properties props = new Properties();
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.setProperty("specific.avro.reader", "true");
props.setProperty(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, "false");
props.setProperty(KafkaAvroSerializerConfig.USE_LATEST_VERSION, "true");
props.setProperty("schema.registry.rules.enabled", "true");
props.setProperty(KafkaAvroSerializerConfig.RULE_EXECUTORS, "CEL");
KafkaProducer producer = new KafkaProducer(props);

TestValue value = new TestValue();
value.setSourceCreatedTs(Instant.parse("2023-05-15T14:10:00Z"));
value.setSourceDeletedTs(null);
producer.send(new ProducerRecord("test-topic", key, value)).get();

value = new TestValue();
value.setSourceCreatedTs(Instant.parse("2023-05-15T14:10:00Z"));
value.setSourceDeletedTs(Instant.parse("2025-10-08T09:46:00Z"));
producer.send(new ProducerRecord("test-topic", key, value)).get();
Первая запись работает правильно, но когда я отправляю вторую запись с ненулевой меткой времени, кажется, что она отправляется не так долго, и я получаю это не в ошибке объединения:

Код: Выделить всё

Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"long","logicalType":"timestamp-millis"}]: 2025-10-08T09:46:00Z
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:910)
at io.confluent.kafka.schemaregistry.avro.AvroSchema.toTransformedMessage(AvroSchema.java:555)
at io.confluent.kafka.schemaregistry.avro.AvroSchema.toTransformedMessage(AvroSchema.java:589)
at io.confluent.kafka.schemaregistry.avro.AvroSchema.transformMessage(AvroSchema.java:531)
at io.confluent.kafka.schemaregistry.rules.FieldRuleExecutor.transform(FieldRuleExecutor.java:102)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.executeRules(AbstractKafkaSchemaSerDe.java:708)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.executeRules(AbstractKafkaSchemaSerDe.java:654)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:144)
...  5 more
Вот зависимости kafka/confluent:

Код: Выделить всё

      
org.apache.kafka
kafka-clients
7.6.2-ccs



io.confluent
kafka-avro-serializer
7.6.2



io.confluent
kafka-schema-rules
7.6.2



org.apache.avro
avro
1.11.3

Я видел подобные вопросы, используя Apache Spark, но, поскольку мы не используем, решения не сработали в моей ситуации. Я ожидаю, что сериализатор будет правильно обрабатывать Instant, и это происходит для поля, не допускающего значения NULL, но когда у меня есть поле объединения NULL, он не распознает тип, который он должен отправить. Почему Instant некорректно разрешается в объединении?


Подробнее здесь: https://stackoverflow.com/questions/798 ... illis-fiel
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»