Преобразование кадра потоковых данных в формат JSON в Python ⇐ Python
Преобразование кадра потоковых данных в формат JSON в Python
из функций импорта pyspark.sql как F импортировать бото3 импортировать систему ## установить транслируемые переменные table_name = "dev.emp.master_events" df = ( spark.readStream.format("дельта") .option("readChangeFeed", "истина") .option("начальная версия", 2) .table(имя_таблицы) ) элементы = df.select('*') запрос = (items.writeStream.outputMode("append").foreachBatch(лямбда-элементы, epoch_id: items.write.json()).start()) Ошибка перехода ниже
py4j.Py4JException: прокси-сервер Python вызвал исключение. Возвратное сообщение: Traceback (последний вызов — последний): Полное сообщение об ошибке Файл «/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py», строка 617, в _call_proxy return_value = getattr(self.pool[obj_id], метод)(*params) Файл «/databricks/spark/python/pyspark/sql/utils.py», строка 117, в вызове поднять е Файл "/databricks/spark/python/pyspark/sql/utils.py", строка 114, в вызове self.func(DataFrame(jdf, обернутая_сессия_jdf), пакет_id) Файл «», строка 1, в запрос = (items.writeStream.outputMode("append").foreachBatch(лямбда-элементы, epoch_id: items.write.json()).start()) Файл «/databricks/spark/python/pyspark/instrumentation_utils.py», строка 48, в оболочке. res = func(*args, **kwargs) Ошибка типа: в DataFrameWriter.json() отсутствует 1 обязательный позиционный аргумент: «путь» в py4j.Protocol.getReturnValue(Protocol.java:476) в py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) по адресу com.sun.proxy.$Proxy161.call (источник неизвестен) в org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:486) в org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:486) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:139) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:166) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:139) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:101) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:662) в com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607) в com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616) в com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495) в com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493) в org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:82) в org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354) в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:276) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41) в com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99) в com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104) в scala.util.Using$.resource(Using.scala:269) в com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103) в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:276)
из функций импорта pyspark.sql как F импортировать бото3 импортировать систему ## установить транслируемые переменные table_name = "dev.emp.master_events" df = ( spark.readStream.format("дельта") .option("readChangeFeed", "истина") .option("начальная версия", 2) .table(имя_таблицы) ) элементы = df.select('*') запрос = (items.writeStream.outputMode("append").foreachBatch(лямбда-элементы, epoch_id: items.write.json()).start()) Ошибка перехода ниже
py4j.Py4JException: прокси-сервер Python вызвал исключение. Возвратное сообщение: Traceback (последний вызов — последний): Полное сообщение об ошибке Файл «/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py», строка 617, в _call_proxy return_value = getattr(self.pool[obj_id], метод)(*params) Файл «/databricks/spark/python/pyspark/sql/utils.py», строка 117, в вызове поднять е Файл "/databricks/spark/python/pyspark/sql/utils.py", строка 114, в вызове self.func(DataFrame(jdf, обернутая_сессия_jdf), пакет_id) Файл «», строка 1, в запрос = (items.writeStream.outputMode("append").foreachBatch(лямбда-элементы, epoch_id: items.write.json()).start()) Файл «/databricks/spark/python/pyspark/instrumentation_utils.py», строка 48, в оболочке. res = func(*args, **kwargs) Ошибка типа: в DataFrameWriter.json() отсутствует 1 обязательный позиционный аргумент: «путь» в py4j.Protocol.getReturnValue(Protocol.java:476) в py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) по адресу com.sun.proxy.$Proxy161.call (источник неизвестен) в org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:486) в org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:486) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:139) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:166) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:139) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:101) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:662) в com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607) в com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616) в com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495) в com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493) в org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:82) в org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354) в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:276) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41) в com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99) в com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104) в scala.util.Using$.resource(Using.scala:269) в com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103) в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:276)
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как я могу обрабатывать события для получения потоковых данных и файловых данных в WebSockets?
Anonymous » » в форуме C# - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Anonymous
-