Flink Flatmap "не может перестать элемент следующему оператору"JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Flink Flatmap "не может перестать элемент следующему оператору"

Сообщение Anonymous »

Я работаю над работой Flink Pactor, которая считывает струны json из Кафки, преобразует их в avro geneicRecords , и записывает их в Parquet, используя avroparquetwriters .
Сообщения JSON выглядят так:
{"user_id":914,"message":"User login","timestamp":"2025-06-29"}
my avro schema file log_schema.avsc содержит:
{
"type": "record",
"name": "UserLog",
"namespace": "com.example",
"fields": [
{ "name": "user_id", "type": "int" },
{ "name": "message", "type": "string" },
{ "name": "timestamp", "type": "string" }
]
}

В коде я использую flatmapfunction для анализа JSON и Emit GenericRecord Объекты:
.flatMap(new FlatMapFunction() {
private final ObjectMapper mapper = new ObjectMapper();

@Override
public void flatMap(String json, Collector out) {
try {
JsonNode node = mapper.readTree(json);
GenericRecord record = new GenericData.Record(schema);
record.put("user_id", node.get("user_id").asInt());
record.put("message", node.get("message").asText());
record.put("timestamp", node.get("timestamp").asText());
out.collect(record);
} catch (Exception e) {
System.err.println("Invalid JSON: " + json + " → " + e.getMessage());
}
}
})
.returns(TypeInformation.of(GenericRecord.class));
< /code>
Тогда я пишу проанализированный поток на панкус -панкете с паркетом: < /p>
FileSink sink = FileSink
.forBulkFormat(new Path("file:///output/"), AvroParquetWriters.forGenericRecord(schema))
.build();

parsedStream.sinkTo(sink);
< /code>
Хотя строка JSON действительна и соответствует схеме AVRO, я получаю следующую ошибку: < /p>
Invalid JSON: {"user_id":914,"message":"User login","timestamp":"2025-06-29"} → Could not forward element to next operator


Подробнее здесь: https://stackoverflow.com/questions/796 ... t-operator
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»