Невозможно выполнить потоковую передачу данных из одной базы данных MySQL в другую через Apache Flink (java.io.StreamCorJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно выполнить потоковую передачу данных из одной базы данных MySQL в другую через Apache Flink (java.io.StreamCor

Сообщение Anonymous »

Я разработчик полного стека, но моя компания хотела бы изучить преимущества некоторых шаблонов (сбор данных об изменениях, что для меня в новинку).
По этой причине я пытаюсь создать простое задание Java для Apache Flink.
Цель проста: дана исходная база данных, содержащая одну таблицу (), задание должно обнаруживать изменения, фильтровать результаты по определенным критериям и отправлять их в целевую базу данных в той же таблице.
Экземпляры Apache Flink и MySQL запускаются через Docker Compose. .
Вот файл docker-compose.yaml (я сохранил имена пользователей и пароли, поскольку это всего лишь игровой проект):

Код: Выделить всё

name: cdc-flink
services:
jobmanager:
image: flink:latest
ports:
- 8081:8081
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
volumes:
- ./packages/flink/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar
- ./packages/flink/flink-table-api-java-bridge-1.20.0.jar:/opt/flink/lib/flink-table-api-java-bridge-1.20.0.jar
- ./packages/flink/flink-table-planner-loader-1.20.0.jar:/opt/flink/lib/flink-table-planner-loader-1.20.0-jar
- ./packages/flink/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar
- ./packages/flink/mysql-connector-java-8.0.27.jar:/opt/flink/lib/mysql-connector-java-8.0.27.jar
- ./packages/flink/flink-streaming-java-1.20.0.jar:/opt/flink/lib/flink-streaming-java-1.20.0.jar
- ./packages/flink/flink-connector-base-1.20.0.jar:/opt/flink/lib/flink-connector-base-1.20.0.jar
- ./packages/flink/flink-table-runtime-1.20.0.jar:/opt/flink/lib/flink-table-runtime-1.20.0.jar
- ./packages/flink/flink-clients-1.20.0.jar:/opt/flink/lib/flink-clients-1.20.0.jar
- ./packages/flink/flink-core-1.20.0.jar:/opt/flink/lib/flink-core-1.20.0.jar
- ./packages/flink/flink-java-1.20.0.jar:/opt/flink/lib/flink-java-1.20.0.jar
- ./packages/flink/flink-json-1.20.0.jar:/opt/flink/lib/flink-json-1.20.0.jar
- ./packages/flink/config.yaml:/opt/flink/conf/config.yaml
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
deploy:
replicas: 2
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

# SQL databases
mysql-source:
image: mysql:8.0
container_name: db-source
volumes:
- ./init/db-source:/docker-entrypoint-initdb.d
- ./volumes/mysql/source:/var/lib/mysql
ports:
- 3307:3306
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_PASSWORD=root
mysql-destination-a:
image: mysql:8.0
container_name: db-destination-a
volumes:
- ./init/db-destination:/docker-entrypoint-initdb.d
- ./volumes/mysql/destination-a:/var/lib/mysql
ports:
- 3308:3306
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_PASSWORD=root
mysql-destination-b:
image: mysql:8.0
container_name:  db-destination-b
volumes:
- ./init/db-destination:/docker-entrypoint-initdb.d
- ./volumes/mysql/destination-b:/var/lib/mysql
ports:
- 3309:3306
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_PASSWORD=root
Еще пара подробностей:
  • Flink conf.yaml перезаписан, поэтому Taskmanager.numberOfTaskSlots равен 10, а не 1 , в противном случае я бы получил сообщение об ошибке (невозможно получить ресурсы).
  • Сопоставления, определенные для служб диспетчера заданий, должны быть правильными.
  • Экземпляры MySQL запущены и работают, и
    service db-source содержит источник базы данных
  • источник базы данных содержит примечания к таблице
    Структура примечаний к таблице равна
    (int, pk)
  • Код: Выделить всё

    destination
    (строка, допускающая значение NULL)
  • (строка, допускающая значение NULL)



Код моей работы на Java состоит всего из двух классов:

Код: Выделить всё

public class CopyNotes {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.createTable("source", SqlDescriptors.SourceDatabase);
tableEnv.createTable("destination_a", SqlDescriptors.DestinationA);

Table source = tableEnv
.from("source")
.filter($("destination").isEqual("a"))
.select($("id"), $("content"));

TablePipeline pipeline = source
.insertInto("destination_a");

pipeline.printExplain();
pipeline.execute();
}
}

public class SqlDescriptors {
public static final TableDescriptor SourceDatabase =
TableDescriptor.forConnector("mysql-cdc")
.schema(
Schema.newBuilder()
.column("id", DataTypes.INT().notNull())
.column("destination", DataTypes.STRING().nullable())
.column("content", DataTypes.STRING().nullable())
.primaryKey("id")
.build()
)
.option(MySqlSourceOptions.HOSTNAME, "db-source")
.option(MySqlSourceOptions.PORT, 3306)
.option(MySqlSourceOptions.USERNAME, "root")
.option(MySqlSourceOptions.PASSWORD, "root")
.option(MySqlSourceOptions.DATABASE_NAME, "source")
.option(MySqlSourceOptions.TABLE_NAME, "notes")
.option(MySqlSourceOptions.HEARTBEAT_INTERVAL, Duration.ofSeconds(1))
.option(MySqlSourceOptions.SERVER_ID, "5800-5900")
.build();

public static final TableDescriptor DestinationA =
TableDescriptor.forConnector("jdbc")
.schema(
Schema.newBuilder()
.column("id", DataTypes.BIGINT().notNull())
.column("content", DataTypes.STRING().nullable())
.primaryKey("id")
.build())
.option(JdbcConnectorOptions.URL, "jdbc:mysql://db-destination-a:3306/destination")
.option(JdbcConnectorOptions.SINK_PARALLELISM, 1)
.option(JdbcConnectorOptions.TABLE_NAME, "notes")
.option(JdbcConnectorOptions.USERNAME, "root")
.option(JdbcConnectorOptions.PASSWORD, "root")
.build();
}
Если я упаковываю и загружаю задание, Flink не жалуется, но выдает исключение при попытке запустить задание (что должно произойти сразу после отправки, что, по-видимому, вызвано этой ошибкой:

Код: Выделить всё

Caused by: java.io.StreamCorruptedException: unexpected block data
Есть ли у вас какие-либо предложения/наблюдения?
Примечание: вот полная трассировка стека ошибок

Код: Выделить всё

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:  Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:169)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.StreamCorruptedException:  unexpected block data
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readArray(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400)
... 9 more
Я пробовал написать конвейер CDC разными способами, включая SQL-подобный код.
Документация Apache Flink привела меня к моему текущему решению, которое по какой-то причине должно быть неверным. .

Подробнее здесь: https://stackoverflow.com/questions/790 ... nk-java-io
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»