Потоковая передача вариантов использования в pysparkPython

Программы на Python
Ответить Пред. темаСлед. тема
Гость
 Потоковая передача вариантов использования в pyspark

Сообщение Гость »


Я работаю с Databricks в Azure, мои данные размещаются в ADLS2.
Текущая версия среды выполнения – 10.4 LTS (при необходимости я могу обновить)

У меня есть таблица Pimproduct:
идентификатор имя действие dlk_last_modified pim_timestamp 1 Статья1 А 01.03.2022 28 февраля 2022 22:34:00 [*]id: идентификатор статьи (уникальный). [*]имя: название статьи [*]Действие: действие, выполняемое над строкой (A = добавить, D = удалить). [*]dlk_last_modified: дата вставки в мою таблицу [*]pim_timestamp: дата извлечения из исходной системы
Каждые примерно 15 минут я получаю новый файл, содержащий модификацию, которую мне нужно вставить. Для каждой строки в моем файле я рассматриваю только самые последние идентификаторы pim_timestamp :
[*]Если строка имеет тип action=A и идентификатор не существует, я добавляю строку [*]Если строка имеет тип action=A и идентификатор существует, я заменяю существующую строку с тем же идентификатором новой строкой. [*]Если строка имеет action=D, мне нужно удалить идентификатор из таблицы.
Изначально изменения были ежедневными. Я использовал этот код:

из функций импорта pyspark.sql как F, Window как W df = spark.table("Pimproduct").unionByNames( spark.read.format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") df.write.saveAsTable("pimproduct", format="delta", mode="overwrite") Но теперь я хочу сделать то же самое в потоковом режиме и не знаю, как это сделать. Я попробовал это:

импортировать временный файл из функций импорта pyspark.sql как F, Window как W df = spark.readSteam.table("Pimproduct").unionByNames( spark.readStream.schema(схема).format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") с tempfile.TemporaryDirectory() как d: df.writeStream.toTable("Pimproduct", checkpointLocation=d) но у меня возникла ошибка:

Исключение AnalysisException: окна, не привязанные к времени, не поддерживаются при потоковой передаче кадров данных/наборов данных;

Есть идеи, как я могу выполнить этот паровой прием данных? Я открыт для предложений.
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Потоковая передача вариантов использования в pyspark
    Гость » » в форуме Python
    0 Ответы
    61 Просмотры
    Последнее сообщение Гость
  • Потоковая передача вариантов использования в pyspark
    Гость » » в форуме Python
    0 Ответы
    130 Просмотры
    Последнее сообщение Гость
  • Потоковая потоковая передача видео с камеры FLIR с помощью Python
    Anonymous » » в форуме Python
    0 Ответы
    34 Просмотры
    Последнее сообщение Anonymous
  • Потоковая потоковая передача Polars: Parquet Parquet на основе Shift (-1)
    Anonymous » » в форуме Python
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • WooCommerce: получите настраиваемое поле из вариантов продукта и отобразите его как суффикс к ценам вариантов.
    Anonymous » » в форуме Php
    0 Ответы
    91 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»