Потоковая передача вариантов использования в pysparkPython

Программы на Python
Ответить
Гость
 Потоковая передача вариантов использования в pyspark

Сообщение Гость »


Я работаю с Databricks в Azure, мои данные размещаются в ADLS2.
Текущая версия среды выполнения – 10.4 LTS (при необходимости я могу обновить)

У меня есть таблица Pimproduct:
идентификатор имя действие dlk_last_modified pim_timestamp 1 Статья1 А 01.03.2022 28 февраля 2022 22:34:00 [*]id: идентификатор статьи (уникальный). [*]имя: название статьи [*]Действие: действие, выполняемое над строкой (A = добавить, D = удалить). [*]dlk_last_modified: дата вставки в мою таблицу [*]pim_timestamp: дата извлечения из исходной системы
Каждые примерно 15 минут я получаю новый файл, содержащий модификацию, которую мне нужно вставить. Для каждой строки в моем файле я учитываю только самые последние идентификаторы pim_timestamp:
[*]Если строка имеет тип action=A и идентификатор не существует, я добавляю строку [*]Если строка имеет тип action=A и идентификатор существует, я заменяю существующую строку с тем же идентификатором новой строкой. [*]Если строка имеет action=D, мне нужно удалить идентификатор из таблицы.
Изначально изменения были ежедневными. Я использовал этот код:

из функций импорта pyspark.sql как F, Window как W df = spark.table("Pimproduct").unionByNames( spark.read.format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") df.write.saveAsTable("pimproduct", format="delta", mode="overwrite") Но теперь я хочу сделать то же самое в потоковом режиме и не знаю, как это сделать. Я попробовал это:

импортировать временный файл из функций импорта pyspark.sql как F, Window как W df = spark.readSteam.table("Pimproduct").unionByNames( spark.readStream.schema(схема).format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") с tempfile.TemporaryDirectory() как d: df.writeStream.toTable("Pimproduct", checkpointLocation=d) но я получил ошибку:

Исключение AnalysisException: окна, не привязанные к времени, не поддерживаются при потоковой передаче кадров данных/наборов данных;

Есть идеи, как можно выполнить прием потоковых данных? Я открыт для предложений.
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»