Почему Parallel в joblib задерживается быстрее, чем блок карты и вычисления dasks ()Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Почему Parallel в joblib задерживается быстрее, чем блок карты и вычисления dasks ()

Сообщение Anonymous »

Возможно, этот вопрос связан с этим. У меня есть 4D-массив, и я хотел бы применить функцию к каждому 2D-срезу в первых двух измерениях. Я реализовал анализ как для dask, так и для joblib. joblib занимает всего 0,10 минуты, вариант dask — 8 минут. Может кто-нибудь объяснить мне, почему?
Вот MRE:
import numpy as np
import pandas as pd
import time
import dask.array as da
import statsmodels.formula.api as smf
from joblib import Parallel,delayed
from statsmodels.tools.sm_exceptions import ConvergenceWarning
import warnings
warnings.simplefilter('ignore', ConvergenceWarning)

###############################################################################
## Preparation (only to create data)
###############################################################################

rng = np.random.RandomState(42)

def generate_4d_array(rng,time_steps=10,brain_regions=15,transitions=50, subjects=200,
time_autocorr=0.8, subject_autocorr=0.6, dtype=np.float32):
"""
Generates a 4D array with temporal and subject-based autocorrelation,
and performs Z-score normalization over axis 1 (brain regions).

Parameters:
- rng (np.random.RandomState): RandomState instance for reproducibility (mandatory).
- time_steps (int): Number of time points (default is 100).
- brain_regions (int): Number of brain regions (default is 300).
- transitions (int): Number of transitions per subject (default is 50).
- subjects (int): Number of subjects (default is 100).
- time_autocorr (float): Temporal autocorrelation coefficient (0 < time_autocorr < 1, default is 0.8).
- subject_autocorr (float): Subject autocorrelation coefficient (0 < subject_autocorr < 1, default is 0.6).
- dtype (data type): Data type of the array (default is np.float32).

Returns:
- numpy.ndarray: 4D array with shape (time_steps, brain_regions, transitions, subjects),
Z-score normalized over brain regions (axis 1).
"""

# Generate base signals for subjects
subject_base_signal = rng.normal(size=(subjects, brain_regions)).astype(dtype) * 2

# Precompute random noise for efficiency
noise = rng.normal(scale=0.5, size=(time_steps, brain_regions, transitions, subjects)).astype(dtype)
transition_noise = rng.normal(scale=0.5, size=(transitions, subjects, brain_regions)).astype(dtype)

# Initialize the 4D array
data = np.zeros((time_steps, brain_regions, transitions, subjects), dtype=dtype)

# Populate the 4D array with time series data
for subject in range(subjects):
base_signal = subject_base_signal[subject]
for transition in range(transitions):
transition_signal = base_signal + transition_noise[transition, subject]

time_series = np.zeros((time_steps, brain_regions), dtype=dtype)
time_series[0] = transition_signal + noise[0, :, transition, subject]

# Temporal autocorrelation generation
for t in range(1, time_steps):
time_series[t] = (time_autocorr * time_series[t - 1] +
(1 - time_autocorr) * transition_signal +
noise[t, :, transition, subject])

# Store in the data array
data[:, :, transition, subject] = time_series

# Perform Z-score normalization over axis 1 (brain regions)
mean = np.mean(data, axis=1, keepdims=True) # Mean of brain regions
std = np.std(data, axis=1, keepdims=True) # Standard deviation of brain regions
data = (data - mean) / std # Z-score normalization

return data

data = generate_4d_array(rng)

# how big is whole array and how big is each chunk?
array_gib = data.nbytes / (1024 ** 3)
chunk_gib = data[0, 0, :, :].nbytes / (1024 ** 3)
print(f"Memory occupied by array: {array_gib} GiB, Memory occupied by chunk: {chunk_gib} GiB")

###############################################################################
## With joblib
###############################################################################

def mixed_model(a):
'''Runs one mixed model for one region and one point in time over
all subjects and state transitions'''

# make a dataframe out of input chunk
df = pd.DataFrame(a)
df.index.name = 'transition'
df.columns.name = 'subject'
df = df.melt(ignore_index=False)

# run mixel model
result = smf.mixedlm("value ~ 1", df, groups=df["subject"]).fit(method=["cg"])
t_stat = result.tvalues['Intercept']

return t_stat

def mixed_models(data):
'''Compute mixed-model for every region and every point in time'''

time_points, regions = data.shape[:2]
result = np.zeros((time_points,regions)) # Result matrix (1001x376)

result = Parallel(n_jobs=-1)(delayed(mixed_model)(data[i,j])
for i in range(time_points)
for j in range(regions))

result = np.array(result).reshape((time_points, regions))

return result

start = time.time()
result_matrix_1 = mixed_models(data)
print(f"Took {(time.time() - start) / 60} minutes")

###############################################################################
## With Dask
###############################################################################

def mixed_model(chunk):
'''Runs one mixed model for one region and one point in time over
all subjects and state transitions'''

# make an array out of chunk
X = chunk[0,0,:,:]

# make a dataframe out of input chunk
X = pd.DataFrame(X)
X.index.name = 'transition'
X.columns.name = 'subject'
X = X.melt(ignore_index=False)

# run mixel model
result = smf.mixedlm("value ~ 1", X, groups=X["subject"]).fit(method=["cg"])
t_stat = result.tvalues['Intercept']

# return single value array
t_stat = np.array(t_stat)[None,None]

return t_stat

def mixed_models(data):

# map function to each chunk and compute
result_matrix = data.map_blocks(mixed_model,drop_axis=[2,3]).compute()

return result_matrix

start = time.time()

# convert to dask array (overwrite to not occupy RAM twice)
data = da.from_array(data,chunks=(1,1,data.shape[2],data.shape[3]))

# map function to each chunk and compute
result_matrix_2 = mixed_models(data)

print(f"Took {(time.time() - start) / 60} minutes")

###############################################################################
## Compare outputs
###############################################################################

print(f"Outputs are equal: {np.array_equal(result_matrix_1,result_matrix_2)}")


Подробнее здесь: https://stackoverflow.com/questions/792 ... nd-compute
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Joblib Parallel в Sagemaker Studio Classic не работает
    Anonymous » » в форуме Python
    0 Ответы
    22 Просмотры
    Последнее сообщение Anonymous
  • Отслеживание хода выполнения joblib.Parallel
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Отслеживание прогресса joblib.parallel выполнения
    Anonymous » » в форуме Python
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • Как вернуть генератор с помощью joblib.parallel ()?
    Anonymous » » в форуме Python
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • Parallel.ForEach и Parallel.For, похоже, ставят элементы в очередь в отдельных потоках.
    Гость » » в форуме C#
    0 Ответы
    91 Просмотры
    Последнее сообщение Гость

Вернуться в «Python»