Apache Spark установил dataFrame из базы данных Oracle в эластичный поиск ⇐ Python
Apache Spark установил dataFrame из базы данных Oracle в эластичный поиск
При попытке записать или установить dataFrame из базы данных Oracle в elasticsearch с помощью Apache spark я получаю следующее сообщение об ошибке:
Примечание. Я использую Elasticsearch версии 8.8.2, Spark V 3.3.3, python V 3.8, pySpark V 3.3.0 и Scala V 2.12
Код ниже:
def write(self, dataFrame): """начните индексировать новый источник данных в Elasticsearch, выполнив все необходимые шаги. Аргументы: dataFrame {Spark DataFrame} — исходный dataFrame Поднимает: Ошибка – могут возникнуть непредвиденные типы ошибок. """ # создаем новое имя индекса для этого dataFrame self.currentIndex = self.generateIndex() # установите детали исходного документа перед индексированием пытаться: счетчик = dataFrame.count() кроме исключения как ошибки: self.terminate("Не удалось подсчитать количество кадров данных", error=err) self.__updateSource(key=Source.rowNumber, value=count) # информируем приложение об импорте процентов поток = Тема (цель = self.__pubPercent) поток.start() # выводим количество строк строки = self.document.get(Source.rowNumber) # индексируем этот dataFrame в Elasticsearch разделы = 10, если строки < 10**5 еще 100 пытаться: self.setDataFrame(self.currentIndex, dataFrame.repartition(разделы)) кроме исключения как ошибки: распечатать (ошибиться) # дождаться завершения потока поток.join() def setDataFrame(self, index, dataFrame, idField=None): """Создать новый индекс, используя dataFrame Spark не принимает добавление документов dataFrame в существующий индекс Итак, он удалит индекс, если он существует. Аргументы: index {str} -- Имя индекса dataFrame {Spark dateFrame} — индексные документы Ключевые аргументы: idField {str} — Имя идентификационного поля (по умолчанию: нет) Поднимает: Ошибка – могут возникнуть непредвиденные типы ошибок. """ # удалить старый индекс, если он существует esClient.deleteIndex(индекс) # пытаемся установить сопоставление индексов esClient.putMapping(index, dataFrame.dtypes) # записываем фрейм данных в elasticsearch писатель = ( dataFrame.write.format(self.SPARK_SQL_FORMAT) .option(self.RESOURSE_KEY, "{}/{}".format(index, index)) .option(self.NODE_KEY, self.__host) .option(self.PORT_KEY, self.__port) ) # проверяем, есть ли поле идентификации или нет если idField: писатель = писатель.опция(self.MAPPING_ID_KEY, idField) # экспортируем dataFrame в Elastic писатель.сохранить() Исключение:
Произошла ошибка при вызове o68.save. : java.lang.NoClassDefFoundError: скала/класс $продукта в org.elasticsearch.spark.sql.ElasticsearchRelation.(DefaultSource.scala:228) в org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105) в org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) в org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) в org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) в org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) в org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) в org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) в org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) в org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) в org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) в org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) в org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) в org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) в org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) в org.apache.spark.sql.catalyst.plans.ological.LogicalPlan.org$apache$spark$sql$catalyst$plans$ological$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) в org.apache.spark.sql.catalyst.plans.ological.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) в org.apache.spark.sql.catalyst.plans.ological.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) в org.apache.spark.sql.catalyst.plans.ological.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) в org.apache.spark.sql.catalyst.plans.ological.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) в org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488) в org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) в org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) в org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) в org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133) в org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) в org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387) в org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360) в org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) в java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) в java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.base/java.lang.reflect.Method.invoke(Method.java:568) в py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) в py4j.Gateway.invoke(Gateway.java:282) в py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) в py4j.commands.CallCommand.execute(CallCommand.java:79) в py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) в py4j.ClientServerConnection.run(ClientServerConnection.java:106) в java.base/java.lang.Thread.run(Thread.java:833) Вызвано: java.lang.ClassNotFoundException: scala.Product$class в java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) в java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) в java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) Может ли кто-нибудь сообщить мне, сталкивался ли кто-нибудь с этой проблемой и как ее решили?
При попытке записать или установить dataFrame из базы данных Oracle в elasticsearch с помощью Apache spark я получаю следующее сообщение об ошибке:
Примечание. Я использую Elasticsearch версии 8.8.2, Spark V 3.3.3, python V 3.8, pySpark V 3.3.0 и Scala V 2.12
Код ниже:
def write(self, dataFrame): """начните индексировать новый источник данных в Elasticsearch, выполнив все необходимые шаги. Аргументы: dataFrame {Spark DataFrame} — исходный dataFrame Поднимает: Ошибка – могут возникнуть непредвиденные типы ошибок. """ # создаем новое имя индекса для этого dataFrame self.currentIndex = self.generateIndex() # установите детали исходного документа перед индексированием пытаться: счетчик = dataFrame.count() кроме исключения как ошибки: self.terminate("Не удалось подсчитать количество кадров данных", error=err) self.__updateSource(key=Source.rowNumber, value=count) # информируем приложение об импорте процентов поток = Тема (цель = self.__pubPercent) поток.start() # выводим количество строк строки = self.document.get(Source.rowNumber) # индексируем этот dataFrame в Elasticsearch разделы = 10, если строки < 10**5 еще 100 пытаться: self.setDataFrame(self.currentIndex, dataFrame.repartition(разделы)) кроме исключения как ошибки: распечатать (ошибиться) # дождаться завершения потока поток.join() def setDataFrame(self, index, dataFrame, idField=None): """Создать новый индекс, используя dataFrame Spark не принимает добавление документов dataFrame в существующий индекс Итак, он удалит индекс, если он существует. Аргументы: index {str} -- Имя индекса dataFrame {Spark dateFrame} — индексные документы Ключевые аргументы: idField {str} — Имя идентификационного поля (по умолчанию: нет) Поднимает: Ошибка – могут возникнуть непредвиденные типы ошибок. """ # удалить старый индекс, если он существует esClient.deleteIndex(индекс) # пытаемся установить сопоставление индексов esClient.putMapping(index, dataFrame.dtypes) # записываем фрейм данных в elasticsearch писатель = ( dataFrame.write.format(self.SPARK_SQL_FORMAT) .option(self.RESOURSE_KEY, "{}/{}".format(index, index)) .option(self.NODE_KEY, self.__host) .option(self.PORT_KEY, self.__port) ) # проверяем, есть ли поле идентификации или нет если idField: писатель = писатель.опция(self.MAPPING_ID_KEY, idField) # экспортируем dataFrame в Elastic писатель.сохранить() Исключение:
Произошла ошибка при вызове o68.save. : java.lang.NoClassDefFoundError: скала/класс $продукта в org.elasticsearch.spark.sql.ElasticsearchRelation.(DefaultSource.scala:228) в org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:105) в org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) в org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) в org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) в org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) в org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) в org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) в org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) в org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) в org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) в org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) в org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) в org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) в org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) в org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) в org.apache.spark.sql.catalyst.plans.ological.LogicalPlan.org$apache$spark$sql$catalyst$plans$ological$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) в org.apache.spark.sql.catalyst.plans.ological.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) в org.apache.spark.sql.catalyst.plans.ological.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) в org.apache.spark.sql.catalyst.plans.ological.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) в org.apache.spark.sql.catalyst.plans.ological.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) в org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488) в org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) в org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) в org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) в org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133) в org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) в org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387) в org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360) в org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) в java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (собственный метод) в java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) в java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.base/java.lang.reflect.Method.invoke(Method.java:568) в py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) в py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) в py4j.Gateway.invoke(Gateway.java:282) в py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) в py4j.commands.CallCommand.execute(CallCommand.java:79) в py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) в py4j.ClientServerConnection.run(ClientServerConnection.java:106) в java.base/java.lang.Thread.run(Thread.java:833) Вызвано: java.lang.ClassNotFoundException: scala.Product$class в java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) в java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) в java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) Может ли кто-нибудь сообщить мне, сталкивался ли кто-нибудь с этой проблемой и как ее решили?
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение