Декларативные конвейеры Spark (SDP) — TABLE_OR_VIEW_NOT_FOUND для восходящей таблицы, даже если она определена.Python

Программы на Python
Ответить
Anonymous
 Декларативные конвейеры Spark (SDP) — TABLE_OR_VIEW_NOT_FOUND для восходящей таблицы, даже если она определена.

Сообщение Anonymous »

Я пытаюсь изучить декларативные конвейеры Spark (Spark 4.0 / pyspark.pipelines) локально с помощью интерфейса командной строки Spark-Pipelines.
У меня есть простой конвейер Bronze → Silver → Gold, но я продолжаю получать:

pyspark.errors.Exceptions.connect.AnalysisException:
[TABLE_OR_VIEW_NOT_FOUND] Невозможно найти таблицу или представление `bronze_raw`

даже несмотря на то, что bronze_raw определен с помощью @sdp.table.

Код: Выделить всё

Environment
•   Spark: 4.1.0
•   PySpark
•   Spark Connect (used implicitly by spark-pipelines)
•   Local machine (macOS)
•   Running with: spark-pipelines run pipeline.yml
Ниже приведены два файла в моей рабочей папке без подпапок.
Спецификация конвейера: Pipeline.yml

Код: Выделить всё

name: bronze_silver_gold_pipeline

storage:
root: file:///tmp/spark-pipelines/bronze_silver_gold_pipeline

libraries:
- glob:
include: "pipeline_definitions.py"
Определения конвейера: (pipeline_defiitions.py)

Код: Выделить всё

from pyspark import pipelines as sdp
import pyspark.sql.functions as F
spark = SparkSession.active()
# Bronze Layer
@sdp.table(name="bronze_raw")
def bronze_raw():
return (
spark.read
.option("header", True)
.csv("file:///Users/abhisheknarayanchaudhury/Desktop/Spark Learning/dirty_data_2.csv")
)

# Silver Layer
@sdp.materialized_view(name="silver_cleaned")
def silver_cleaned():
df_bronze = spark.table("bronze_raw")

df = (
df_bronze
.withColumn("rn", F.monotonically_increasing_id())
.filter(F.col("rn") > 1)
.drop("rn")
)

return df

# Gold Layer
@sdp.materialized_view(name="gold_unpivoted")
def gold_unpivoted():
df_silver = spark.table("silver_cleaned")
return df_silver
Сообщение об ошибке:

Исключение анализа: [TABLE_OR_VIEW_NOT_FOUND]
Невозможно найти таблицу или представление `bronze_raw`.
'UnresolvedRelation [bronze_raw]

Ошибка возникает во время регистрации конвейера.
То, что я уже пробовал.
• Определение brone_raw с помощью @sdp.table
• Обеспечение абсолютных путей к файлам для CSV
• Удаление SparkSession.builder и SparkSession.getActiveSession()
• Использование spark.table("bronze_raw") для нисходящих зависимостей
Несмотря на это, silver_cleaned не может разрешить бронзовый_raw.
Вопрос
  • Как правильно ссылаться на вышестоящие таблицы в декларативных конвейерах Spark?
  • Есть spark.table("bronze_raw") правильный подход?
  • Есть ли что-то особенное в контексте выполнения Spark Connect или SDP, вызывающее эту ошибку?
  • Существуют ли дополнительные требования для объявления или материализации восходящих таблиц? Любой минимальный рабочий пример или объяснение модели выполнения будет очень полезен.
Ожидаемое поведение: искровые конвейеры должны распознавать bronze_raw как вышестоящую зависимость и разрешать silver_cleaned ссылаться на нее.

Подробнее здесь: https://stackoverflow.com/questions/798 ... am-table-e
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»