Я пытаюсь использовать Pandas-on-Spark для распараллеливания очистки больших данных. Я пробую многое, но всегда получаю ошибку или код простаивает.
Моя стратегия состоит в том, чтобы создать индекс (indx_df) с помощью pandas-on-spark, чтобы распараллелить процесс фильтрации, где ядра будут принимать размер фрагментов из indx_df (в данном случае 6 строк за раз)
Этот код — моя последняя попытка, этот остается бездействующим или выполняет задание, но без параллелизма (я пробую его на небольшом простой из оригинал:
Код: Выделить всё
path_table = 'my_path_to_data'
nb_pas_temps_itt = 6
kelvin = 273.15
t2m_min_max = {'min': -10 + kelvin, 'max': 10 + kelvin}
ds = xr.open_dataset(path_era5_time, engine='zarr', consolidated=True, chunks={})

Код: Выделить всё
time_size = ds.sizes['time']
indx = [pd.Series(np.arange(i, min(i + nb_pas_temps_itt, time_size))) for i in range(0, time_size, nb_pas_temps_itt)]
# to test on small part of the big data ([9:12])
indx_df = ps.DataFrame(indx[9:12])
[img]https://i.sstatic.net /JfjLKu12.png[/img]
Код: Выделить всё
def clean_chunk(idx):
# some filtering to reduce the dataset
ds_bite = ds.isel(time=idx).to_dataframe()
ds_bite = ds_bite[ds_bite['ptype'] != 0]
ds_bite = ds_bite[ds_bite['tp'] > 0.001]
ds_bite = ds_bite.dropna()
ds_bite = ds_bite[ds_bite['t2m'].between(t2m_min_max['min'], t2m_min_max['max'])]
if not ds_bite.empty:
try: # append data if table exist
write_deltalake(f"/mnt/{path_table}/{table_name}", ds_bite, mode="append")
except: #create table if not existing
write_deltalake(f"/mnt/{path_table}/{table_name}", ds_bite)
indx_df.apply(clean_chunk, axis=1)
Код: Выделить всё
clean_chunk(idx) -> pd.DataFrame[zip(indx_df.columns, indx_df.dtypes)]
PicklingError: Не удалось сериализовать объект: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] Похоже, вы пытаетесь сослаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext можно использовать только в драйвере, а не в коде, который он запускает на рабочих процессах. Дополнительную информацию см. в разделе SPARK-5063.
Файл /databricks/spark/python/pyspark/serializers.py:559 в CloudPickleSerializer.dumps(self, obj)
558 попробуйте:
--> 559 return cloudpickle.dumps(obj, Pickle_protocol)
560 кроме Pickle.PickleError:
Файл /databricks/spark/python/pyspark/serializers.py:569, в CloudPickleSerializer.dumps(self, obj)
567 msg = «Не удалось сериализовать объект: %s: % s" % (e.класс.имя, emsg)
568 print_exec(sys.stderr)
--> 569 поднять Pickle.PicklingError(msg)
Кто-нибудь заметил проблему? возможно, я полностью ошибаюсь, это моя первая попытка провести параллель с pandas-on-spark
Подробнее здесь: https://stackoverflow.com/questions/792 ... aning-cant