Как ускорить отложенное вычисление большого словаря?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как ускорить отложенное вычисление большого словаря?

Сообщение Anonymous »

Мне нужно запустить случайный классификатор леса, который я поместил в функцию, примерно 10 000 раз, потому что каждый раз я делаю выборку случайным образом. Я пытаюсь использовать Dask Delayed в кластере HPC с запланированным slurm. Мой сценарий работает, но я хотел бы знать, могу ли я внести какие-либо улучшения, чтобы сделать его быстрее.
Это базовая схема сценария, который я запускаю, и он работает нормально, но это довольно медленно. Общее время для 500x составляет 22 минуты... поэтому 10000x раз займет у меня 7 часов...
Мой скрипт Python использует dask.delayed

Код: Выделить всё

import pandas as pd
import numpy as np
from Bio import SeqIO
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.metrics import confusion_matrix

import dask
from dask.distributed import Client
from dask_mpi import initialize
import gc

import time

start = time.time()
print(start)

# CONNECT TO DISTRIBUTED CLUSTER
initialize()
client = Client()

# THIS FUNCTION IS PARALLELIZED ACROSS MULTIPLE WORKERS
@dask.delayed(nout=2)
def run_the_classifier(data_df_merged_balanced):
data_df_merged_balanced = data_df_merged_balanced.groupby('Ecosystem_Type').sample(n=127)
# Import train_test_split function

X=data_df_merged_balanced.iloc[:,0:5787] # Features
y=data_df_merged_balanced['Ecosystem_Type']  # Labels
y = pd.get_dummies(y, dtype=int)
# Split dataset into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y) # 80% training and 20% test

# Create a Gaussian Classifier
clf=RandomForestClassifier(n_estimators=10000,max_depth=5, class_weight='balanced_subsample')

#Train the model using the training sets y_pred=clf.predict(X_test)
clf.fit(X_train,y_train)

# test the model using the 30% test dataset
y_pred=clf.predict(X_test)

# get accuracy
accuracy = metrics.accuracy_score(y_test, y_pred)
feature_imp = pd.Series(clf.feature_importances_,index=data_df_numerical.columns).sort_values(ascending=False)

return accuracy, feature_imp

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# A bunch of dataframe operations are here that produce data_df_merged_balanced - that is passed to the function
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

accuracy_dict = {}
importance_dict = {}

if __name__ == '__main__':
for i in range(0, 500):
accuracy, feature_imp = run_the_classifier(data_df_merged_balanced)
accuracy_dict[i] = accuracy
importance_dict[i] = feature_imp
# client.run(gc.collect)
print(i)
time_sent = time.time()
# print(time_sent)
result_accuracy_dict = dask.compute(accuracy_dict)[0]
result_imp_dict = dask.compute(importance_dict)[0]
time_after_computing = time.time()

print(f"elapsed time to compute {i+1}: {time_after_computing - time_sent}")
df_importances = pd.DataFrame.from_dict(result_imp_dict)
df_importances_mean = df_importances.mean(axis=1).sort_values(ascending=False)
df_importances_mean.to_csv('averaged_feature_importances_500.csv')
print(df_importances_mean.head(n=20))
mean_accuracy = np.array(list(result_accuracy_dict.values())).mean()
mean_accuracy_ser = pd.Series(result_accuracy_dict)
mean_accuracy_ser.to_csv('accuracy_scores_500.csv')
end = time.time()
print(f"total elapsed time: {end - start}")
print(mean_accuracy)

client.close()
Большая часть времени уходит на шаг dask.compute — не знаете, как это ускорить??
Скрипт Slurm, используемый для отправки скрипт Python
Это мой скрипт Slurm, который я использую для отправки скрипта Python. Я использую 15 рабочих и 200G — если я увеличу количество рабочих, я получу много предупреждений о «неуправляемости или утечке памяти» или предупреждений о сборе мусора. Я также получаю предупреждения о сборке мусора, когда запускаю более 1000 последовательностей.

Код: Выделить всё

#!/bin/bash

#SBATCH --job-name=dask-py           # Job name
#SBATCH --nodes=1                    # Run all processes on a 10 node
#SBATCH --ntasks=15                  # Number of tasks (MPI workers)
#SBATCH --cpus-per-task=1            # Number of CPU cores per task
#SBATCH --mem=200G                   # Job memory request
#SBATCH --partition=standard,short,long,fat
#SBATCH --output=dask-py%j.log       # Standard output and error log
#SBATCH --error=dask-py%j.err       # Standard output and error log

echo "Running Dask-MPI"

module load mpi/openmpi/4.1.1

source conda.sh

conda activate working-env  # Activate conda environment

mpirun --oversubscribe -np 15 python3 RF_model_biome_predict_loop.py

'''
# mpirun -np argument needs 2 more workers than what you want for your script
# np rank 0 is for the scheduler
# np rank 1 is for python client process (distribution)
# remaining are workers for your script
'''
Примеры предупреждений

Код: Выделить всё

2024-10-21 17:16:21,239 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged
memory: 10.06 GiB -- Worker memory limit: 12.50 GiB
или

Код: Выделить всё

2024-10-22 12:59:46,569 - distributed.utils_perf - INFO - full garbage collection released 441.47 MiB from 44 reference cycles (threshold: 9.54 MiB)
Любые улучшения моего рабочего процесса или сценария будут очень полезны! Я новичок в dask и распараллеливании кода

Подробнее здесь: https://stackoverflow.com/questions/791 ... dictionary
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»