- Используя функциональность dbt, которая позволяет создавать модель Python, я создал модель, которая считывает данные из некоторой таблицы BigQuery, выполняет некоторые вычисления и записывает данные обратно в BigQuery.
- Он использует dataproc (бессерверный режим отправки) для отправки модели как задания PySpark. p>
При запуске модели с таблицейматериализация, все работает как задумано. Однако при попытке использовать инкрементальную материализацию и использование свойства dbt.this для доступа к местоположению текущей модели код ломается.
Вот ошибочный код:
Код: Выделить всё
# Processs new rows only
if dbt.is_incremental:
# only new rows compared to max in current table
max_from_this = f"select max(created_at) from {dbt.this}"
df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
Код: Выделить всё
df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: spark_catalog requires a single-part namespace, but got [x, y]
- Модели Python в dbt
Подробнее здесь: https://stackoverflow.com/questions/787 ... ntal-model