По этой причине я пытаюсь создать простое задание Java для Apache Flink.
Цель проста: дана исходная база данных, содержащая одну таблицу (notes).
Экземпляры Apache Flink и MySQL запускаются через Docker Compose.
Вот файл docker-compose.yaml (я сохранил имена пользователей и пароли, поскольку это всего лишь игровой проект):
Код: Выделить всё
name: cdc-flink
services:
  jobmanager:
    image: flink:latest
    ports:
      - 8081:8081
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./packages/flink/flink-sql-connector-mysql-cdc-3.2.0.jar:/opt/flink/lib/flink-sql-connector-mysql-cdc-3.2.0.jar
      - ./packages/flink/flink-table-api-java-bridge-1.20.0.jar:/opt/flink/lib/flink-table-api-java-bridge-1.20.0.jar
      # FIX: destination previously ended in "-1.20.0-jar" (typo). Flink only
      # picks up files ending in ".jar" from /opt/flink/lib, so the planner
      # loader was silently missing from the cluster classpath.
      - ./packages/flink/flink-table-planner-loader-1.20.0.jar:/opt/flink/lib/flink-table-planner-loader-1.20.0.jar
      - ./packages/flink/flink-connector-jdbc-3.2.0-1.19.jar:/opt/flink/lib/flink-connector-jdbc-3.2.0-1.19.jar
      - ./packages/flink/mysql-connector-java-8.0.27.jar:/opt/flink/lib/mysql-connector-java-8.0.27.jar
      - ./packages/flink/flink-streaming-java-1.20.0.jar:/opt/flink/lib/flink-streaming-java-1.20.0.jar
      - ./packages/flink/flink-connector-base-1.20.0.jar:/opt/flink/lib/flink-connector-base-1.20.0.jar
      - ./packages/flink/flink-table-runtime-1.20.0.jar:/opt/flink/lib/flink-table-runtime-1.20.0.jar
      - ./packages/flink/flink-clients-1.20.0.jar:/opt/flink/lib/flink-clients-1.20.0.jar
      - ./packages/flink/flink-core-1.20.0.jar:/opt/flink/lib/flink-core-1.20.0.jar
      - ./packages/flink/flink-java-1.20.0.jar:/opt/flink/lib/flink-java-1.20.0.jar
      - ./packages/flink/flink-json-1.20.0.jar:/opt/flink/lib/flink-json-1.20.0.jar
      - ./packages/flink/config.yaml:/opt/flink/conf/config.yaml
  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    deploy:
      replicas: 2
    # NOTE(review): "links" is legacy Compose; service names already resolve
    # on the default network. Kept for behavior parity.
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    # NOTE(review): the taskmanagers mount none of the connector jars above,
    # while the jobmanager does. Tasks are deserialized on the taskmanagers,
    # so a jobmanager/taskmanager classpath mismatch produces exactly
    # "java.io.StreamCorruptedException: unexpected block data". The same
    # volumes list should be mounted here as well.
  # SQL databases
  mysql-source:
    image: mysql:8.0
    container_name: db-source
    volumes:
      - ./init/db-source:/docker-entrypoint-initdb.d
      - ./volumes/mysql/source:/var/lib/mysql
    ports:
      - 3307:3306
    environment:
      - MYSQL_ROOT_PASSWORD=root
      # NOTE(review): MYSQL_PASSWORD has no effect without MYSQL_USER;
      # the root password above is what the job actually uses.
      - MYSQL_PASSWORD=root
  mysql-destination-a:
    image: mysql:8.0
    container_name: db-destination-a
    volumes:
      - ./init/db-destination:/docker-entrypoint-initdb.d
      - ./volumes/mysql/destination-a:/var/lib/mysql
    ports:
      - 3308:3306
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_PASSWORD=root
  mysql-destination-b:
    image: mysql:8.0
    container_name: db-destination-b
    volumes:
      - ./init/db-destination:/docker-entrypoint-initdb.d
      - ./volumes/mysql/destination-b:/var/lib/mysql
    ports:
      - 3309:3306
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_PASSWORD=root
- Flink config.yaml перезаписан, поэтому taskmanager.numberOfTaskSlots равен 10, а не 1, — в противном случае я бы получил ошибку «невозможно получить ресурсы».
- Сопоставления, определенные для служб диспетчера заданий, должны быть правильными.
- Экземпляры MySQL запущены и работают, и
service db-source содержит источник базы данных - источник базы данных содержит примечания к таблице
Структура таблицы notes:
- id (int, первичный ключ)
- destination (строка, допускающая значение NULL)
- content (строка, допускающая значение NULL)
Код моей работы на Java состоит всего из двух классов:
Код: Выделить всё
/**
 * Entry point: streams rows from the CDC-backed "source" table into the
 * JDBC-backed "destination_a" table, keeping only rows whose
 * {@code destination} column equals {@code "a"}.
 */
public class CopyNotes {

    public static void main(String[] args) throws Exception {
        // Table API in streaming mode (CDC sources are unbounded).
        final EnvironmentSettings streamingSettings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        final TableEnvironment env = TableEnvironment.create(streamingSettings);

        // Register both ends of the pipeline from the shared descriptors.
        env.createTable("source", SqlDescriptors.SourceDatabase);
        env.createTable("destination_a", SqlDescriptors.DestinationA);

        // Keep only notes routed to destination "a", projecting id + content.
        final Table notesForA =
                env.from("source")
                   .filter($("destination").isEqual("a"))
                   .select($("id"), $("content"));

        // Explain the plan for debugging, then submit the job.
        final TablePipeline copyPipeline = notesForA.insertInto("destination_a");
        copyPipeline.printExplain();
        copyPipeline.execute();
    }
}
/**
 * Table descriptors shared by the copy job: a MySQL CDC source and a
 * JDBC sink pointing at the "a" destination database.
 */
public class SqlDescriptors {

    /**
     * CDC source over table {@code source.notes} in the db-source container.
     * Schema mirrors the physical table: (id int pk, destination string, content string).
     */
    public static final TableDescriptor SourceDatabase =
            TableDescriptor.forConnector("mysql-cdc")
                    .schema(
                            Schema.newBuilder()
                                    .column("id", DataTypes.INT().notNull())
                                    .column("destination", DataTypes.STRING().nullable())
                                    .column("content", DataTypes.STRING().nullable())
                                    .primaryKey("id")
                                    .build()
                    )
                    // Container-internal host/port, not the 3307 host mapping.
                    .option(MySqlSourceOptions.HOSTNAME, "db-source")
                    .option(MySqlSourceOptions.PORT, 3306)
                    .option(MySqlSourceOptions.USERNAME, "root")
                    .option(MySqlSourceOptions.PASSWORD, "root")
                    .option(MySqlSourceOptions.DATABASE_NAME, "source")
                    .option(MySqlSourceOptions.TABLE_NAME, "notes")
                    .option(MySqlSourceOptions.HEARTBEAT_INTERVAL, Duration.ofSeconds(1))
                    // A server-id range so each parallel reader gets a unique id.
                    .option(MySqlSourceOptions.SERVER_ID, "5800-5900")
                    .build();

    /**
     * JDBC upsert sink into {@code destination.notes} in db-destination-a.
     * Primary key makes the sink run in upsert mode.
     */
    public static final TableDescriptor DestinationA =
            TableDescriptor.forConnector("jdbc")
                    .schema(
                            Schema.newBuilder()
                                    // FIX: was BIGINT, but the CDC source (and the
                                    // physical column) declare id as INT NOT NULL.
                                    // The sink schema must match the source type
                                    // or the INSERT INTO plan fails to validate.
                                    .column("id", DataTypes.INT().notNull())
                                    .column("content", DataTypes.STRING().nullable())
                                    .primaryKey("id")
                                    .build())
                    .option(JdbcConnectorOptions.URL, "jdbc:mysql://db-destination-a:3306/destination")
                    .option(JdbcConnectorOptions.SINK_PARALLELISM, 1)
                    .option(JdbcConnectorOptions.TABLE_NAME, "notes")
                    .option(JdbcConnectorOptions.USERNAME, "root")
                    .option(JdbcConnectorOptions.PASSWORD, "root")
                    .build();
}
Код: Выделить всё
Caused by: java.io.StreamCorruptedException: unexpected block data
Примечание: вот полная трассировка стека ошибок
Код: Выделить всё
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:416)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:169)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.(RegularOperatorChain.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readArray(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400)
... 9 more
Документация Apache Flink привела меня к моему текущему решению, которое по какой-то причине должно быть неверным.
Подробнее здесь: https://stackoverflow.com/questions/790 ... nk-java-io