Проблемы при использовании Dask в Dataframe в PythonPython

Программы на Python
Ответить
Anonymous
 Проблемы при использовании Dask в Dataframe в Python

Сообщение Anonymous »

Я новичок в использовании параллельных вычислений в Phyton.
У меня есть около 80 огромных файлов CSV (по 32 ГБ каждый), которые мне нужно обработать в Python, чтобы получить из них несколько строк. p>
Структура файла

Код: Выделить всё

 'Barra', 'D1', 'D2','D3','D4','D5','D6','D7','D8','CMG'
Одна из возможностей — перевести весь CSV в базу данных, но я хотел бы изучить возможность использования Dask для его обработки (если время обработки для всех файлов превышает 20 минут, вероятно, я это сделаю). переключитесь на базу данных, если она быстрее).
У меня есть следующий код, но он не работает :(

Код: Выделить всё

import dask.dataframe as dd
import pandas as pd
from datetime import datetime
from tkinter import filedialog as fd
from tkinter import ttk
filename1 = fd.askopenfilename(title="ARCHIVO CSV ENTRADA")
filename2 = fd.asksaveasfile(initialfile = 'Untitled.csv', defaultextension=".csv", title="ARCHIVO DE SALIDA")
dt1=datetime.now()
barra=['barra1','barra3','barra5','barra7','barra9','barra11','barra13','barra15','barra17','barra19']
ichunk=1
def Proceso(chunk_df,barra):
# Proceso de elecciòn
chunk_df.columns = ['ID', 'barra', 'D1', 'D2','D3','D4','D5','D6','D7','D8','CMG']
chunk1 = chunk_df[chunk_df['barra'].isin(barra)]
return chunk1
for chunk in pd.read_csv(filename1, chunksize=100000):
chunk_df = dd.from_pandas(chunk, npartitions=5)
chunk1_df = chunk_df.map_partitions(chunk_df,barra).compute()
chunk1.to_csv(filename2.name, index=False, mode='a', header=False)
print("pasada:", ichunk)
ichunk=ichunk+1
dt2=datetime.now()
print("FIN: ",dt2-dt1)
Даск оставляет следующее сообщение:

Код: Выделить всё

AssertionError                            Traceback (most recent call last)
Cell In[12], line 21
19 for chunk in pd.read_csv(filename1, chunksize=100000):
20     chunk_df = dd.from_pandas(chunk, npartitions=5)
---> 21     chunk1_df = chunk_df.map_partitions(chunk_df,barra).compute()
22     chunk1.to_csv(filename2.name, index=False, mode='a', header=False)
23     print("pasada:", ichunk)

File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:1006, in _Frame.map_partitions(self, func, *args, **kwargs)
878 @insert_meta_param_description(pad=12)
879 def map_partitions(self, func, *args, **kwargs):
880     """Apply Python function on each DataFrame partition.
881
882     Note that the index and divisions are assumed to remain unchanged.
(...)
1004     None as the division.
1005     """
-> 1006     return map_partitions(func, self, *args, **kwargs)

File C:\ProgramData\anaconda3\Lib\site-packages\dask\dataframe\core.py:6924, in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs)
6921 name = kwargs.pop("token", None)
6922 parent_meta = kwargs.pop("parent_meta", None)
-> 6924 assert callable(func)
6925 if name is not None:
6926     token = tokenize(meta, *args, **kwargs)

AssertionError:
На данный момент без Dask время обработки одного файла составляет ~8 минут, и мне нужно сократить его до минимально возможного.
Я' m использую чанки, чтобы избежать проблем с памятью (старая структура без Dask). Могу ли я удалить эту часть, когда использую Dask?
Есть идеи по устранению ошибки?
Спасибо!

Подробнее здесь: https://stackoverflow.com/questions/792 ... -in-python
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»