Я пытаюсь использовать соединитель Mongo CDC в качестве источника для моего источника DataStream в моем задании Flink. Я использую тот же пример кода, что и [согласно документации][1].
Это мой код:
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();
env.execute("mongo-cdc");
Поскольку я использую набор реплик, я помещаю URI подключения для всех трех узлов с портами, разделенными запятой. Я попытался использовать обычный URI соединения из Compass, но возникла другая ошибка из-за mongodb+srv в начале, и мне пришлось изменить URI.
У меня работает локальный кластер Flink, и я отправляю данные через терминал:
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
Проходит около 30 секунд, прежде чем задание появится в пользовательском интерфейсе Flink на порту 8081, и оно постоянно переключает свой статус между ВЫПОЛНЕНИЕ и ПЕРЕЗАПУСК.
При выполнении задания возникает следующая ошибка:
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Я уверен, что проблема не в Mongo, потому что я без проблем подключаюсь через Compass, а также пробовал другой пример для [приемник/источник Mongo без cdc] [2] и все работает, даже при использовании стандартного URI mongodb+srv.
Я использую Flink 1.20.0, и это мои зависимости:
org.apache.flink
flink-java
1.20.0
org.apache.flink
flink-streaming-java
1.20.0
org.apache.flink
flink-connector-mongodb
1.2.0-1.19
org.apache.flink
flink-connector-mongodb-cdc
3.2.1
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flin ... eam-source
[2]: https://nightlies.apache.org/flink/flin ... m/mongodb/
Подробнее здесь: https://stackoverflow.com/questions/793 ... sults-in-m
Задание Apache Flink, пытающееся прочитать из Mongo через исходный соединитель cdc, приводит к исключению MongoTimeoutEx ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Задача Apache Flink, пытаясь прочитать из Mongo через разъем CDC Source Connector
Anonymous » » в форуме JAVA - 0 Ответы
- 36 Просмотры
-
Последнее сообщение Anonymous
-