Преобразование кадра потоковых данных в формат JSON в PythonPython

Программы на Python
Ответить Пред. темаСлед. тема
Гость
 Преобразование кадра потоковых данных в формат JSON в Python

Сообщение Гость »


из функций импорта pyspark.sql как F импортировать бото3 импортировать систему ## установить транслируемые переменные table_name = "dev.emp.master_events" df = ( spark.readStream.format("дельта") .option("readChangeFeed", "истина") .option("начальная версия", 2) .table(имя_таблицы) ) элементы = df.select('*') запрос = (items.writeStream.outputMode("append").foreachBatch(лямбда-элементы, epoch_id: items.write.json()).start()) Ошибка перехода ниже
py4j.Py4JException: прокси-сервер Python вызвал исключение. Возвратное сообщение: Traceback (последний вызов — последний): Полное сообщение об ошибке Файл «/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py», строка 617, в _call_proxy return_value = getattr(self.pool[obj_id], метод)(*params) Файл «/databricks/spark/python/pyspark/sql/utils.py», строка 117, в вызове поднять е Файл "/databricks/spark/python/pyspark/sql/utils.py", строка 114, в вызове self.func(DataFrame(jdf, обернутая_сессия_jdf), пакет_id) Файл «», строка 1, в запрос = (items.writeStream.outputMode("append").foreachBatch(лямбда-элементы, epoch_id: items.write.json()).start()) Файл «/databricks/spark/python/pyspark/instrumentation_utils.py», строка 48, в оболочке. res = func(*args, **kwargs) Ошибка типа: в DataFrameWriter.json() отсутствует 1 обязательный позиционный аргумент: «путь» в py4j.Protocol.getReturnValue(Protocol.java:476) в py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) по адресу com.sun.proxy.$Proxy161.call (источник неизвестен) в org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:486) в org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:486) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:139) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:166) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:139) в org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:101) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:662) в com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607) в com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616) в com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495) в com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493) в org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:82) в org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354) в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:276) в scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) в com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41) в com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99) в com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104) в scala.util.Using$.resource(Using.scala:269) в com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103) в org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:276)
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Преобразование кадра потоковых данных в формат JSON в Python
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Преобразование кадра потоковых данных в формат JSON в Python
    Anonymous » » в форуме Python
    0 Ответы
    26 Просмотры
    Последнее сообщение Anonymous
  • Преобразование кадра потоковых данных в формат JSON в Python
    Гость » » в форуме Python
    0 Ответы
    12 Просмотры
    Последнее сообщение Гость
  • Как я могу обрабатывать события для получения потоковых данных и файловых данных в WebSockets?
    Anonymous » » в форуме C#
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Преобразование MT в формат MX (JSON в формат XML)
    Anonymous » » в форуме JAVA
    0 Ответы
    7 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»