Я новичок в использовании параллельных вычислений в Phyton.
У меня есть около 80 огромных файлов CSV (по 32 ГБ каждый), которые мне нужно обработать в Python, чтобы получить из них несколько строк. p>
Структура файла
Одна из возможностей — перевести весь CSV в базу данных, но я хотел бы изучить возможность использования Dask для его обработки (если время обработки для всех файлов превышает 20 минут, вероятно, я это сделаю). переключитесь на базу данных, если она быстрее).
У меня есть следующий код, но он не работает
AssertionError Traceback (most recent call last)
Cell In[12], line 21
19 for chunk in pd.read_csv(filename1, chunksize=100000):
20 chunk_df = dd.from_pandas(chunk, npartitions=5)
---> 21 chunk1_df = chunk_df.map_partitions(chunk_df,barra).compute()
22 chunk1.to_csv(filename2.name, index=False, mode='a', header=False)
23 print("pasada:", ichunk)
File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:1006, in _Frame.map_partitions(self, func, *args, **kwargs)
878 @insert_meta_param_description(pad=12)
879 def map_partitions(self, func, *args, **kwargs):
880 """Apply Python function on each DataFrame partition.
881
882 Note that the index and divisions are assumed to remain unchanged.
(...)
1004 None as the division.
1005 """
-> 1006 return map_partitions(func, self, *args, **kwargs)
File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:6924, in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs)
6921 name = kwargs.pop("token", None)
6922 parent_meta = kwargs.pop("parent_meta", None)
-> 6924 assert callable(func)
6925 if name is not None:
6926 token = tokenize(meta, *args, **kwargs)
AssertionError:
На данный момент без Dask время обработки одного файла составляет ~8 минут, и мне нужно сократить его до минимально возможного.
Я' m использую чанки, чтобы избежать проблем с памятью (старая структура без Dask). Могу ли я удалить эту часть, когда использую Dask?
Есть идеи по устранению ошибки?
Спасибо!
Я новичок в использовании параллельных вычислений в Phyton. У меня есть около 80 огромных файлов CSV (по 32 ГБ каждый), которые мне нужно обработать в Python, чтобы получить из них несколько строк. p> Структура файла [code] 'Barra', 'D1', 'D2','D3','D4','D5','D6','D7','D8','CMG' [/code] Одна из возможностей — перевести весь CSV в базу данных, но я хотел бы изучить возможность использования Dask для его обработки (если время обработки для всех файлов превышает 20 минут, вероятно, я это сделаю). переключитесь на базу данных, если она быстрее). У меня есть следующий код, но он не работает :( [code]import dask.dataframe as dd import pandas as pd from datetime import datetime from tkinter import filedialog as fd from tkinter import ttk filename1 = fd.askopenfilename(title="ARCHIVO CSV ENTRADA") filename2 = fd.asksaveasfile(initialfile = 'Untitled.csv', defaultextension=".csv", title="ARCHIVO DE SALIDA") dt1=datetime.now() barra=['barra1','barra3','barra5','barra7','barra9','barra11','barra13','barra15','barra17','barra19'] ichunk=1 def Proceso(chunk_df,barra): # Proceso de elecciòn chunk_df.columns = ['ID', 'barra', 'D1', 'D2','D3','D4','D5','D6','D7','D8','CMG'] chunk1 = chunk_df[chunk_df['barra'].isin(barra)] return chunk1 for chunk in pd.read_csv(filename1, chunksize=100000): chunk_df = dd.from_pandas(chunk, npartitions=5) chunk1_df = chunk_df.map_partitions(chunk_df,barra).compute() chunk1.to_csv(filename2.name, index=False, mode='a', header=False) print("pasada:", ichunk) ichunk=ichunk+1 dt2=datetime.now() print("FIN: ",dt2-dt1) [/code] Даск оставляет следующее сообщение: [code]AssertionError Traceback (most recent call last) Cell In[12], line 21 19 for chunk in pd.read_csv(filename1, chunksize=100000): 20 chunk_df = dd.from_pandas(chunk, npartitions=5) ---> 21 chunk1_df = chunk_df.map_partitions(chunk_df,barra).compute() 22 chunk1.to_csv(filename2.name, index=False, mode='a', header=False) 23 print("pasada:", ichunk)
File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:1006, in _Frame.map_partitions(self, func, *args, **kwargs) 878 @insert_meta_param_description(pad=12) 879 def map_partitions(self, func, *args, **kwargs): 880 """Apply Python function on each DataFrame partition. 881 882 Note that the index and divisions are assumed to remain unchanged. (...) 1004 None as the division. 1005 """ -> 1006 return map_partitions(func, self, *args, **kwargs)
File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:6924, in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs) 6921 name = kwargs.pop("token", None) 6922 parent_meta = kwargs.pop("parent_meta", None) -> 6924 assert callable(func) 6925 if name is not None: 6926 token = tokenize(meta, *args, **kwargs)
AssertionError: [/code] На данный момент без Dask время обработки одного файла составляет ~8 минут, и мне нужно сократить его до минимально возможного. Я' m использую чанки, чтобы избежать проблем с памятью (старая структура без Dask). Могу ли я удалить эту часть, когда использую Dask? Есть идеи по устранению ошибки? Спасибо!