Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.flink.types.Row.getFieldNames(boolean)" because "from" is null


Post by Anonymous »

I am trying to implement Apache Flink JDBC code that reads data from Apache Kafka and inserts it into MySQL. First, I wrote a simple version that reads from a single Kafka topic and inserts into a single MySQL table. Here is the code.

Code: Select all

from title_flink_stream import TITLE_FLINK_STREAM

from pyflink.common import WatermarkStrategy, Types, SimpleStringSchema, Row
from pyflink.common.typeinfo import RowTypeInfo
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.functions import MapFunction

import configparser
import os

config = configparser.ConfigParser()

path = os.path.dirname(__file__)
os.chdir(path)

config.read('resources/SystemConfig.ini')

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

env.add_jars('file:///home/joseph/flink/jars/flink-connector-kafka-3.1.0-1.18.jar',
             'file:///home/joseph/flink/jars/kafka-clients-3.8.0.jar',
             'file:///home/joseph/flink/jars/flink-connector-jdbc-3.1.2-1.18.jar',
             'file:///home/joseph/flink/jars/mysql-connector-j-8.3.0.jar')

class CustomCsvMapFunction(MapFunction):
    def map(self, value):
        str_list = value.split(',')
        # return Types.ROW(str_list)
        if str_list[0] != '' and str_list[1] != '':
            return Row(date=str_list[0], value=float(str_list[1]), state=str_list[2], id=str_list[3],
                       title=str_list[4], frequency_short=str_list[5],
                       units_short=str_list[6], seasonal_adjustment_short=str_list[7])
        # note: lines that fail the guard fall through here and return None implicitly

kafka_brokerlist = config['KAFKA_CONFIG']['kafka.brokerlist']

mysql_user = config['MYSQL_CONFIG']['mysql.user']
mysql_password = config['MYSQL_CONFIG']['mysql.password']
mysql_host_url = config['MYSQL_CONFIG']['mysql.host.url']

type_name = ['date','value','state','id','title','frequency_short','units_short','seasonal_adjustment_short']
type_schema = [Types.STRING(), Types.FLOAT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]
output_type = Types.ROW_NAMED(type_name, type_schema)
type_info = RowTypeInfo(type_schema, type_name)

source = KafkaSource.builder() \
    .set_bootstrap_servers(kafka_brokerlist) \
    .set_topics('topic_' + 'exportGoods') \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

csv_ds = ds.filter(lambda str: not(str.startswith('date'))).map(CustomCsvMapFunction(), output_type=output_type)
# csv_ds.print()

jdbcConnOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()\
    .with_url(mysql_host_url)\
    .with_driver_name('com.mysql.cj.jdbc.Driver')\
    .with_user_name(mysql_user)\
    .with_password(mysql_password)\
    .build()

jdbcExeOptions = JdbcExecutionOptions.builder()\
    .with_batch_interval_ms(1000)\
    .with_batch_size(200)\
    .with_max_retries(5)\
    .build()

csv_ds.add_sink(
    JdbcSink.sink(
        'INSERT INTO ' + 'tbl_' + 'exportGoods' + ' VALUES(?, ?, ?, ?, ?, ?, ?, ?)',
        type_info, jdbcConnOptions, jdbcExeOptions))

env.execute('Flink Save2 MySQL')
env.close()
This code runs successfully and throws no exceptions. But I have several Kafka topics and a corresponding MySQL table for each of them, so I wrapped the JDBC wiring in a for loop, as shown below.

Code: Select all

for flink_stream in TITLE_FLINK_STREAM:
    source = KafkaSource.builder() \
        .set_bootstrap_servers(kafka_brokerlist) \
        .set_topics('topic_' + flink_stream.suffix) \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    csv_ds = ds.filter(lambda str: not(str.startswith('date'))).map(CustomCsvMapFunction(), output_type=output_type)
    # csv_ds.print()

    jdbcConnOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()\
        .with_url(mysql_host_url)\
        .with_driver_name('com.mysql.cj.jdbc.Driver')\
        .with_user_name(mysql_user)\
        .with_password(mysql_password)\
        .build()

    jdbcExeOptions = JdbcExecutionOptions.builder()\
        .with_batch_interval_ms(1000)\
        .with_batch_size(200)\
        .with_max_retries(5)\
        .build()

    csv_ds.add_sink(
        JdbcSink.sink(
            'INSERT INTO ' + 'tbl_' + flink_stream.suffix + ' VALUES(?, ?, ?, ?, ?, ?, ?, ?)',
            type_info, jdbcConnOptions, jdbcExeOptions))

env.execute('Flink Save2 MySQL')
env.close()
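
For readers: TITLE_FLINK_STREAM comes from my own title_flink_stream module and is not shown here. For the loop above it only needs to be iterable, with each element exposing a suffix string, used for both the topic name ('topic_' + suffix) and the table name ('tbl_' + suffix). A minimal sketch of such an enum (the member names below are placeholders, not my real topics):

Code: Select all

from enum import Enum

class TITLE_FLINK_STREAM(Enum):
    # placeholder members: one entry per Kafka topic / MySQL table pair
    EXPORT_GOODS = 'exportGoods'
    IMPORT_GOODS = 'importGoods'

    @property
    def suffix(self):
        # the job builds 'topic_' + suffix and 'tbl_' + suffix from this
        return self.value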
Data is inserted successfully for a while, but then the looped version aborts and throws the following exceptions.

Code: Select all

Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in 
target=lambda: self._read_inputs(elements_iterator),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
for elements in elements_iterator:
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous: 
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in 
target=lambda: self._read_inputs(elements_iterator),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
for elements in elements_iterator:
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous:  
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/usr/lib/python/anaconda3/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in 
target=lambda: self._read_inputs(elements_iterator),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
for elements in elements_iterator:
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous: 
Traceback (most recent call last):
File "/home/joseph/VSCode_Workspace/etl-stream-python/com/aaa/etl/jdbctest.py", line 116, in 
env.execute('Flink Save2 MySQL')
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/pyflink/datastream/stream_execution_environment.py", line 824, in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "/home/joseph/VSCode_Workspace/.venv-etl/lib/python3.11/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException:  Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1287)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by:  org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
at jdk.internal.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
...  5 more
Caused by: java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
...  14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.emitResult(AbstractExternalOneInputPythonFunctionOperator.java:133)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.emitResults(AbstractExternalPythonFunctionOperator.java:142)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:101)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:112)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
... 22 more
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.flink.types.Row.getFieldNames(boolean)" because "from" is null
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:137)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
Are these exceptions caused by a Flink configuration error? They do not seem to come from syntax problems in my code. Do I need to change the Flink configuration? FYI, I wrote Flink Java SDK code with a similar architecture, and it runs without exceptions.
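
One detail I noticed while re-reading the stack trace: RowSerializer.copy fails because "from" is null, and my map() implicitly returns None whenever str_list[0] or str_list[1] is empty. If that is the cause (my guess, not confirmed), a variant that filters malformed lines out before mapping, so map() never emits None, would look like this:

Code: Select all

# guess: drop malformed CSV lines up front so CustomCsvMapFunction never returns None
def is_valid_line(line):
    fields = line.split(',')
    return len(fields) >= 8 and fields[0] != '' and fields[1] != ''

csv_ds = ds.filter(lambda line: not line.startswith('date')) \
    .filter(is_valid_line) \
    .map(CustomCsvMapFunction(), output_type=output_type)

Is that a plausible explanation, or is this really a configuration issue?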

More details here: https://stackoverflow.com/questions/790 ... link-types