Проблема с десериализацией сообщений protobuf kafka в Apache SeatunnelJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Проблема с десериализацией сообщений protobuf kafka в Apache Seatunnel

Сообщение Anonymous »

Я выполняю задание Apache Seatunnel, источником которого являются сообщения Kafka Protobuf. Теперь я застрял с десериализацией поля временной метки.
В protobuf_schema ниже поле join_at имеет тип Timestamp, что, в свою очередь, само по себе является сообщением:

Код: Выделить всё

    protobuf_message_name = User

protobuf_schema = """
syntax = "proto3";

message Timestamp {
int64 seconds = 1;
int32 nanos = 2;
}

enum Country {
BY = 0;
RU = 1;
GE = 2;
KZ = 3;
CN = 4;
BR = 5;
}

message User {
string user_id = 1;
string nickname = 2;
Country country = 3;
Timestamp joined_at = 4;
bool is_active = 5;
}
"""
Моя схема соответствия:

Код: Выделить всё

    schema = {
fields {
user_id = string
nickname = string
country = string
joined_at = {
seconds = long
nanos = int
}
is_active = boolean
}
}
Однако это неправильно, потому что я получаю следующую ошибку:

Код: Выделить всё

2025-11-15 15:24:04,006 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:226)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
at org.apache.seatunnel.format.protobuf.ProtobufToRowConverter.converter(ProtobufToRowConverter.java:76)
at org.apache.seatunnel.format.protobuf.ProtobufToRowConverter.convertField(ProtobufToRowConverter.java:152)
at org.apache.seatunnel.format.protobuf.ProtobufToRowConverter.converter(ProtobufToRowConverter.java:81)
at org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema.deserialize(ProtobufDeserializationSchema.java:55)
at org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema.deserialize(ProtobufDeserializationSchema.java:34)
at org.apache.seatunnel.api.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:38)
at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:72)
at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:37)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:683)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1012)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:218)
... 2 more
Похоже, Seatunnel не видит вложенные поля секунд и нано.

Будем благодарны за любую помощь.

Подробнее здесь: https://stackoverflow.com/questions/798 ... -seatunnel
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»