Я работаю над работой Flink Pactor, которая считывает струны json из Кафки, преобразует их в avro geneicRecords , и записывает их в Parquet, используя avroparquetwriters .
Сообщения JSON выглядят так:
{"user_id":914,"message":"User login","timestamp":"2025-06-29"}
my avro schema file log_schema.avsc содержит:
{
"type": "record",
"name": "UserLog",
"namespace": "com.example",
"fields": [
{ "name": "user_id", "type": "int" },
{ "name": "message", "type": "string" },
{ "name": "timestamp", "type": "string" }
]
}
В коде я использую flatmapfunction для анализа JSON и Emit GenericRecord Объекты:
.flatMap(new FlatMapFunction() {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public void flatMap(String json, Collector out) {
try {
JsonNode node = mapper.readTree(json);
GenericRecord record = new GenericData.Record(schema);
record.put("user_id", node.get("user_id").asInt());
record.put("message", node.get("message").asText());
record.put("timestamp", node.get("timestamp").asText());
out.collect(record);
} catch (Exception e) {
System.err.println("Invalid JSON: " + json + " → " + e.getMessage());
}
}
})
.returns(TypeInformation.of(GenericRecord.class));
< /code>
Тогда я пишу проанализированный поток на панкус -панкете с паркетом: < /p>
FileSink sink = FileSink
.forBulkFormat(new Path("file:///output/"), AvroParquetWriters.forGenericRecord(schema))
.build();
parsedStream.sinkTo(sink);
< /code>
Хотя строка JSON действительна и соответствует схеме AVRO, я получаю следующую ошибку: < /p>
Invalid JSON: {"user_id":914,"message":"User login","timestamp":"2025-06-29"} → Could not forward element to next operator
Подробнее здесь: https://stackoverflow.com/questions/796 ... t-operator
Flink Flatmap "не может перестать элемент следующему оператору" ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение