Комплексное машинное обучение с помощью Spark (телекоммуникации)Python

Программы на Python
Ответить
Anonymous
 Комплексное машинное обучение с помощью Spark (телекоммуникации)

Сообщение Anonymous »

Прогнозирование отмены подписки на телекоммуникационные услуги с использованием PySpark и нейронных сетей
Этот блокнот создает конвейер машинного обучения в PySpark, позволяющий прогнозировать, отменит ли клиент телекоммуникационной компании свою подписку. Данные загружаются из двух файлов CSV — одного для обучения и одного для тестирования — и сохраняются в Spark DataFrame.
Этапы предварительной обработки очищают данные, кодируя столбцы «Да/Нет» как 1/0, удаляя столбцы с расходами по счетам, которые представляют собой линейную комбинацию других функций, и переименовывая целевой столбец в метку. Поскольку набор данных несбалансирован (отмен гораздо меньше, чем неотмен), класс меньшинства подвергается избыточной выборке с использованием случайной повторной выборки с заменой, поэтому оба класса одинаково представлены во время обучения.
Затем признаки собираются в один вектор и нормализуются с помощью StandardScaler. Классификатор многослойного персептрона (нейронной сети) обучается внутри конвейера, который объединяет ассемблер, масштабатор и классификатор. Перекрестно проверенный поиск по сетке пробует четыре различные архитектуры скрытых слоев и выбирает ту, которая имеет лучший показатель F1, который уравновешивает точность и полноту, что делает его более значимым показателем, чем точность в задаче классификации. Архитектура, вес и оценка F1 лучшей модели распечатываются, а окончательная модель сохраняется на диск.
train_df = spark.read.csv('subscription_train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('subscription_test.csv', header=True, inferSchema=True)

bool_cols = ['Roaming enabled', 'Voicemail enabled', 'Cancelled']
cols_to_drop = ['Region', 'District code', 'Daily billing total',
'Evening billing total', 'Night billing total', 'Roaming billing total']

def encode_bool_cols(df):
for c in bool_cols:
df = df.withColumn(c, F.when(F.col(c).isin('Yes', 'True'), 1).otherwise(0))
return df.drop(*cols_to_drop)

train_clean = encode_bool_cols(train_df)
test_clean = encode_bool_cols(test_df)

train_clean = train_clean.withColumnRenamed('Cancelled', 'label')
test_clean = test_clean.withColumnRenamed('Cancelled', 'label')

train_clean.show(5)

majority_df = train_clean.filter(F.col('label') == 0)
minority_df = train_clean.filter(F.col('label') == 1)

majority_count = majority_df.count()
minority_count = minority_df.count()

majority_count
fraction = (majority_count / minority_count) + 0.1
minority_resampled = minority_df.sample(withReplacement=True, fraction=fraction, seed=42) \
.limit(majority_count)

train_balanced = majority_df.union(minority_resampled)

total_balanced = train_balanced.count()
train_balanced.groupBy('label') \
.count() \
.withColumn('proportion', F.round(F.col('count') / total_balanced, 4)) \
.orderBy('label') \
.show()

from pyspark.ml.feature import VectorAssembler

feature_cols = [c for c in train_balanced.columns if c != 'label']
print("Feature columns:", feature_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features_raw')

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features_raw', outputCol='features',
withMean=True, withStd=True)

from pyspark.ml.classification import MultilayerPerceptronClassifier

num_features = len(feature_cols)
layers = [num_features, 16, 2]

mlp = MultilayerPerceptronClassifier(
featuresCol='features', labelCol='label',
maxIter=100, layers=layers, blockSize=128, seed=42
)

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, scaler, mlp])

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

param_grid = ParamGridBuilder() \
.addGrid(mlp.layers, [
[num_features, 8, 2],
[num_features, 16, 2],
[num_features, 32, 2],
[num_features, 16, 8, 2]
]).build()

evaluator = MulticlassClassificationEvaluator(
labelCol='label', predictionCol='prediction', metricName='f1')

cv = CrossValidator(
estimator=pipeline, estimatorParamMaps=param_grid,
evaluator=evaluator, numFolds=3, seed=42)

cv_model = cv.fit(train_balanced)

best_mlp = cv_model.bestModel.stages[-1]

print("Optimal layers (nodes per layer):", best_mlp.getLayers())
print("Number of layers:", len(best_mlp.getLayers()))

best_mlp.weights)

test_predictions = cv_model.transform(test_clean)

f1_score = evaluator.evaluate(test_predictions)
print(f"Test set F1 score: {f1_score:.4f}")

cv_model.bestModel.save('subscription_mlp_model')


Подробнее здесь: https://stackoverflow.com/questions/798 ... rk-telcoms
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»