Я пытаюсь использовать соединитель Mongo CDC в качестве источника для моего источника DataStream в моем задании Flink. Я использую тот же пример кода, что и [согласно документации][1].
Это мой код:
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();
env.execute("mongo-cdc");
Поскольку я использую набор реплик, я помещаю URI подключения для всех трех узлов с портами, разделенными запятой. Я попытался использовать обычный URI соединения из Compass, но возникла другая ошибка из-за mongodb+srv в начале, и мне пришлось изменить URI.
У меня работает локальный кластер Flink, и я отправляю данные через терминал:
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
Проходит около 30 секунд, прежде чем задание появится в пользовательском интерфейсе Flink на порту 8081, и оно постоянно переключает свой статус между ВЫПОЛНЕНИЕ и ПЕРЕЗАПУСК.
При выполнении задания возникает следующая ошибка:
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Я уверен, что проблема не в Mongo, потому что я без проблем подключаюсь через Compass, а также пробовал другой пример для [приемник/источник Mongo без cdc] [2] и все работает, даже при использовании стандартного URI mongodb+srv.
Я использую Flink 1.20.0, и это мои зависимости:
org.apache.flink
flink-java
1.20.0
org.apache.flink
flink-streaming-java
1.20.0
org.apache.flink
flink-connector-mongodb
1.2.0-1.19
org.apache.flink
flink-connector-mongodb-cdc
3.2.1
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flin ... eam-source
[2]: https://nightlies.apache.org/flink/flin ... m/mongodb/
Подробнее здесь: https://stackoverflow.com/questions/793 ... sults-in-m
Задание Apache Flink, пытающееся прочитать из Mongo через исходный соединитель cdc, приводит к исключению MongoTimeoutEx ⇐ JAVA
Программисты JAVA общаются здесь
1736866694
Anonymous
Я пытаюсь использовать соединитель Mongo CDC в качестве источника для моего источника DataStream в моем задании Flink. Я использую тот же пример кода, что и [согласно документации][1].
Это мой код:
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();
env.execute("mongo-cdc");
Поскольку я использую набор реплик, я помещаю URI подключения для всех трех узлов с портами, разделенными запятой. Я попытался использовать обычный URI соединения из Compass, но возникла другая ошибка из-за mongodb+srv в начале, и мне пришлось изменить URI.
У меня работает локальный кластер Flink, и я отправляю данные через терминал:
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
Проходит около 30 секунд, прежде чем задание появится в пользовательском интерфейсе Flink на порту 8081, и оно постоянно переключает свой статус между ВЫПОЛНЕНИЕ и ПЕРЕЗАПУСК.
При выполнении задания возникает следующая ошибка:
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Я уверен, что проблема не в Mongo, потому что я без проблем подключаюсь через Compass, а также пробовал другой пример для [приемник/источник Mongo без cdc] [2] и все работает, даже при использовании стандартного URI mongodb+srv.
Я использую Flink 1.20.0, и это мои зависимости:
org.apache.flink
flink-java
1.20.0
org.apache.flink
flink-streaming-java
1.20.0
org.apache.flink
flink-connector-mongodb
1.2.0-1.19
org.apache.flink
flink-connector-mongodb-cdc
3.2.1
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/
Подробнее здесь: [url]https://stackoverflow.com/questions/79355426/apache-flink-job-trying-to-read-from-mongo-via-cdc-source-connector-results-in-m[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия