Примечание: версия моего кода для pandas аналогична приведенной ниже, она работает нормально, но занимает так много времени.
Цель
Прочитать/сканировать все файлы csv/feather, а затем переписать их как один файл паркета.
Но прежде чем сделать это, мне нужно прочитать все файлы csv/feather, а затем объединить их в 1 кадр данных.
Проблема и Вопрос
Проблема: когда я запускаю код на ноутбуке VSCode Jupyter, но ядро продолжает зависать
Код: Выделить всё
Canceled future for execute_request message before replies were done
The Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click here for more info. View Jupyter log for further details.
Текущий рабочий процесс
- Прочитайте или отсканируйте файл из папки
- Объедините файл со следующим
Код: Выделить всё
all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))
parquet_name = "simplified_all_spatial_join_data_{}_p0.parquet".format(location_name)
parquet_path = os.path.join(EXPORT_TABLE_FOLDER, parquet_name)
# read the first file
if table_format == 'csv':
temp_all_spatial_join_data = pl.scan_csv(all_tables[0], infer_schema_length=0)
else:
temp_all_spatial_join_data = pl.scan_ipc(all_tables[0], infer_schema_length=0)
# read the rest of the files
if not os.path.exists(parquet_path):
# clone the first scan as a placeholder (to be concatenated later)
collected_temp_all_spatial_join_data = temp_all_spatial_join_data.collect().clone()
# iterate through the files
for table, iteration in tqdm(zip(all_tables[1:], range(len(all_tables[1:]))), total = len(all_tables[1:])):
if table_format == 'csv':
temp = pl.scan_csv(table, infer_schema_length=0)
else:
temp = pl.scan_ipc(table, infer_schema_length=0)
temp_all_spatial_join_data = pl.concat([temp_all_spatial_join_data, temp])
# each 25th iteration, collect the lazyframe as dataframe,
if iteration % 25 == 0:
collected_temp_all_spatial_join_data = pl.concat([
collected_temp_all_spatial_join_data,
temp_all_spatial_join_data.collect()
]).unique()
# then re-assign the temp_all_spatial_join_data as the current file being scanned
if table_format == 'csv':
temp_all_spatial_join_data = pl.scan_csv(table, infer_schema_length=0)
else:
temp_all_spatial_join_data = pl.scan_ipc(table, infer_schema_length=0)
else:
print ('WARNING: This file already exists!\nSkipping the process')
# write the concatenated files into a parquet file
collected_temp_all_spatial_join_data.write_parquet(parquet_path)
Часть if iteration % 25 == 0: — это моя попытка минимизировать память, используемую для хранения плана запроса, путем разделения их на фрагменты по 25 файлов, сбора их в DataFrame, а затем сброса плана запроса. Это работает для меньшего количества файлов (до сотен), но ядро продолжает зависать, когда размер файла достигает тысяч, даже если я уменьшаю фрагмент.
Подробнее здесь: https://stackoverflow.com/questions/757 ... csv-feathe