Я написал небольшую функцию для чтения данных из файлов CSV и сохранения результатов в отформатированных книгах Excel. Код будет выполняться в записной книжке Azure Databricks, работающей в искровом кластере. Как мне оптимизировать эту функцию?
!pip install xlsxwriter --quiet --disable-pip-version-check
import pandas as pd
import numpy as np
import xlsxwriter
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, to_date
from pyspark.sql.types import DecimalType, DateType, TimestampType
import shutil
def check_lists_length(*lists):
return not(all(len(lst) == len(lists[0]) for lst in lists))
def client_file_creator_from_csv(sheet_names, table_names, file_names, csv_list, output_path):
"""
Reads a list of CSV files and returns formatted Excel Workbooks in Azure Databricks
Parameters:
sheet_names (list): A list of strings representing worksheet names of the final workbooks
table_names (list): A list of strings representing table headers of the final workbooks
file_names (list): A list of strings representing file names for the final workbooks
csv_list (list): A list of strings representing the input paths of the CSV files
output_path (string) : A string representing the output path for the Excel workbooks
"""
if check_lists_length(sheet_names, table_names, file_names, csv_list):
raise Exception("The number of CSV files and file names and sheet names and table names must be the same")
#Convert CSV file paths to dbfs format for use with dbutils and pyspark
csv_list = ["dbfs:" + str.split(x,"dbfs")[1] for x in csv_list]
#Convert output file paths to dbfs format for use with dbutils and pyspark
path_api = "dbfs:" + str.split(output_path,"dbfs")[1]
#Create output path in case it doesn't exist
dbutils.fs.mkdirs(path_api)
#Create temporary path in dbfs to store ExcelWriter objects
temp_path = "/tmp/" + str.split(dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get(), sep = '@')[0]
#Create lists to store pandas dataframes and ExcelWriter objects
dfs = list()
writers = list()
for i,csv in enumerate(csv_list):
df = spark.read.option("header", True).option("nullValue", "null").option("delimiter", ",").option("quote", "\"").option("escape", "\"").option("multiLine", True).option("inferSchema", True).option("enforceSchema", True).option("mode", "DROPMALFORMED").csv(csv)
if df.count() == 0:
raise Exception('File does not have any rows')
dcml_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DecimalType)]
time_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, TimestampType)]
for col_name in dcml_cols:
df = df.withColumn(col_name, round(col(col_name).cast('double'),2))
for col_name in time_cols:
df = df.withColumn(col_name, to_date(col(col_name)))
dfs.append(df.select("*").toPandas())
writers.append(pd.ExcelWriter(f"{temp_path}_{i}.xlsx", engine="xlsxwriter"))
for df, writer, sheet_name, table_name, file_name in zip(dfs,writers, sheet_names, table_names, file_names):
#Since Excel has a row limit of 1,048,576 rows per worksheet, larger dataframes must be split across multiple worksheets
split_df = np.array_split(df,(df.shape[0]//1000000)+1)
workbook = writer.book
#Add standard formatting as required
top_border = workbook.add_format({'top': 2})
bottom_border = workbook.add_format({'bottom': 2})
left_border = workbook.add_format({'left': 2})
right_border = workbook.add_format({'right': 2})
top_left_border = workbook.add_format({'top': 2, 'left': 2})
top_right_border = workbook.add_format({'top': 2, 'right': 2})
bottom_left_border = workbook.add_format({'bottom': 2, 'left': 2})
bottom_right_border = workbook.add_format({'bottom': 2, 'right': 2})
table_title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': '#305496', 'align': 'left', 'valign': 'vcenter'})
title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': 'black', 'align': 'left', 'valign': 'vcenter'})
for j, df_part in enumerate(split_df):
df_part.to_excel(writer, sheet_name = sheet_name + (("_"+ str(j)) if j > 0 else ""), index = False, header = False, startrow = 4, startcol = 1)
worksheet = writer.sheets[sheet_name + (("_"+ str(j)) if j > 0 else "")]
for col_num, value in enumerate(df_part.columns.values):
worksheet.write(3,col_num+1,value,table_title_format)
num_rows = df_part.shape[0]
num_cols = df_part.shape[1]
worksheet.conditional_format(3,2,3,num_cols-1,{"type": "no_errors", "format": top_border})
worksheet.conditional_format(4,1,num_rows+2,1,{"type": "no_errors", "format": left_border})
worksheet.conditional_format(4,num_cols,num_rows+2,num_cols,{"type": "no_errors", "format": right_border})
worksheet.conditional_format(num_rows+3,2,num_rows+3,num_cols-1,{"type": "no_errors", "format": bottom_border})
worksheet.conditional_format(3,1,3,1,{"type": "no_errors", "format": top_left_border})
worksheet.conditional_format(3,num_cols,3,num_cols,{"type": "no_errors", "format": top_right_border})
worksheet.conditional_format(num_rows+3,1,num_rows+3,1,{"type": "no_errors", "format": bottom_left_border})
worksheet.conditional_format(num_rows+3,num_cols,num_rows+3,num_cols,{"type": "no_errors", "format": bottom_right_border})
worksheet.hide_gridlines(2)
worksheet.autofilter(3,1,num_rows+3,num_cols)
worksheet.autofit()
worksheet.set_column_pixels(0, 0, 20)
worksheet.merge_range(1,1,1,num_cols, table_name + (("_"+ str(j)) if j > 0 else ""), title_format)
workbook.close()
shutil.copy(writer,output_path + file_name + ".xlsx")
print(f"File {file_name}.xlsx created successfully")
Я думал о нескольких возможностях оптимизации, но они не очень сработали:
Выполнение всей обработки данных использование pyspark вместо pandas:
Это было невозможно из-за необходимости форматировать книгу Excel
с помощью xlsxwriter.
Использование альтернативной библиотеки для xlsxwriter: я не мог Я не имитирую точную
функциональность, которую я имею в этой функции.
Параллельная обработка фреймов данных и отказ от их хранения в
списке: мой код стал труднее для понимания, и я не был уверен, как
правильно использовать искровой кластер
Если у кого-то есть идеи или предложения о том, как можно улучшить эту функцию, я бы с радостью приветствую их. В настоящее время узел драйвера перегружен, и наблюдается резкий скачок использования оперативной памяти из-за pandas и xlsxwriter.
Я написал небольшую функцию для чтения данных из файлов CSV и сохранения результатов в отформатированных книгах Excel. Код будет выполняться в записной книжке Azure Databricks, работающей в искровом кластере. Как мне оптимизировать эту функцию? [code] !pip install xlsxwriter --quiet --disable-pip-version-check
import pandas as pd
import numpy as np
import xlsxwriter
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, to_date
from pyspark.sql.types import DecimalType, DateType, TimestampType
import shutil
def check_lists_length(*lists):
return not(all(len(lst) == len(lists[0]) for lst in lists))
print(f"File {file_name}.xlsx created successfully")
[/code] Я думал о нескольких возможностях оптимизации, но они не очень сработали: [list] [*]Выполнение всей обработки данных использование pyspark вместо pandas: Это было невозможно из-за необходимости форматировать книгу Excel с помощью xlsxwriter. [*]Использование альтернативной библиотеки для xlsxwriter: я не мог Я не имитирую точную функциональность, которую я имею в этой функции. [*]Параллельная обработка фреймов данных и отказ от их хранения в списке: мой код стал труднее для понимания, и я не был уверен, как правильно использовать искровой кластер [/list] Если у кого-то есть идеи или предложения о том, как можно улучшить эту функцию, я бы с радостью приветствую их. В настоящее время узел драйвера перегружен, и наблюдается резкий скачок использования оперативной памяти из-за pandas и xlsxwriter.
Я изучаю Spark, поэтому в качестве задачи нам нужно было создать колесо локально, а затем установить его в Databricks (я использую Azure Databricks) и протестировать его, запустив из блокнота Databrick. Эта программа предполагает чтение файла CSV...
Я запустил приведенный ниже код для экспорта модели ML в mlflow на основе Azure Databricks , но, похоже, получаю эту ошибку: Хост или токен MLflow настроены неправильно .
Я не могу понять, в чем проблема. URL-адрес рабочей области и токен PAT...
Я выполнил приведенный ниже код для экспорта модели машинного обучения в mlflow на основе Azure Databricks , но, похоже, получаю эту ошибку
MLflow host or token is not configured correctly
Я не могу понять, в чем проблема. URL-адрес рабочей области...
Я выполнил приведенный ниже код для экспорта модели машинного обучения в mlflow на основе Azure Databricks , но, похоже, получаю эту ошибку
MLflow host or token is not configured correctly
Я не могу понять, в чем проблема. URL-адрес рабочей области...