I am trying to implement Apache Flink JDBC code that reads data from Apache Kafka and inserts it into MySQL. First, I wrote a simple job that reads from a single Kafka topic and inserts into a single MySQL table. Below is the code (abridged):
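[code]
from title_flink_stream import TITLE_FLINK_STREAM
from pyflink.common import WatermarkStrategy, Types, SimpleStringSchema, Row
from pyflink.common.typeinfo import RowTypeInfo
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.functions import MapFunction

# ... (execution environment, Kafka source, map and JdbcSink setup omitted here)

env.execute('Flink Save2 MySQL')
env.close()
[/code]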
The code runs successfully and without exceptions. But I have several Kafka topics and a corresponding MySQL table for each of them, so I build the JDBC connections in a for loop, as shown below (again abridged):
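[code]
for flink_stream in TITLE_FLINK_STREAM:
    source = KafkaSource.builder() \
        .set_bootstrap_servers(kafka_brokerlist) \
        .set_topics('topic_' + flink_stream.suffix) \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    # ... (per-topic map and the JdbcSink for the matching MySQL table omitted here)

env.execute('Flink Save2 MySQL')
env.close()
[/code]
For a while the data insertion runs successfully, but then the looped job breaks and throws the following exceptions:
[code]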
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in
target=lambda: self._read_inputs(elements_iterator),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
for elements in elements_iterator:
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous:
(the same traceback is printed two more times by other read_grpc_client_inputs threads)
Traceback (most recent call last):
File "/home/joseph/VSCode_Workspace/etl-stream-python/com/aaa/etl/jdbctest.py", line 116, in
env.execute('Flink Save2 MySQL')
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/pyflink/datastream/stream_execution_environment.py", line 824, in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1287)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
at jdk.internal.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
... 5 more
Caused by: java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.emitResult(AbstractExternalOneInputPythonFunctionOperator.java:133)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.emitResults(AbstractExternalPythonFunctionOperator.java:142)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:101)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 22 more
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.flink.types.Row.getFieldNames(boolean)" because "from" is null
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:137)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
[/code]
Are these exceptions related to Flink configuration errors? They do not look like syntax problems in the code. So do I need to change the Flink configuration? FYI, I have built Flink Java SDK code with a similar architecture, and that code runs without any exceptions.
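Reading the innermost NullPointerException, RowSerializer.copy seems to receive a null Row coming out of the chained Python operator. As a purely hypothetical illustration (an assumption for discussion, not my actual code), a map function that returns None for some record would produce this exact symptom:

[code]
# Hypothetical sketch, not my actual pipeline: if a chained Python map
# returns None for a record, the downstream RowSerializer.copy gets a
# null Row and fails with "Cannot invoke Row.getFieldNames(boolean)
# because 'from' is null".
class ParseCsv(MapFunction):
    def map(self, value):
        parts = value.split(',')
        if len(parts) != 3:
            return None  # a None here reaches RowSerializer.copy as a null Row
        return Row(parts[0], int(parts[1]), parts[2])
[/code]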
[b]== Updated parts[/b]
I found an interesting fact. The exception above is raised at exactly 7,000 input rows: whenever the 7,001st row of CSV-like data is about to be inserted into MySQL, the errors occur. I think some limit prevents inserting more than 7,000 records into MySQL, and that this is a configuration issue. I increased the with_batch_size parameter, but the same errors occurred. Any ideas?
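For reference, this is the shape of the batching configuration I was tuning; a minimal sketch of PyFlink's JdbcExecutionOptions builder, where the concrete values are placeholders rather than my real settings:

[code]
# Minimal sketch of the JDBC batching knobs (placeholder values):
# with_batch_size controls how many rows are buffered before a batch
# INSERT is flushed to MySQL.
exec_options = JdbcExecutionOptions.builder() \
    .with_batch_interval_ms(1000) \
    .with_batch_size(10000) \
    .with_max_retries(5) \
    .build()
[/code]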