У меня есть два кадра данных PySpark, df1 и df2, содержащие информацию о пользователях за 12 месяцев 2023 года. df1 содержит идентификатор пользователя и совокупный балл за каждый месяц, а df2 содержит идентификатор пользователя и его зарплату за каждый месяц.
Я пытаюсь определить три месяца подряд, когда как доход, так и совокупный балл снижались для каждого пользователя. В частности, я хочу найти случаи, когда доход и совокупный балл снизились за одни и те же три месяца.
Я уже пытался вычислить разницу между доходом за последовательные месяцы и совокупным баллом, определить последовательные уменьшается и соответствующим образом фильтрует кадры данных. Однако у меня возникли проблемы с получением правильных результатов.
Вот подход, который я опробовал на данный момент:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, when, sum as spark_sum
# Initialize Spark session
spark = SparkSession.builder \
.appName("Consecutive Decrease Analysis") \
.getOrCreate()
# Assuming your DataFrames are named 'df1' and 'df2'
# Define the window specification for both DataFrames
windowSpec1 = Window.partitionBy("user_id").orderBy("month")
windowSpec2 = Window.partitionBy("user_id").orderBy("month")
# Calculate the differences between consecutive months' income and composite scores for both DataFrames
df1 = df1.withColumn("prev_income", lag(col("salary")).over(windowSpec1))
df1 = df1.withColumn("income_diff", col("salary") - col("prev_income"))
df2 = df2.withColumn("prev_score", lag(col("comp_score")).over(windowSpec2))
df2 = df2.withColumn("score_diff", col("comp_score") - col("prev_score"))
# Define the number of consecutive months for the analysis
num_consecutive_months = 3
# Calculate flags indicating if income and score decreased or not for both DataFrames
df1 = df1.withColumn("income_decrease_flag", when(col("income_diff") < 0, 1).otherwise(0))
df2 = df2.withColumn("score_decrease_flag", when(col("score_diff") < 0, 1).otherwise(0))
# Use window functions to count consecutive decreases for both income and score
df1 = df1.withColumn("consecutive_income_decreases",
spark_sum(col("income_decrease_flag")).over(windowSpec1.rowsBetween(-num_consecutive_months + 1, Window.currentRow)))
df2 = df2.withColumn("consecutive_score_decreases",
spark_sum(col("score_decrease_flag")).over(windowSpec2.rowsBetween(-num_consecutive_months + 1, Window.currentRow)))
# Filter the DataFrame to select users with three consecutive months of both income and score decrease
consecutive_decrease_users = df1.filter((col("consecutive_income_decreases") == num_consecutive_months - 1) &
(col("consecutive_score_decreases") == num_consecutive_months - 1)) \
.select("user_id", "month")
# Show the users and months where both income and score decreased consecutively
consecutive_decrease_users.show()
Подробнее здесь: https://stackoverflow.com/questions/784 ... e-score-in
Выявление трех последовательных месяцев снижения дохода и совокупного балла в PySpark ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Сортировать массив названий месяцев из трех букв в хронологическом порядке [дубликат]
Anonymous » » в форуме Php - 0 Ответы
- 21 Просмотры
-
Последнее сообщение Anonymous
-