Как сделать линейную интерполяцию в Pyspark без Pandas UDF (только с использованием Spark API)?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как сделать линейную интерполяцию в Pyspark без Pandas UDF (только с использованием Spark API)?

Сообщение Anonymous »

I have a Spark DataFrame with the following structure:



shock_rule_id
DATE
value

< /thead>


a < /td>
2024-01-01 < /td>
100 < /td>
< /tr>

< /td>
/> 2024-01-02 < /td>
null < /td>
< /tr>

< /td>
2024-01-03 < /td>
130

130

/> b < /td>
2024-01-01 < /td>
50 < /td>
< /tr>

b < /td>
2024-01-02 < /td>

2024-01-02 < /td>

2024-01-02 /> < /tr>

b < /td>
2024-01-03 < /td>
null < /td>
< /tr>

b < /td>
2024
b < /td>
2024 b < /td>
2024
b < /td>
2024
b < /td>
2024
b < /td>
2024 80 < /td>
< /tr>
< /tbody>
< /table> < /div>
Я хочу выполнить линейную интерполяцию значения столбца в каждой группе Shock_rule_id. Функции.# Row numbers to simulate index positions
df_pos = (
result_df
.withColumn("row_num", row_number().over(w))
.withColumn("prev_value", last("value", ignorenulls=True).over(w))
.withColumn("prev_row", last("row_num", ignorenulls=True).over(w))
.withColumn("next_value", first("value", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
.withColumn("next_row", first("row_num", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
)

df_interp = (
df_pos.withColumn(
"interpolated_value",
when(
col("value").isNotNull(), col("value")
).otherwise(
col("prev_value")
+ (col("next_value") - col("prev_value"))
* ((col("row_num") - col("prev_row"))
/ when((col("next_row") - col("prev_row")) == 0, 1)
.otherwise(col("next_row") - col("prev_row")))
)
)
)

# Final result
result = df_interp.select("shock_rule_id", "DATE", "interpolated_value")

< /code>
Но вывод не соответствует моим ожиданиям, я имею в виду не совпадать с выходом Pandas udf < /p>
def interpolate(pdf):
pdf = pdf.sort_values('DATE')
# if pdf['InterpolationType'].iloc[0] == 'linear':
pdf['value'] = pdf['value'].interpolate(method='linear')
pdf['shock_rule_id'] = pdf['shock_rule_id'].astype(int)
pdf['DATE'] = pd.to_datetime(pdf['DATE']) # Ensure DATE is datetime
return pdf[['shock_rule_id', 'DATE', 'value']] # Only necessary columns

# Interpolate only where needed
result_interpolated = df_to_interpolate.groupby('shock_rule_id').applyInPandas(
interpolate, schema="shock_rule_id int, DATE date, value double"
)

# Union with groups that had no missing values
result = result_interpolated.unionByName(df_no_missing.select('shock_rule_id', 'DATE', 'value'))


Подробнее здесь: https://stackoverflow.com/questions/797 ... ng-spark-a
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как создать наиболее эффективную линейную интерполяцию в С++?
    Anonymous » » в форуме C++
    0 Ответы
    35 Просмотры
    Последнее сообщение Anonymous
  • Как реализовать линейную интерполяцию?
    Anonymous » » в форуме Python
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Apache Spark (pyspark) — преобразование двоичного файла в str (UUID) без UDF
    Anonymous » » в форуме Python
    0 Ответы
    34 Просмотры
    Последнее сообщение Anonymous
  • Spark/pyspark в той же версии, но «py4j.Py4JException: конструктор org.apache.spark.api.python.PythonFunction не существ
    Anonymous » » в форуме Python
    0 Ответы
    68 Просмотры
    Последнее сообщение Anonymous
  • Оболочка Spark: spark.executor.extraJavaOptions не разрешено устанавливать параметры Spark.
    Anonymous » » в форуме Python
    0 Ответы
    54 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»