Задача Apache Flink, пытаясь прочитать из Mongo через разъем CDC Source ConnectorJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Задача Apache Flink, пытаясь прочитать из Mongo через разъем CDC Source Connector

Сообщение Anonymous »

Я пытаюсь использовать Mongo Cdc Connector в качестве источника для моего источника DataStream в моей работе. Я использую тот же пример кода, что и [Per Docs] [1]. < /P>
Это мой код: < /p>
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts("cluster0-shard-...:
,cluster0-shard-...:,cluster0-shard-...:")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();

env.execute("mongo-cdc");
< /code>
Поскольку я использую набор реплики, я поместил URIS подключения для всех трех узлов с портами, разделенными запятыми. Я попытался использовать URI обычного соединения из Compass, но вначале была произведена еще одна ошибка из -за MongoDB+SRV , и мне пришлось изменить URI.
У меня работает локальный кластер Flink, и я отправляю через терминал:
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
Это займет около 30 секунд, прежде чем задание появится в пользовательском интерфейсе Flink на порту 8081, и он постоянно переключает свой статус между запуском и перезапуском . < /p>
Ошибка, которую создает задание, заключается в следующем:
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:
, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
Я уверен, что проблема не в Mongo, потому что я подключаюсь без проблемы через компас, а также попробовал другой пример для [Mongo Sind/Source без CDC] [2] и все работает, даже при использовании стандартного mongodb+srv uri.
Я использую Flink 1.20.0, и это мои зависимости:

org.apache.flink
flink-java
1.20.0



org.apache.flink
flink-streaming-java
1.20.0



org.apache.flink
flink-connector-mongodb
1.2.0-1.19



org.apache.flink
flink-connector-mongodb-cdc
3.2.1

```

What could I be missing?

[1]: https://nightlies.apache.org/flink/flin ... eam-source
[2]: https://nightlies.apache.org/flink/flin ... m/mongodb/


Подробнее здесь: https://stackoverflow.com/questions/793 ... sults-in-m
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»