Я работаю над проектом по быстрому тестированию тысяч результирующих переменных на стандартном наборе предикторов и ковариат с использованием полярных переменных. Он работает очень хорошо, с ускорением в 16 раз по сравнению с сопоставимым пакетом в R. Мой вопрос заключается в том, как лучше всего сочетать пользовательскую функцию моделирования с полярами для использования многопоточности в полярных и научных пакетах Python (numpy, scipy, sklearn и т. д.)? Это моя текущая реализация
# This collects all the common operations that need to be done
# for all predictors/dependents
lf = lf.collect().lazy()
result_lazyframes = []
for predictor in config.predictor_columns:
for dependent in config.dependent_columns:
logger.trace(f"Analyzing predictor '{predictor}' with dependent '{dependent}'.")
# Placeholder for actual analysis logic
result_lazyframe = perform_analysis(lf, predictor, dependent, config)
if result_lazyframe is not None:
result_lazyframes.append(result_lazyframe)
# Store or log results as needed
if not result_lazyframes:
logger.error("No valid analyses were performed. Please check your configuration and data.")
return pl.DataFrame()
# Collect in batches with progress
batch_size = min(100, max(10, num_groups // 10))
all_results = []
for i in range(0, len(result_lazyframes), batch_size):
batch = result_lazyframes[i : i + batch_size]
results = pl.collect_all(batch)
all_results.extend(results)
completed = min(i + batch_size, len(result_lazyframes))
def perform_analysis(
lf: pl.LazyFrame, predictor: str, dependent: str, config: MASConfig
) -> pl.LazyFrame:
"""Perform the actual analysis for a given predictor and dependent variable"""
# Select only the relevant columns and drop missing values in the predictor and dependent
columns = [predictor, dependent, *config.covariate_columns]
analysis_lf = lf.select(columns)
model_func = partial(_run_association, predictor=predictor, dependent=dependent, config=config)
expected_schema = _get_schema(config)
result_lf = (
analysis_lf
.select(pl.struct(columns).alias("association_struct"))
.select(
pl.col("association_struct")
.map_batches(model_func, returns_scalar=True, return_dtype=expected_schema)
.alias("result")
)
)
return result_lf
Внутри model_func имеется защита threadpool_limits из библиотеки threadpoolctl, позволяющая ограничить количество потоков, которые могут использовать научные пакеты Python. Хорошая ли это система или можно ее улучшить?
Я работаю над проектом по быстрому тестированию тысяч результирующих переменных на стандартном наборе предикторов и ковариат с использованием полярных переменных. Он работает очень хорошо, с ускорением в 16 раз по сравнению с сопоставимым пакетом в R. Мой вопрос заключается в том, как лучше всего сочетать пользовательскую функцию моделирования с полярами для использования многопоточности в полярных и научных пакетах Python (numpy, scipy, sklearn и т. д.)? Это моя текущая реализация [code]# This collects all the common operations that need to be done # for all predictors/dependents lf = lf.collect().lazy() result_lazyframes = [] for predictor in config.predictor_columns: for dependent in config.dependent_columns: logger.trace(f"Analyzing predictor '{predictor}' with dependent '{dependent}'.") # Placeholder for actual analysis logic result_lazyframe = perform_analysis(lf, predictor, dependent, config) if result_lazyframe is not None: result_lazyframes.append(result_lazyframe) # Store or log results as needed if not result_lazyframes: logger.error("No valid analyses were performed. Please check your configuration and data.") return pl.DataFrame() # Collect in batches with progress batch_size = min(100, max(10, num_groups // 10)) all_results = [] for i in range(0, len(result_lazyframes), batch_size): batch = result_lazyframes[i : i + batch_size] results = pl.collect_all(batch) all_results.extend(results) completed = min(i + batch_size, len(result_lazyframes))
def perform_analysis( lf: pl.LazyFrame, predictor: str, dependent: str, config: MASConfig ) -> pl.LazyFrame: """Perform the actual analysis for a given predictor and dependent variable""" # Select only the relevant columns and drop missing values in the predictor and dependent columns = [predictor, dependent, *config.covariate_columns] analysis_lf = lf.select(columns) model_func = partial(_run_association, predictor=predictor, dependent=dependent, config=config) expected_schema = _get_schema(config) result_lf = ( analysis_lf .select(pl.struct(columns).alias("association_struct")) .select( pl.col("association_struct") .map_batches(model_func, returns_scalar=True, return_dtype=expected_schema) .alias("result") ) ) return result_lf [/code] Внутри model_func имеется защита threadpool_limits из библиотеки threadpoolctl, позволяющая ограничить количество потоков, которые могут использовать научные пакеты Python. Хорошая ли это система или можно ее улучшить?