Flink Как DataStream объединяет пользовательский POJO в другой DataStreamJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Flink Как DataStream объединяет пользовательский POJO в другой DataStream

Сообщение Anonymous »

Я хочу преобразовать DataStream в DataStream с информацией о схеме

ввод

args[0] DataStream< /p>

Код: Выделить всё

{"fields":["China","Beijing"]}
схема args[1]

Код: Выделить всё

message spark_schema {
optional binary country (UTF8);
optional binary city (UTF8);
}
ожидайте результата

Код: Выделить всё

{"country":"china", "city":"beijing"}
мой код такой

Код: Выделить всё

public DataStream convert(DataStream source, MessageType messageType) {

SingleOutputStreamOperator dataWithSchema = source.map((MapFunction) row -> {
JSONObject data = new JSONObject();
this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
for (int i = 0; i < fields.size(); i++) {
data.put(fields.get(i), row.getField(i));
}
return data.toJSONString();
});
return dataWithSchema;
}
Ошибки-исключения

Код: Выделить всё

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
Но код ниже работает нормально

Код: Выделить всё

public DataStream convert(DataStream source, MessageType messageType) {
if (this.fields == null) {
throw new RuntimeException("The schema of AbstractRowStreamReader is null");
}

List field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
SingleOutputStreamOperator dataWithSchema = source.map((MapFunction) row -> {
JSONObject data = new JSONObject();
for (int i = 0; i < field.size(); i++) {
data.put(field.get(i), row.getField(i));
}
return data.toJSONString();
});
return dataWithSchema;
}
Оператор карты Flink, как объединить внешний сложный POJO?


Подробнее здесь: https://stackoverflow.com/questions/605 ... datastream
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»