This is the basic outline of the script I am running, and it works fine, but it is quite slow. The total time for 500 runs is 22 minutes, so 10000 runs would take me about 7 hours...
My Python script uses dask.delayed.
Code:
import pandas as pd
import numpy as np
from Bio import SeqIO
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.metrics import confusion_matrix
import dask
from dask.distributed import Client
from dask_mpi import initialize
import gc
import time
start = time.time()
print(start)
# CONNECT TO DISTRIBUTED CLUSTER
initialize()
client = Client()
# THIS FUNCTION IS PARALLELIZED ACROSS MULTIPLE WORKERS
@dask.delayed(nout=2)
def run_the_classifier(data_df_merged_balanced):
    # Draw a balanced subsample: 127 rows per ecosystem type
    data_df_merged_balanced = data_df_merged_balanced.groupby('Ecosystem_Type').sample(n=127)
    X = data_df_merged_balanced.iloc[:, 0:5787]    # Features
    y = data_df_merged_balanced['Ecosystem_Type']  # Labels
    y = pd.get_dummies(y, dtype=int)               # One-hot encode the labels
    # Split dataset into training set and test set: 80% training, 20% test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)
    # Create a random forest classifier
    clf = RandomForestClassifier(n_estimators=10000, max_depth=5, class_weight='balanced_subsample')
    # Train the model using the training set
    clf.fit(X_train, y_train)
    # Predict on the 20% test set
    y_pred = clf.predict(X_test)
    # Get accuracy
    accuracy = metrics.accuracy_score(y_test, y_pred)
    # Rank features by importance, indexed by the feature columns
    feature_imp = pd.Series(clf.feature_importances_, index=X.columns).sort_values(ascending=False)
    return accuracy, feature_imp
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# A bunch of dataframe operations are here that produce data_df_merged_balanced - that is passed to the function
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
accuracy_dict = {}
importance_dict = {}
if __name__ == '__main__':
    for i in range(0, 500):
        accuracy, feature_imp = run_the_classifier(data_df_merged_balanced)
        accuracy_dict[i] = accuracy
        importance_dict[i] = feature_imp
        # client.run(gc.collect)
        print(i)
    time_sent = time.time()
    # print(time_sent)
    result_accuracy_dict = dask.compute(accuracy_dict)[0]
    result_imp_dict = dask.compute(importance_dict)[0]
    time_after_computing = time.time()
    print(f"elapsed time to compute {i+1}: {time_after_computing - time_sent}")
    df_importances = pd.DataFrame.from_dict(result_imp_dict)
    df_importances_mean = df_importances.mean(axis=1).sort_values(ascending=False)
    df_importances_mean.to_csv('averaged_feature_importances_500.csv')
    print(df_importances_mean.head(n=20))
    mean_accuracy = np.array(list(result_accuracy_dict.values())).mean()
    mean_accuracy_ser = pd.Series(result_accuracy_dict)
    mean_accuracy_ser.to_csv('accuracy_scores_500.csv')
    end = time.time()
    print(f"total elapsed time: {end - start}")
    print(mean_accuracy)
    client.close()
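One detail in the script above: because run_the_classifier is wrapped with @dask.delayed(nout=2), each loop iteration produces one underlying task shared by the accuracy and feature-importance outputs, but calling dask.compute separately on accuracy_dict and then on importance_dict runs that shared task graph twice, so every forest is trained twice (results are not cached between separate compute calls). A minimal sketch of gathering both dictionaries in a single compute call (same variable names as in the script, not tested against the real data):
Code:
# Sketch: gather both result dictionaries in ONE dask.compute call, so the
# shared delayed tasks are only scheduled and executed once.
result_accuracy_dict, result_imp_dict = dask.compute(accuracy_dict, importance_dict)
On its own this should roughly halve the total runtime, since fitting the 10000-tree forests is the expensive part.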
Slurm script used to submit the Python script
This is the Slurm script I use to submit the Python script. I use 15 workers and 200G of memory; if I increase the number of workers, I get a lot of "unmanaged memory or memory leak" warnings or garbage-collection warnings. I also get garbage-collection warnings when I run more than 1000 iterations (see the diagnostic sketch after the Slurm script below).
Code:
#!/bin/bash
#SBATCH --job-name=dask-py # Job name
#SBATCH --nodes=1 # Run all processes on a single node
#SBATCH --ntasks=15 # Number of tasks (MPI workers)
#SBATCH --cpus-per-task=1 # Number of CPU cores per task
#SBATCH --mem=200G # Job memory request
#SBATCH --partition=standard,short,long,fat
#SBATCH --output=dask-py%j.log # Standard output log
#SBATCH --error=dask-py%j.err # Standard error log
echo "Running Dask-MPI"
module load mpi/openmpi/4.1.1
source conda.sh
conda activate working-env # Activate conda environment
mpirun --oversubscribe -np 15 python3 RF_model_biome_predict_loop.py
# NOTE: the mpirun -np argument needs 2 more ranks than the number of
# workers you want for your script:
#   rank 0 runs the Dask scheduler
#   rank 1 runs the Python client process (task distribution)
#   the remaining ranks are the workers for your script
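To see what the cluster actually looks like from inside the Python script, it can help to print the scheduler's view right after client = Client(). This is a small diagnostic sketch using the standard distributed Client.scheduler_info() call; the expected worker count of 13 follows from the rank layout described above, and the memory figure assumes the limit is reported in bytes, as recent distributed versions do:
Code:
# Diagnostic sketch: report how many workers dask-mpi actually started and
# the memory limit each one was given. With mpirun -np 15, rank 0 is the
# scheduler and rank 1 is this client process, so 13 workers are expected.
info = client.scheduler_info()
print(f"number of workers: {len(info['workers'])}")
for addr, w in info['workers'].items():
    print(f"{addr}: memory_limit = {w['memory_limit'] / 2**30:.2f} GiB")
Seeing the real per-worker limit (12.50 GiB in the log below) makes it easier to reason about the warnings: the same 200G allocation gets divided among more worker processes as the worker count goes up, so each one hits its limit sooner.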
Code:
2024-10-21 17:16:21,239 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 10.06 GiB -- Worker memory limit: 12.50 GiB
Code:
2024-10-22 12:59:46,569 - distributed.utils_perf - INFO - full garbage collection released 441.47 MiB from 44 reference cycles (threshold: 9.54 MiB)
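The "unmanaged memory" warning refers to memory the worker process holds that Dask itself is not tracking, which here is most likely memory sklearn allocates while fitting the 10000-tree forests and does not immediately return to the OS. Separately, every task result stays on a worker until the final dask.compute gathers it. One pattern that can reduce the pressure, sketched below under the assumption that run_the_classifier is decorated with plain @dask.delayed (no nout=2) so each call returns a single Delayed whose result is the (accuracy, feature_imp) tuple, is to submit the runs as futures and collect them as they finish:
Code:
from dask.distributed import as_completed

# Sketch: submit all runs as futures, then harvest results as they complete
# so each worker can drop a result as soon as it has been copied back here.
# Assumes run_the_classifier uses plain @dask.delayed (no nout=2), so each
# call returns one Delayed whose result is the (accuracy, feature_imp) pair.
delayed_runs = [run_the_classifier(data_df_merged_balanced) for _ in range(500)]
futures = client.compute(delayed_runs)          # list of Futures, one per run

accuracy_dict, importance_dict = {}, {}
for i, (future, result) in enumerate(as_completed(futures, with_results=True)):
    accuracy, feature_imp = result              # results arrive in completion order
    accuracy_dict[i] = accuracy
    importance_dict[i] = feature_imp
    future.release()                            # let the cluster forget this result
The rest of the post-processing (averaging the importances, writing the CSVs) would stay the same.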
More details here: https://stackoverflow.com/questions/791 ... dictionary