Я пытаюсь выполнить пакетное задание Flink (1.12.1), выполнив следующие действия:
Пользовательская SourceFunction для подключения к MongoDB li>
Создавайте любые плоские карты и карты для преобразования некоторых данных
Поместите их в другую базу данных MongoDB
Я пытаюсь запустить его в StreamExecutionEnvironment с RuntimeExecutionMode.BATCH, но приложение выдает исключение, поскольку определяет мой источник как НЕОГРАНИЧЕННЫЙ... И я не могу установить его ОГРАНИЧЕННЫЙ (оно должно завершиться после сбора всех документов в коллекции mongo)
Исключение:
exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)
Я пытаюсь выполнить пакетное задание Flink (1.12.1), выполнив следующие действия: [list] [*]Пользовательская SourceFunction для подключения к MongoDB li> Создавайте любые плоские карты и карты для преобразования некоторых данных [*]Поместите их в другую базу данных MongoDB [/list] Я пытаюсь запустить его в StreamExecutionEnvironment с RuntimeExecutionMode.BATCH, но приложение выдает исключение, поскольку определяет мой источник как НЕОГРАНИЧЕННЫЙ... И я не могу установить его ОГРАНИЧЕННЫЙ (оно должно завершиться после сбора всех документов в коллекции mongo) Исключение: [code] exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782) at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33) [/code] Некоторые коды: Среда выполнения [code]public static StreamExecutionEnvironment getBatch() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.addSource(new MongoSource()).print();
return env;
} [/code] Источник Mongo: [code]public class MongoSource extends RichSourceFunction {
private static final long serialVersionUID = 8321722349907219802L; private MongoClient mongoClient; private MongoCollection mc;
@Override public void open(Configuration con) { mongoClient = new MongoClient( new MongoClientURI("mongodb://localhost:27017/database"));