У меня есть простой конвейер Bronze → Silver → Gold, но я продолжаю получать:
pyspark.errors.Exceptions.connect.AnalysisException:
[TABLE_OR_VIEW_NOT_FOUND] Невозможно найти таблицу или представление `bronze_raw`
даже несмотря на то, что bronze_raw определен с помощью @sdp.table.
Код: Выделить всё
Environment
• Spark: 4.1.0
• PySpark
• Spark Connect (used implicitly by spark-pipelines)
• Local machine (macOS)
• Running with: spark-pipelines run pipeline.yml
Спецификация конвейера: Pipeline.yml
Код: Выделить всё
name: bronze_silver_gold_pipeline
storage:
root: file:///tmp/spark-pipelines/bronze_silver_gold_pipeline
libraries:
- glob:
include: "pipeline_definitions.py"
Код: Выделить всё
from pyspark import pipelines as sdp
import pyspark.sql.functions as F
spark = SparkSession.active()
# Bronze Layer
@sdp.table(name="bronze_raw")
def bronze_raw():
return (
spark.read
.option("header", True)
.csv("file:///Users/abhisheknarayanchaudhury/Desktop/Spark Learning/dirty_data_2.csv")
)
# Silver Layer
@sdp.materialized_view(name="silver_cleaned")
def silver_cleaned():
df_bronze = spark.table("bronze_raw")
df = (
df_bronze
.withColumn("rn", F.monotonically_increasing_id())
.filter(F.col("rn") > 1)
.drop("rn")
)
return df
# Gold Layer
@sdp.materialized_view(name="gold_unpivoted")
def gold_unpivoted():
df_silver = spark.table("silver_cleaned")
return df_silver
Исключение анализа: [TABLE_OR_VIEW_NOT_FOUND]
Невозможно найти таблицу или представление `bronze_raw`.
'UnresolvedRelation [bronze_raw]
Ошибка возникает во время регистрации конвейера.
То, что я уже пробовал.
• Определение brone_raw с помощью @sdp.table
• Обеспечение абсолютных путей к файлам для CSV
• Удаление SparkSession.builder и SparkSession.getActiveSession()
• Использование spark.table("bronze_raw") для нисходящих зависимостей
Несмотря на это, silver_cleaned не может разрешить бронзовый_raw.
Вопрос
- Как правильно ссылаться на вышестоящие таблицы в декларативных конвейерах Spark?
- Есть spark.table("bronze_raw") правильный подход?
- Есть ли что-то особенное в контексте выполнения Spark Connect или SDP, вызывающее эту ошибку?
- Существуют ли дополнительные требования для объявления или материализации восходящих таблиц? Любой минимальный рабочий пример или объяснение модели выполнения будет очень полезен.
Подробнее здесь: https://stackoverflow.com/questions/798 ... am-table-e
Мобильная версия