Прогнозирование отмены подписки на телекоммуникационные услуги с использованием PySpark и нейронных сетей
Этот блокнот создает конвейер машинного обучения в PySpark, позволяющий прогнозировать, отменит ли клиент телекоммуникационной компании свою подписку. Данные загружаются из двух файлов CSV — одного для обучения и одного для тестирования — и сохраняются в Spark DataFrame.
Этапы предварительной обработки очищают данные, кодируя столбцы «Да/Нет» как 1/0, удаляя столбцы с расходами по счетам, которые представляют собой линейную комбинацию других функций, и переименовывая целевой столбец в метку. Поскольку набор данных несбалансирован (отмен гораздо меньше, чем неотмен), класс меньшинства подвергается избыточной выборке с использованием случайной повторной выборки с заменой, поэтому оба класса одинаково представлены во время обучения.
Затем признаки собираются в один вектор и нормализуются с помощью StandardScaler. Классификатор многослойного персептрона (нейронной сети) обучается внутри конвейера, который объединяет ассемблер, масштабатор и классификатор. Перекрестно проверенный поиск по сетке пробует четыре различные архитектуры скрытых слоев и выбирает ту, которая имеет лучший показатель F1, который уравновешивает точность и полноту, что делает его более значимым показателем, чем точность в задаче классификации. Архитектура, вес и оценка F1 лучшей модели распечатываются, а окончательная модель сохраняется на диск.
train_df = spark.read.csv('subscription_train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('subscription_test.csv', header=True, inferSchema=True)
bool_cols = ['Roaming enabled', 'Voicemail enabled', 'Cancelled']
cols_to_drop = ['Region', 'District code', 'Daily billing total',
'Evening billing total', 'Night billing total', 'Roaming billing total']
def encode_bool_cols(df):
for c in bool_cols:
df = df.withColumn(c, F.when(F.col(c).isin('Yes', 'True'), 1).otherwise(0))
return df.drop(*cols_to_drop)
train_clean = encode_bool_cols(train_df)
test_clean = encode_bool_cols(test_df)
train_clean = train_clean.withColumnRenamed('Cancelled', 'label')
test_clean = test_clean.withColumnRenamed('Cancelled', 'label')
train_clean.show(5)
majority_df = train_clean.filter(F.col('label') == 0)
minority_df = train_clean.filter(F.col('label') == 1)
majority_count = majority_df.count()
minority_count = minority_df.count()
majority_count
fraction = (majority_count / minority_count) + 0.1
minority_resampled = minority_df.sample(withReplacement=True, fraction=fraction, seed=42) \
.limit(majority_count)
train_balanced = majority_df.union(minority_resampled)
total_balanced = train_balanced.count()
train_balanced.groupBy('label') \
.count() \
.withColumn('proportion', F.round(F.col('count') / total_balanced, 4)) \
.orderBy('label') \
.show()
from pyspark.ml.feature import VectorAssembler
feature_cols = [c for c in train_balanced.columns if c != 'label']
print("Feature columns:", feature_cols)
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features_raw')
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features_raw', outputCol='features',
withMean=True, withStd=True)
from pyspark.ml.classification import MultilayerPerceptronClassifier
num_features = len(feature_cols)
layers = [num_features, 16, 2]
mlp = MultilayerPerceptronClassifier(
featuresCol='features', labelCol='label',
maxIter=100, layers=layers, blockSize=128, seed=42
)
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, scaler, mlp])
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
param_grid = ParamGridBuilder() \
.addGrid(mlp.layers, [
[num_features, 8, 2],
[num_features, 16, 2],
[num_features, 32, 2],
[num_features, 16, 8, 2]
]).build()
evaluator = MulticlassClassificationEvaluator(
labelCol='label', predictionCol='prediction', metricName='f1')
cv = CrossValidator(
estimator=pipeline, estimatorParamMaps=param_grid,
evaluator=evaluator, numFolds=3, seed=42)
cv_model = cv.fit(train_balanced)
best_mlp = cv_model.bestModel.stages[-1]
print("Optimal layers (nodes per layer):", best_mlp.getLayers())
print("Number of layers:", len(best_mlp.getLayers()))
best_mlp.weights)
test_predictions = cv_model.transform(test_clean)
f1_score = evaluator.evaluate(test_predictions)
print(f"Test set F1 score: {f1_score:.4f}")
cv_model.bestModel.save('subscription_mlp_model')
Подробнее здесь: https://stackoverflow.com/questions/798 ... rk-telcoms
Комплексное машинное обучение с помощью Spark (телекоммуникации) ⇐ Python
Программы на Python
-
Anonymous
1771995970
Anonymous
[b]Прогнозирование отмены подписки на телекоммуникационные услуги с использованием PySpark и нейронных сетей[/b]
Этот блокнот создает конвейер машинного обучения в PySpark, позволяющий прогнозировать, отменит ли клиент телекоммуникационной компании свою подписку. Данные загружаются из двух файлов CSV — одного для обучения и одного для тестирования — и сохраняются в Spark DataFrame.
Этапы предварительной обработки очищают данные, кодируя столбцы «Да/Нет» как 1/0, удаляя столбцы с расходами по счетам, которые представляют собой линейную комбинацию других функций, и переименовывая целевой столбец в метку. Поскольку набор данных несбалансирован (отмен гораздо меньше, чем неотмен), класс меньшинства подвергается избыточной выборке с использованием случайной повторной выборки с заменой, поэтому оба класса одинаково представлены во время обучения.
Затем признаки собираются в один вектор и нормализуются с помощью StandardScaler. Классификатор многослойного персептрона (нейронной сети) обучается внутри конвейера, который объединяет ассемблер, масштабатор и классификатор. Перекрестно проверенный поиск по сетке пробует четыре различные архитектуры скрытых слоев и выбирает ту, которая имеет лучший показатель F1, который уравновешивает точность и полноту, что делает его более значимым показателем, чем точность в задаче классификации. Архитектура, вес и оценка F1 лучшей модели распечатываются, а окончательная модель сохраняется на диск.
train_df = spark.read.csv('subscription_train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('subscription_test.csv', header=True, inferSchema=True)
bool_cols = ['Roaming enabled', 'Voicemail enabled', 'Cancelled']
cols_to_drop = ['Region', 'District code', 'Daily billing total',
'Evening billing total', 'Night billing total', 'Roaming billing total']
def encode_bool_cols(df):
for c in bool_cols:
df = df.withColumn(c, F.when(F.col(c).isin('Yes', 'True'), 1).otherwise(0))
return df.drop(*cols_to_drop)
train_clean = encode_bool_cols(train_df)
test_clean = encode_bool_cols(test_df)
train_clean = train_clean.withColumnRenamed('Cancelled', 'label')
test_clean = test_clean.withColumnRenamed('Cancelled', 'label')
train_clean.show(5)
majority_df = train_clean.filter(F.col('label') == 0)
minority_df = train_clean.filter(F.col('label') == 1)
majority_count = majority_df.count()
minority_count = minority_df.count()
majority_count
fraction = (majority_count / minority_count) + 0.1
minority_resampled = minority_df.sample(withReplacement=True, fraction=fraction, seed=42) \
.limit(majority_count)
train_balanced = majority_df.union(minority_resampled)
total_balanced = train_balanced.count()
train_balanced.groupBy('label') \
.count() \
.withColumn('proportion', F.round(F.col('count') / total_balanced, 4)) \
.orderBy('label') \
.show()
from pyspark.ml.feature import VectorAssembler
feature_cols = [c for c in train_balanced.columns if c != 'label']
print("Feature columns:", feature_cols)
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features_raw')
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features_raw', outputCol='features',
withMean=True, withStd=True)
from pyspark.ml.classification import MultilayerPerceptronClassifier
num_features = len(feature_cols)
layers = [num_features, 16, 2]
mlp = MultilayerPerceptronClassifier(
featuresCol='features', labelCol='label',
maxIter=100, layers=layers, blockSize=128, seed=42
)
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, scaler, mlp])
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
param_grid = ParamGridBuilder() \
.addGrid(mlp.layers, [
[num_features, 8, 2],
[num_features, 16, 2],
[num_features, 32, 2],
[num_features, 16, 8, 2]
]).build()
evaluator = MulticlassClassificationEvaluator(
labelCol='label', predictionCol='prediction', metricName='f1')
cv = CrossValidator(
estimator=pipeline, estimatorParamMaps=param_grid,
evaluator=evaluator, numFolds=3, seed=42)
cv_model = cv.fit(train_balanced)
best_mlp = cv_model.bestModel.stages[-1]
print("Optimal layers (nodes per layer):", best_mlp.getLayers())
print("Number of layers:", len(best_mlp.getLayers()))
best_mlp.weights)
test_predictions = cv_model.transform(test_clean)
f1_score = evaluator.evaluate(test_predictions)
print(f"Test set F1 score: {f1_score:.4f}")
cv_model.bestModel.save('subscription_mlp_model')
Подробнее здесь: [url]https://stackoverflow.com/questions/79895899/end-to-end-ml-with-spark-telcoms[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия