Оптимизированная функция для чтения и форматирования файлов Excel в Databricks.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Оптимизированная функция для чтения и форматирования файлов Excel в Databricks.

Сообщение Anonymous »

Я написал небольшую функцию для чтения данных из файлов CSV и сохранения результатов в отформатированных книгах Excel. Код будет выполняться в записной книжке Azure Databricks, работающей в искровом кластере. Как мне оптимизировать эту функцию?

Код: Выделить всё

!pip install xlsxwriter --quiet --disable-pip-version-check

import pandas as pd

import numpy as np

import xlsxwriter

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, round, to_date

from pyspark.sql.types import DecimalType, DateType, TimestampType

import shutil

def check_lists_length(*lists):

return not(all(len(lst) == len(lists[0]) for lst in lists))

def client_file_creator_from_csv(sheet_names, table_names, file_names, csv_list, output_path):

"""

Reads a list of CSV files and returns formatted Excel Workbooks in Azure Databricks

Parameters:

sheet_names (list): A list of strings representing worksheet names of the final workbooks

table_names (list): A list of strings representing table headers of the final workbooks

file_names (list): A list of strings representing file names for the final workbooks

csv_list (list): A list of strings representing the input paths of the CSV files

output_path (string) : A string representing the output path for the Excel workbooks

"""

if check_lists_length(sheet_names, table_names, file_names, csv_list):

raise Exception("The number of CSV files and file names and sheet names and table names must be the same")

#Convert CSV file paths to dbfs format for use with dbutils and pyspark

csv_list = ["dbfs:" + str.split(x,"dbfs")[1] for x in csv_list]

#Convert output file paths to dbfs format for use with dbutils and pyspark

path_api = "dbfs:" + str.split(output_path,"dbfs")[1]

#Create output path in case it doesn't exist

dbutils.fs.mkdirs(path_api)

#Create temporary path in dbfs to store ExcelWriter objects

temp_path = "/tmp/"  + str.split(dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get(), sep = '@')[0]

#Create lists to store pandas dataframes and ExcelWriter objects

dfs = list()

writers = list()

for i,csv in enumerate(csv_list):

df = spark.read.option("header", True).option("nullValue", "null").option("delimiter", ",").option("quote", "\"").option("escape", "\"").option("multiLine", True).option("inferSchema", True).option("enforceSchema", True).option("mode", "DROPMALFORMED").csv(csv)

if df.count() == 0:

raise Exception('File does not have any rows')

dcml_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DecimalType)]

time_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, TimestampType)]

for col_name in dcml_cols:

df = df.withColumn(col_name, round(col(col_name).cast('double'),2))

for col_name in time_cols:

df = df.withColumn(col_name, to_date(col(col_name)))

dfs.append(df.select("*").toPandas())

writers.append(pd.ExcelWriter(f"{temp_path}_{i}.xlsx", engine="xlsxwriter"))

for df, writer, sheet_name, table_name, file_name in zip(dfs,writers, sheet_names, table_names, file_names):

#Since Excel has a row limit of 1,048,576 rows per worksheet, larger dataframes must be split across multiple worksheets

split_df = np.array_split(df,(df.shape[0]//1000000)+1)

workbook = writer.book

#Add standard formatting as required

top_border = workbook.add_format({'top': 2})

bottom_border = workbook.add_format({'bottom': 2})

left_border = workbook.add_format({'left': 2})

right_border = workbook.add_format({'right': 2})

top_left_border = workbook.add_format({'top': 2, 'left': 2})

top_right_border = workbook.add_format({'top': 2, 'right': 2})

bottom_left_border = workbook.add_format({'bottom': 2, 'left': 2})

bottom_right_border = workbook.add_format({'bottom': 2, 'right': 2})

table_title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': '#305496', 'align': 'left', 'valign': 'vcenter'})

title_format = workbook.add_format({'bold': True, 'font_color': 'white', 'bg_color': 'black', 'align': 'left', 'valign': 'vcenter'})

for j, df_part in enumerate(split_df):

df_part.to_excel(writer, sheet_name = sheet_name + (("_"+ str(j)) if j > 0 else ""), index = False, header = False, startrow = 4, startcol = 1)

worksheet = writer.sheets[sheet_name + (("_"+ str(j)) if j > 0 else "")]

for col_num, value in enumerate(df_part.columns.values):

worksheet.write(3,col_num+1,value,table_title_format)

num_rows = df_part.shape[0]

num_cols = df_part.shape[1]

worksheet.conditional_format(3,2,3,num_cols-1,{"type": "no_errors", "format": top_border})

worksheet.conditional_format(4,1,num_rows+2,1,{"type": "no_errors", "format": left_border})

worksheet.conditional_format(4,num_cols,num_rows+2,num_cols,{"type": "no_errors", "format": right_border})

worksheet.conditional_format(num_rows+3,2,num_rows+3,num_cols-1,{"type": "no_errors", "format": bottom_border})

worksheet.conditional_format(3,1,3,1,{"type": "no_errors", "format": top_left_border})

worksheet.conditional_format(3,num_cols,3,num_cols,{"type": "no_errors", "format": top_right_border})

worksheet.conditional_format(num_rows+3,1,num_rows+3,1,{"type": "no_errors", "format": bottom_left_border})

worksheet.conditional_format(num_rows+3,num_cols,num_rows+3,num_cols,{"type": "no_errors", "format": bottom_right_border})

worksheet.hide_gridlines(2)

worksheet.autofilter(3,1,num_rows+3,num_cols)

worksheet.autofit()

worksheet.set_column_pixels(0, 0, 20)

worksheet.merge_range(1,1,1,num_cols, table_name + (("_"+ str(j)) if j >  0 else ""), title_format)

workbook.close()

shutil.copy(writer,output_path + file_name + ".xlsx")

print(f"File {file_name}.xlsx created successfully")

Я думал о нескольких возможностях оптимизации, но они не очень сработали:
  • Выполнение всей обработки данных использование pyspark вместо pandas:
    Это было невозможно из-за необходимости форматировать книгу Excel
    с помощью xlsxwriter.
  • Использование альтернативной библиотеки для xlsxwriter: я не мог Я не имитирую точную
    функциональность, которую я имею в этой функции.
  • Параллельная обработка фреймов данных и отказ от их хранения в
    списке: мой код стал труднее для понимания, и я не был уверен, как
    правильно использовать искровой кластер
Если у кого-то есть идеи или предложения о том, как можно улучшить эту функцию, я бы с радостью приветствую их. В настоящее время узел драйвера перегружен, и наблюдается резкий скачок использования оперативной памяти из-за pandas и xlsxwriter.

Подробнее здесь: https://stackoverflow.com/questions/783 ... databricks
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»