Dask – Как оптимизировать вычисление первой строки каждого раздела в кадре данных dask?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Dask – Как оптимизировать вычисление первой строки каждого раздела в кадре данных dask?

Сообщение Anonymous »

Моя общая цель — прочитать несколько CSV-файлов, выполнить некоторые вычисления, сохранить их как базу данных паркета, используя опцию раздела_on в функции to_parquet.
Я не могу переиндексировать и перераспределить перед сохранением из-за ограниченной памяти. При сохранении каждый файл будет представлять собой отдельный раздел и, следовательно, отдельный файл паркета. Я не могу использовать имя файла по умолчанию part.0.parquet, так как в будущем мне может понадобиться добавить файлы в тот же каталог, и они также могут быть part.0.parquet.
Поэтому я хочу присвоить каждому файлу паркета имя исходного файла CSV, из которого он взят.
Для этого, когда я впервые читаю файл CSV, я добавьте столбец с именем файла (--> все строки в каждом разделе будут иметь одно и то же имя файла). Затем я читаю первую строку каждого раздела (и, в частности, столбец с исходным именем файла CSV) и создаю список имен файлов. Затем я использую опцию name_function в функции to_parquet.
Я добиваюсь того, чего хотел, но таким образом мне приходится вызывать .compute(), а это занимает очень много времени.
Есть ли у вас идеи, как я могу ограничить вычисления первой строкой каждого раздела?
Это мой текущий код:
def get_first_element(partition):
return partition['orig_file_name'].iloc[0]

first_elements = ddf.map_partitions(get_first_element).compute()

def name_function(part_idx):
return f"{first_elements[part_idx]}.parquet"

ddf.to_parquet(path=target_directory,
engine='pyarrow',
partition_on=['date', 'hour'],
name_function=name_function,
write_index=True)

Заранее большое спасибо за любые предложения!
Изменить
Этот код повторяет мою следующую проблему. Предложение @mdurant:
@dask.delayed
def process(file_path):
df = pd.DataFrame({'col1':[0, 1, 2, 3], 'col2':[4, 5, 6, 7], 'col3':[88, 88, 99, 99]}) # this is read_csv in my code
file_name = 'aaa'

df.to_parquet(f'{file_name}.parquet',
partition_cols=['col3'])

dask.compute(*[process(f) for f in [1]])


Подробнее здесь: https://stackoverflow.com/questions/786 ... ion-in-a-d
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»