Простаивает при использовании Pandas-on-spark apply() для распараллеливания очистки больших данных (невозможно загрузитьPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Простаивает при использовании Pandas-on-spark apply() для распараллеливания очистки больших данных (невозможно загрузить

Сообщение Anonymous »

Во-первых, я работаю над azure-Databricks, набор данных не может быть загружен в память... так что, возможно, существует другой, более эффективный или более простой способ...
Я пытаюсь использовать Pandas-on-Spark для распараллеливания очистки больших данных. Я пробую многое, но всегда получаю ошибку или код простаивает.
Моя стратегия состоит в том, чтобы создать индекс (indx_df) с помощью pandas-on-spark, чтобы распараллелить процесс фильтрации, где ядра будут принимать размер фрагментов из indx_df (в данном случае 6 строк за раз)
Этот код — моя последняя попытка, этот остается бездействующим или выполняет задание, но без параллелизма (я пробую его на небольшом простой из оригинал:

Код: Выделить всё

path_table = 'my_path_to_data'
nb_pas_temps_itt = 6
kelvin = 273.15
t2m_min_max = {'min': -10 + kelvin, 'max': 10 + kelvin}

ds = xr.open_dataset(path_era5_time, engine='zarr', consolidated=True, chunks={})
Где ds — это такой xarray:
Изображение

Код: Выделить всё

time_size = ds.sizes['time']
indx = [pd.Series(np.arange(i, min(i + nb_pas_temps_itt, time_size))) for i in range(0, time_size, nb_pas_temps_itt)]
# to test on small part of the big data ([9:12])
indx_df = ps.DataFrame(indx[9:12])
indx_df выглядит следующим образом:
[img]https://i.sstatic.net /JfjLKu12.png[/img]

Код: Выделить всё

def clean_chunk(idx):
# some filtering to reduce the dataset
ds_bite = ds.isel(time=idx).to_dataframe()
ds_bite = ds_bite[ds_bite['ptype'] != 0]
ds_bite = ds_bite[ds_bite['tp'] > 0.001]
ds_bite = ds_bite.dropna()
ds_bite = ds_bite[ds_bite['t2m'].between(t2m_min_max['min'], t2m_min_max['max'])]
if not ds_bite.empty:
try: # append data if table exist
write_deltalake(f"/mnt/{path_table}/{table_name}", ds_bite, mode="append")

except: #create table if not existing
write_deltalake(f"/mnt/{path_table}/{table_name}", ds_bite)

indx_df.apply(clean_chunk, axis=1)

Я нашел это предложение, добавив подсказки в свое «применить». Но я не уверен, необходимо ли это из-за сохранения моих данных в дельта-таблице и отсутствия возврата кадра данных??

Код: Выделить всё

clean_chunk(idx) -> pd.DataFrame[zip(indx_df.columns, indx_df.dtypes)]
и получил эту ошибку...:
PicklingError: Не удалось сериализовать объект: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] Похоже, вы пытаетесь сослаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext можно использовать только в драйвере, а не в коде, который он запускает на рабочих процессах. Дополнительную информацию см. в разделе SPARK-5063.
Файл /databricks/spark/python/pyspark/serializers.py:559 в CloudPickleSerializer.dumps(self, obj)
558 попробуйте:
--> 559 return cloudpickle.dumps(obj, Pickle_protocol)
560 кроме Pickle.PickleError:
Файл /databricks/spark/python/pyspark/serializers.py:569, в CloudPickleSerializer.dumps(self, obj)
567 msg = «Не удалось сериализовать объект: %s: % s" % (e.класс.имя, emsg)
568 print_exec(sys.stderr)
--> 569 поднять Pickle.PicklingError(msg)
Кто-нибудь заметил проблему? возможно, я полностью ошибаюсь, это моя первая попытка провести параллель с pandas-on-spark

Подробнее здесь: https://stackoverflow.com/questions/792 ... aning-cant
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»