Как реализовать источник BOUNDED для режима пакетного выполнения Flink?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как реализовать источник BOUNDED для режима пакетного выполнения Flink?

Сообщение Anonymous »

Я пытаюсь выполнить пакетное задание Flink (1.12.1), выполнив следующие действия:
  • Пользовательская SourceFunction для подключения к MongoDB li>
    Создавайте любые плоские карты и карты для преобразования некоторых данных
  • Поместите их в другую базу данных MongoDB
Я пытаюсь запустить его в StreamExecutionEnvironment с RuntimeExecutionMode.BATCH, но приложение выдает исключение, поскольку определяет мой источник как НЕОГРАНИЧЕННЫЙ... И я не могу установить его ОГРАНИЧЕННЫЙ (оно должно завершиться после сбора всех документов в коллекции mongo)
Исключение:

Код: Выделить всё

    exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)
Некоторые коды:
Среда выполнения

Код: Выделить всё

public static StreamExecutionEnvironment getBatch() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

env.addSource(new MongoSource()).print();

return env;

}
Источник Mongo:

Код: Выделить всё

public class MongoSource extends RichSourceFunction {

private static final long serialVersionUID = 8321722349907219802L;
private MongoClient mongoClient;
private MongoCollection mc;

@Override
public void open(Configuration con) {
mongoClient = new MongoClient(
new MongoClientURI("mongodb://localhost:27017/database"));

mc=mongoClient.getDatabase("database").getCollection("collection");

}

@Override
public void run(SourceContext ctx) throws Exception {

MongoCursor itr=mc.find(Document.class).cursor();
while(itr.hasNext())
ctx.collect(itr.next());
this.cancel();

}

@Override
public void cancel() {
mongoClient.close();

}
Спасибо!

Подробнее здесь: https://stackoverflow.com/questions/702 ... ution-mode
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»