Для выполнения нашего кода мы используем блоки данных. Я пытаюсь вести журналы, которые хранятся в таблице. Помимо прочего, мне также нужен идентификатор выполнения задания и имя задания/задачи, чтобы я мог вернуться и проверить задание на основе журналов и наоборот. Предлагают ли блоки данных эту информацию при выполнении задания?
Я нашел пример, в котором эта информация извлекается внутри блокнота, но я могу найти что угодно для блоков данных.
Пример для блокнотов:
# Databricks notebook source
from pyspark.sql.types import (
IntegerType,
StructField,
StructType,
TimestampType,
StringType
)
from pyspark.dbutils import DBUtils
import json
from delta import DeltaTable
import datetime as dt
from datetime import datetime
dbutils.widgets.dropdown("env", "dev", ["dev", "prod"])
env = dbutils.widgets.get("env")
print('env: ',env)
def save_jobs_log(log_data, job_log_dir):
job_schema = StructType(
[
StructField("job_log_id", StringType()),
StructField("run_id", StringType()),
StructField("job_name", StringType()),
StructField("notebookId", StringType()),
StructField("user", StringType()),
StructField("clusterId", StringType()),
StructField("jobParametersCount", StringType()),
StructField("startTimestamp", StringType()),
StructField("taskKey", StringType()),
StructField("operation", StringType()),
StructField("target_table", StringType()),
StructField("updated_rows", IntegerType()),
StructField("processed_ts", TimestampType()),
]
)
if not DeltaTable.isDeltaTable(spark, job_log_dir):
df = spark.createDataFrame([], schema=job_schema)
df.write.format("delta").option("overwriteSchema", "True").mode("append").save(job_log_dir)
df = spark.createDataFrame(log_data, schema=job_schema)
df.write.format("delta").mode("append").save(job_log_dir)
def store_job_logs(df, operation, target_table, job_log_dir):
dbutils = DBUtils(spark)
run_params = (
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
)
run_params_json = json.loads(run_params)
print("run_params_json", json.dumps(run_params_json,indent=4))
log_data = [
{
"job_log_id": run_params_json["tags"]["jobId"],
"run_id": run_params_json["currentRunId"]["id"],
"job_name": run_params_json["tags"]["jobName"],
"notebookId": run_params_json["tags"]["notebookId"],
"user": run_params_json["tags"]["user"],
"clusterId": run_params_json["tags"]["clusterId"],
"jobParametersCount": run_params_json["tags"]["jobParametersCount"],
"startTimestamp": datetime.utcfromtimestamp(int(run_params_json["tags"]["startTimestamp"])/1000).strftime('%Y-%m-%d %H:%M:%S'),
"taskKey": run_params_json["tags"]["taskKey"],
"operation": operation,
"target_table": target_table,
"updated_rows": df.count(),
"processed_ts": dt.datetime.now(),
}
]
save_jobs_log(log_data, job_log_dir)
def do_stuff(df, target_table):
"""Place your logic"""
df.write.mode("overwrite").saveAsTable(target_table)
if __name__ == "__main__":
person = [
(1, "John", 10),
(2, "Alex", 20),
(3, "Nikol", 30),
]
rdd = sc.parallelize(person)
columns = ["ID", "NAME", "AGE"]
df = rdd.toDF(columns)
target_table = "person"
do_stuff(df, target_table)
operation = "person_save_details"
job_log_dir = "/mnt/demo/job_logs/"
store_job_logs(df, operation, target_table, job_log_dir)
Подробнее здесь: https://stackoverflow.com/questions/798 ... ks-job-run
Получение метаданных задания, таких как идентификатор и имя запуска задания, при запуске задания блоков данных. ⇐ Python
Программы на Python
1772024385
Anonymous
Для выполнения нашего кода мы используем блоки данных. Я пытаюсь вести журналы, которые хранятся в таблице. Помимо прочего, мне также нужен идентификатор выполнения задания и имя задания/задачи, чтобы я мог вернуться и проверить задание на основе журналов и наоборот. Предлагают ли блоки данных эту информацию при выполнении задания?
Я нашел пример, в котором эта информация извлекается внутри блокнота, но я могу найти что угодно для блоков данных.
Пример для блокнотов:
# Databricks notebook source
from pyspark.sql.types import (
IntegerType,
StructField,
StructType,
TimestampType,
StringType
)
from pyspark.dbutils import DBUtils
import json
from delta import DeltaTable
import datetime as dt
from datetime import datetime
dbutils.widgets.dropdown("env", "dev", ["dev", "prod"])
env = dbutils.widgets.get("env")
print('env: ',env)
def save_jobs_log(log_data, job_log_dir):
job_schema = StructType(
[
StructField("job_log_id", StringType()),
StructField("run_id", StringType()),
StructField("job_name", StringType()),
StructField("notebookId", StringType()),
StructField("user", StringType()),
StructField("clusterId", StringType()),
StructField("jobParametersCount", StringType()),
StructField("startTimestamp", StringType()),
StructField("taskKey", StringType()),
StructField("operation", StringType()),
StructField("target_table", StringType()),
StructField("updated_rows", IntegerType()),
StructField("processed_ts", TimestampType()),
]
)
if not DeltaTable.isDeltaTable(spark, job_log_dir):
df = spark.createDataFrame([], schema=job_schema)
df.write.format("delta").option("overwriteSchema", "True").mode("append").save(job_log_dir)
df = spark.createDataFrame(log_data, schema=job_schema)
df.write.format("delta").mode("append").save(job_log_dir)
def store_job_logs(df, operation, target_table, job_log_dir):
dbutils = DBUtils(spark)
run_params = (
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
)
run_params_json = json.loads(run_params)
print("run_params_json", json.dumps(run_params_json,indent=4))
log_data = [
{
"job_log_id": run_params_json["tags"]["jobId"],
"run_id": run_params_json["currentRunId"]["id"],
"job_name": run_params_json["tags"]["jobName"],
"notebookId": run_params_json["tags"]["notebookId"],
"user": run_params_json["tags"]["user"],
"clusterId": run_params_json["tags"]["clusterId"],
"jobParametersCount": run_params_json["tags"]["jobParametersCount"],
"startTimestamp": datetime.utcfromtimestamp(int(run_params_json["tags"]["startTimestamp"])/1000).strftime('%Y-%m-%d %H:%M:%S'),
"taskKey": run_params_json["tags"]["taskKey"],
"operation": operation,
"target_table": target_table,
"updated_rows": df.count(),
"processed_ts": dt.datetime.now(),
}
]
save_jobs_log(log_data, job_log_dir)
def do_stuff(df, target_table):
"""Place your logic"""
df.write.mode("overwrite").saveAsTable(target_table)
if __name__ == "__main__":
person = [
(1, "John", 10),
(2, "Alex", 20),
(3, "Nikol", 30),
]
rdd = sc.parallelize(person)
columns = ["ID", "NAME", "AGE"]
df = rdd.toDF(columns)
target_table = "person"
do_stuff(df, target_table)
operation = "person_save_details"
job_log_dir = "/mnt/demo/job_logs/"
store_job_logs(df, operation, target_table, job_log_dir)
Подробнее здесь: [url]https://stackoverflow.com/questions/79896172/retrieve-job-metadata-like-job-run-id-and-name-in-a-databricks-job-run[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия