I have a Spark DataFrame with the following structure:
shock_rule_id
DATE
value
< /thead>
a < /td>
2024-01-01 < /td>
100 < /td>
< /tr>
< /td>
/> 2024-01-02 < /td>
null < /td>
< /tr>
< /td>
2024-01-03 < /td>
130
130
/> b < /td>
2024-01-01 < /td>
50 < /td>
< /tr>
b < /td>
2024-01-02 < /td>
2024-01-02 < /td>
2024-01-02 /> < /tr>
b < /td>
2024-01-03 < /td>
null < /td>
< /tr>
b < /td>
2024
b < /td>
2024 b < /td>
2024
b < /td>
2024
b < /td>
2024
b < /td>
2024 80 < /td>
< /tr>
< /tbody>
< /table> < /div>
Я хочу выполнить линейную интерполяцию значения столбца в каждой группе Shock_rule_id. Функции.# Row numbers to simulate index positions
df_pos = (
result_df
.withColumn("row_num", row_number().over(w))
.withColumn("prev_value", last("value", ignorenulls=True).over(w))
.withColumn("prev_row", last("row_num", ignorenulls=True).over(w))
.withColumn("next_value", first("value", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
.withColumn("next_row", first("row_num", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
)
df_interp = (
df_pos.withColumn(
"interpolated_value",
when(
col("value").isNotNull(), col("value")
).otherwise(
col("prev_value")
+ (col("next_value") - col("prev_value"))
* ((col("row_num") - col("prev_row"))
/ when((col("next_row") - col("prev_row")) == 0, 1)
.otherwise(col("next_row") - col("prev_row")))
)
)
)
# Final result
result = df_interp.select("shock_rule_id", "DATE", "interpolated_value")
< /code>
Но вывод не соответствует моим ожиданиям, я имею в виду не совпадать с выходом Pandas udf < /p>
def interpolate(pdf):
pdf = pdf.sort_values('DATE')
# if pdf['InterpolationType'].iloc[0] == 'linear':
pdf['value'] = pdf['value'].interpolate(method='linear')
pdf['shock_rule_id'] = pdf['shock_rule_id'].astype(int)
pdf['DATE'] = pd.to_datetime(pdf['DATE']) # Ensure DATE is datetime
return pdf[['shock_rule_id', 'DATE', 'value']] # Only necessary columns
# Interpolate only where needed
result_interpolated = df_to_interpolate.groupby('shock_rule_id').applyInPandas(
interpolate, schema="shock_rule_id int, DATE date, value double"
)
# Union with groups that had no missing values
result = result_interpolated.unionByName(df_no_missing.select('shock_rule_id', 'DATE', 'value'))
Подробнее здесь: https://stackoverflow.com/questions/797 ... ng-spark-a
Как сделать линейную интерполяцию в Pyspark без Pandas UDF (только с использованием Spark API)? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Apache Spark (pyspark) — преобразование двоичного файла в str (UUID) без UDF
Anonymous » » в форуме Python - 0 Ответы
- 34 Просмотры
-
Последнее сообщение Anonymous
-