Выявление трех последовательных месяцев снижения дохода и совокупного балла в PySparkPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Выявление трех последовательных месяцев снижения дохода и совокупного балла в PySpark

Сообщение Anonymous »

У меня есть два кадра данных PySpark, df1 и df2, содержащие информацию о пользователях за 12 месяцев 2023 года. df1 содержит идентификатор пользователя и совокупный балл за каждый месяц, а df2 содержит идентификатор пользователя и его зарплату за каждый месяц.
Я пытаюсь определить три месяца подряд, когда как доход, так и совокупный балл снижались для каждого пользователя. В частности, я хочу найти случаи, когда доход и совокупный балл снизились за одни и те же три месяца.
Я уже пытался вычислить разницу между доходом за последовательные месяцы и совокупным баллом, определить последовательные уменьшается и соответствующим образом фильтрует кадры данных. Однако у меня возникли проблемы с получением правильных результатов.
Вот подход, который я опробовал на данный момент:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, when, sum as spark_sum

# Initialize Spark session
spark = SparkSession.builder \
.appName("Consecutive Decrease Analysis") \
.getOrCreate()

# Assuming your DataFrames are named 'df1' and 'df2'

# Define the window specification for both DataFrames
windowSpec1 = Window.partitionBy("user_id").orderBy("month")
windowSpec2 = Window.partitionBy("user_id").orderBy("month")

# Calculate the differences between consecutive months' income and composite scores for both DataFrames
df1 = df1.withColumn("prev_income", lag(col("salary")).over(windowSpec1))
df1 = df1.withColumn("income_diff", col("salary") - col("prev_income"))
df2 = df2.withColumn("prev_score", lag(col("comp_score")).over(windowSpec2))
df2 = df2.withColumn("score_diff", col("comp_score") - col("prev_score"))

# Define the number of consecutive months for the analysis
num_consecutive_months = 3

# Calculate flags indicating if income and score decreased or not for both DataFrames
df1 = df1.withColumn("income_decrease_flag", when(col("income_diff") < 0, 1).otherwise(0))
df2 = df2.withColumn("score_decrease_flag", when(col("score_diff") < 0, 1).otherwise(0))

# Use window functions to count consecutive decreases for both income and score
df1 = df1.withColumn("consecutive_income_decreases",
spark_sum(col("income_decrease_flag")).over(windowSpec1.rowsBetween(-num_consecutive_months + 1, Window.currentRow)))
df2 = df2.withColumn("consecutive_score_decreases",
spark_sum(col("score_decrease_flag")).over(windowSpec2.rowsBetween(-num_consecutive_months + 1, Window.currentRow)))

# Filter the DataFrame to select users with three consecutive months of both income and score decrease
consecutive_decrease_users = df1.filter((col("consecutive_income_decreases") == num_consecutive_months - 1) &
(col("consecutive_score_decreases") == num_consecutive_months - 1)) \
.select("user_id", "month")

# Show the users and months where both income and score decreased consecutively
consecutive_decrease_users.show()


Подробнее здесь: https://stackoverflow.com/questions/784 ... e-score-in
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»