- Объединить несколько файлов NetCDF (в одной географической сетке, с теми же переменными) по времени
- Перенесите их в другую сетку
- Сохраните их как хранилище zarr, разбив на фрагменты так, чтобы во времени всегда оставался только один фрагмент.
Однако у меня возникли проблемы с настройкой рабочего процесса, не вызывая использование памяти резко увеличивается при вызове ds.to_zarr().
Я пытаюсь следовать лучшим практикам Dask (особенно этому). Упрощенная версия рабочего процесса:
import xarray as xr
import numpy as np
import xesmf as xe
from distributed import Client
# Start dask client
client = Client()
display(client)
@dask.delayed
def load(fn_list):
ds = xr.open_mfdataset(fn_list)
return ds
@dask.delayed
def process(ds):
# Do something to dataset, e.g., regridding
ref_grid = xr.Dataset(coords = {'lat':np.arange(-89.5,89.6),
'lon':np.arange(-179.5,179.6)})
rgrd = xe.Regridder(ds,ref_grid,'conservative')
ds = rgrd(ds)
return ds
def workflow(fn_list):
ds = load(fn_list)
ds = process(ds)
# Rechunk
ds = ds.chunk({'time':-1,'lat':12,'lon':12})
delayed = dask.delayed(ds.to_zarr)('test.zarr')
return delayed
out = dask.compute(workflow)
dask.compute(out)
Из того, что я понял, исследуя эту проблему, что-то в способе настройки графа задач приводит к загрузке всего массива и отправке его одному рабочему процессу при вызове dask.compute. () переходит к вызову .to_zarr().
Думаю, мой основной вопрос: зачем нужен вызов .to_zarr() все в памяти / как это настроить так, чтобы оно нет?
Версии:
zarr == 2.18.3
xarray == 2024.9.0
dask == 2024.9.1
Подробнее здесь: https://stackoverflow.com/questions/792 ... -without-b