Потоковая передача вариантов использования в pyspark ⇐ Python
Потоковая передача вариантов использования в pyspark
Я работаю с Databricks в Azure, мои данные размещаются в ADLS2.
Текущая версия среды выполнения – 10.4 LTS (при необходимости я могу обновить)
У меня есть таблица Pimproduct:
идентификатор имя действие dlk_last_modified pim_timestamp 1 Статья1 А 01.03.2022 28 февраля 2022 22:34:00 [*]id: идентификатор статьи (уникальный). [*]имя: название статьи [*]Действие: действие, выполняемое над строкой (A = добавить, D = удалить). [*]dlk_last_modified: дата вставки в мою таблицу [*]pim_timestamp: дата извлечения из исходной системы
Каждые примерно 15 минут я получаю новый файл, содержащий модификацию, которую мне нужно вставить. Для каждой строки в моем файле я рассматриваю только самые последние идентификаторы pim_timestamp :
[*]Если строка имеет тип action=A и идентификатор не существует, я добавляю строку [*]Если строка имеет тип action=A и идентификатор существует, я заменяю существующую строку с тем же идентификатором новой строкой. [*]Если строка имеет action=D, мне нужно удалить идентификатор из таблицы.
Изначально изменения были ежедневными. Я использовал этот код:
из функций импорта pyspark.sql как F, Window как W df = spark.table("Pimproduct").unionByNames( spark.read.format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") df.write.saveAsTable("pimproduct", format="delta", mode="overwrite") Но теперь я хочу сделать то же самое в потоковом режиме и не знаю, как это сделать. Я попробовал это:
импортировать временный файл из функций импорта pyspark.sql как F, Window как W df = spark.readSteam.table("Pimproduct").unionByNames( spark.readStream.schema(схема).format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") с tempfile.TemporaryDirectory() как d: df.writeStream.toTable("Pimproduct", checkpointLocation=d) но у меня возникла ошибка:
Исключение AnalysisException: окна, не привязанные к времени, не поддерживаются при потоковой передаче кадров данных/наборов данных;
Есть идеи, как я могу выполнить этот паровой прием данных? Я открыт для предложений.
Я работаю с Databricks в Azure, мои данные размещаются в ADLS2.
Текущая версия среды выполнения – 10.4 LTS (при необходимости я могу обновить)
У меня есть таблица Pimproduct:
идентификатор имя действие dlk_last_modified pim_timestamp 1 Статья1 А 01.03.2022 28 февраля 2022 22:34:00 [*]id: идентификатор статьи (уникальный). [*]имя: название статьи [*]Действие: действие, выполняемое над строкой (A = добавить, D = удалить). [*]dlk_last_modified: дата вставки в мою таблицу [*]pim_timestamp: дата извлечения из исходной системы
Каждые примерно 15 минут я получаю новый файл, содержащий модификацию, которую мне нужно вставить. Для каждой строки в моем файле я рассматриваю только самые последние идентификаторы pim_timestamp :
[*]Если строка имеет тип action=A и идентификатор не существует, я добавляю строку [*]Если строка имеет тип action=A и идентификатор существует, я заменяю существующую строку с тем же идентификатором новой строкой. [*]Если строка имеет action=D, мне нужно удалить идентификатор из таблицы.
Изначально изменения были ежедневными. Я использовал этот код:
из функций импорта pyspark.sql как F, Window как W df = spark.table("Pimproduct").unionByNames( spark.read.format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") df.write.saveAsTable("pimproduct", format="delta", mode="overwrite") Но теперь я хочу сделать то же самое в потоковом режиме и не знаю, как это сделать. Я попробовал это:
импортировать временный файл из функций импорта pyspark.sql как F, Window как W df = spark.readSteam.table("Pimproduct").unionByNames( spark.readStream.schema(схема).format("avro").load("/путь/к/ежедневным/данным") ) df = df.withColumn( "р-н", F.row_number().over(W.partitionBy("id").orderBy(F.col("pim_timestamp").desc())), ) df = df.where("rn = 1").where("действие 'D'") с tempfile.TemporaryDirectory() как d: df.writeStream.toTable("Pimproduct", checkpointLocation=d) но у меня возникла ошибка:
Исключение AnalysisException: окна, не привязанные к времени, не поддерживаются при потоковой передаче кадров данных/наборов данных;
Есть идеи, как я могу выполнить этот паровой прием данных? Я открыт для предложений.
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Потоковая потоковая передача Polars: Parquet Parquet на основе Shift (-1)
Anonymous » » в форуме Python - 0 Ответы
- 3 Просмотры
-
Последнее сообщение Anonymous
-