Код: Выделить всё
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
col, when, lit, split, sum as _sum, min as _min, max as _max, coalesce, date_format, udf, current_timestamp, to_date, explode
)
import pyspark.sql.functions as F
from functools import reduce
import datetime
from pyspark.sql.types import DoubleType
def do_the_work (dbt: SparkSession)-> DataFrame:
# Lots of stuff in here that manipulates my data to populate a dataframe
# This takes about 30 minutes
return mydata_df;
# ------------------------------------------------------------------------
# Main
# ------------------------------------------------------------------------
def model(dbt, session: SparkSession) -> DataFrame:
dbt.config(
materialized="table",
file_format="delta",
unique_key=['my_id'],
submission_method="job_cluster",
job_cluster_config={
"autoscale": {
"min_workers": 2,
"max_workers": 4
},
"spark_version": "15.4.x-scala2.12",
"node_type_id": "i3.2xlarge"
}
)
processed_df = do_the_work (dbt)
processed_df = processed_df.withColumn("update_timestamp",
current_timestamp())
return processed_df
В конце процесса он добавляет столбец метки времени, заполняемый значением current_timestamp() — это означает, что записываемая дата/время — это дата/время записи в базу данных, а не время добавления строки в базу данных. dataframe.
Этот код активируется при обычном запуске dbt вместе с загрузкой обычных таблиц на основе sql. Все работает нормально, все заполняется как положено, но откуда-то возникает большая задержка. Например, когда я смотрю на вчерашнее утреннее выполнение, я вижу, что каждая строка в таблице имеет метку времени обновления 2026-02-18T01:55:50.098+00:00. Когда я просматриваю файл журнала, я вижу, что обработка этой таблицы началась в 01:24:32, но завершилась в 02:37:43. Таким образом, эта часть работы заключалась в том, чтобы что-то делать в течение более 40 минут после записи данных в таблицу.
Как мне узнать, что происходит в это мертвое время, и как я могу предотвратить это? Выполнение этого задания уже занимает много времени, но затем оно еще дольше ничего не делает!
Я нашел эту, предположительно исправленную, ошибку, которая может быть связана: https://github.com/dbt-labs/dbt-adapters/issues/1102
Подробнее здесь: https://stackoverflow.com/questions/798 ... ame-in-dbt
Мобильная версия