Задание Apache Flink, пытающееся прочитать из Mongo через исходный соединитель cdc, приводит к исключению MongoTimeoutExJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Задание Apache Flink, пытающееся прочитать из Mongo через исходный соединитель cdc, приводит к исключению MongoTimeoutEx

Сообщение Anonymous »

Я пытаюсь использовать соединитель Mongo CDC в качестве источника для моего источника DataStream в моем задании Flink. Я использую тот же пример кода, что и [согласно документации][1].
Это мой код:
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();

env.execute("mongo-cdc");

Поскольку я использую набор реплик, я помещаю URI подключения для всех трех узлов с портами, разделенными запятой. Я попытался использовать обычный URI соединения из Compass, но возникла другая ошибка из-за mongodb+srv в начале, и мне пришлось изменить URI.
У меня работает локальный кластер Flink, и я отправляю данные через терминал:
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
Проходит около 30 секунд, прежде чем задание появится в пользовательском интерфейсе Flink на порту 8081, и оно постоянно переключает свой статус между ВЫПОЛНЕНИЕ и ПЕРЕЗАПУСК.
При выполнении задания возникает следующая ошибка:
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Я уверен, что проблема не в Mongo, потому что я без проблем подключаюсь через Compass, а также пробовал другой пример для [приемник/источник Mongo без cdc] [2] и все работает, даже при использовании стандартного URI mongodb+srv.
Я использую Flink 1.20.0, и это мои зависимости:

org.apache.flink
flink-java
1.20.0



org.apache.flink
flink-streaming-java
1.20.0



org.apache.flink
flink-connector-mongodb
1.2.0-1.19



org.apache.flink
flink-connector-mongodb-cdc
3.2.1

```

What could I be missing?

[1]: https://nightlies.apache.org/flink/flin ... eam-source
[2]: https://nightlies.apache.org/flink/flin ... m/mongodb/


Подробнее здесь: https://stackoverflow.com/questions/793 ... sults-in-m
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»