Добавление дополнительных данных заставляет Pandas UDF вернуть ошибкуPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Добавление дополнительных данных заставляет Pandas UDF вернуть ошибку

Сообщение Anonymous »

Я новичок в Pyspark, и я создал Pandas UDF. Цель этого UDF - принять серию и применить модель ML. У меня есть данные, в которых есть эти столбцы:
id, models_name, цены, units_sold, дата .....
ID - это в основном идентификатор продукта и является строкой. models_name - это LR, XGBOOST, Sarimax и т. Д. Цены и UNITS_SOLD - это времена. grouped_spark_df = spark_df.groupBy("ID").agg(
F.collect_list("prices").alias("prices"),
F.collect_list("units_sold").alias("units_sold"),
F.collect_list("date").alias("date"))

output:



ID
prices
units_sold
date
< /tr>
< /thead>


p_1 < /td>
td>>110,5, 11,0, 9,8] < /td>
[2, 3, 1] < /td>
[2024-01. "2024-03-02", "2024-03-03"] < /td>
< /tr>

p_2 < /td>
td>>15.0, 16.2] < /td>
[5, 7] < /td> /> ["2024-03-01", "2024-03-02"] < /td>
< /tr>

p_3 < /td>
>
p_3 < /td>
< /td>
["2024-03-01"] < /td>
< /tr>
< /tbody>
< /table> < /div>
Затем я добавляю модели_name на основе списка моделей < /p>
exploded_df = grouped_spark_df.withColumn("models_name", explode(array([lit(m) for m in models_list])))
< /code>



id < /th>
цены < /th>
units_sold < /th>
< /th> < /br /> models_mame_mame_mame_mam /> < /tr>
< /thead>


p_1 < /td>
td>>110,5, 11,0, 9,8] < /td>
[2, 3, 1] < /td>
[2024-01. "2024-03-02", "2024-03-03"] < /td>
a < /td>
< /tr>

p_1 < /td>
*. 1] < /td>
["2024-03-01", "2024-03-02", "2024-03-03"] < /td>
b < /td>
< /tr>

p_1 < /td>


p_1 < /td>


p_1 < /td> 11.0, 9.8] < /td>
[2, 3, 1] < /td>
["2024-03-01", "2024-03-02", "2024-03-03"] < /td>
c < /td>

/> p_2 < /td>
тнад. />
P_2
[15.0, 16.2]
[5, 7]
["2024-03-01", "2024-03-02"]
B
< /tr>

p_2 < /td>
n15.0, 16.2] < /td>
[5, 7] < /td>
["2024-03-01", "2024-03-02"] < /"Br /Br /Br. /> c < /td>
< /tr>

p_3 < /td>
td> nember7.5§/td>> [10] < /td>
["2024-03-01"] < /td>
["2024-03-01"] < /td> /> a < /td>
< /tr>

p_3 < /td>
td>>n,5,5§/td>> [10] < /td>
["2024-03-01"] < /td>
["2024-03-01"] < /td> /> b < /td>
< /tr>

p_3 < /td>
td>> gr /5σ,/td>> [10] < /td>
["2024-03-01"] < /td>
["2024-03-01"] < /td> /> c < /td>
< /tr>
< /tbody>
< /table> < /div>
Затем я называю свои панды UDF, как: < /p>
partitioned_df = exploded_df.repartition(50, "Bucket","ID","models_name").coalesce(10)
results_spark = partitioned_df.select(
col("ID"),
col("models_name"),
train_model_udf(
col("ID"), col("prices"), col("units_sold"), col("date")

).alias("model_metrics")
)
< /code>
Вот мой панд udf:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import maptype, stringtype, floattype, arraytype
import pandas как pd
from skle train_test_split < /p>
def create_broadcast_and_udf (feat_cols, model_results_data, feature_set_name, target, dt_string):
#вещательные переменные ....
@pandas_udf (stringtype ()) < /p>
def train_model_udf(
ID, prices, units_sold, date, models_name):

# # Convert to Pandas DataFrame

data = pd.DataFrame({
"ID": ID, "prices": prices, "units_sold": units_sold, "date": date,
"models_name": models_name
})
for col in data.columns:
data[col] = data[col].apply(lambda x: x.tolist() if isinstance(x, np.ndarray) else x)

explode_cols = [col for col in data.columns if col != "ID" and isinstance(data[col][0], list)]

# Explode all selected columns
data = data.explode(explode_cols, ignore_index=True)

try:

new_data = []
ID = str(data["ID"].iloc[0])
# return pd.Series([models_name] * len(ID))
model_directory = f'{model_results_data}/{dt_string}/{models_name}/{feature_set_name}_{target}'
if not os.path.exists(model_directory): # checking if model directory exists or not
os.makedirs(model_directory)

train, test = train_test_split(data, test_size=0.2, shuffle=False)
combined_data = pd.concat([train, test]).sort_values(by=['ID', 'WEEK'])

data.to_csv(f"{model_directory}/{ID}_ID_CombinedData.csv", index=False)
model_write_path = f"{model_directory}/{ID}.pkl"

# training models
if models_name.lower() =='sarimax':
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_sarimax(train,test,model_write_path,price_feature,feat_cols,target,elasticity_method)
elif models_name.lower() =='lr':
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_lr(train,test,model_write_path,price_feature,broadcast_features.value,target)
elif models_name.lower() =='xgboost':
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_xgb(train,test,model_write_path,price_feature,feat_cols,target)
elif models_name.lower() =='gam':
spline_features = feat_cols.copy()
elasticity, y_train_pred, forecast ,mape_hj,wmape_hj,rmse,mse,mae = train_gam(data,test,ID,spline_features,price_feature,target,model_write_path)
print(price_feature)
print(target)
temp_df = data.copy()
predicted_volumes = np.concatenate((y_train_pred, forecast))
# Add new columns to the copy
temp_df['Experiment_Model'] = models_name
temp_df['feat_cols'] = "-".join(feat_cols)
temp_df['MAPE'] = mape_hj
temp_df['WMAPE'] = wmape_hj
temp_df['final_elasticity'] = elasticity
temp_df['predicted_volume'] = predicted_volumes
temp_df = temp_df.reset_index()
# Append the rows of the modified DataFrame to the list
new_data.extend(temp_df.to_dict('records'))

# calculation percent price for calculating simulated volume
test['pct_change_pr'] = test['updated_WAP'].pct_change()

# Initializing list to store simulated volumes
simulated_volumes = []

# Start with the first available volume in the test set as the initial simulated volume
initial_volume = test[target].iloc[0]
simulated_volumes.append(initial_volume)

# Iterate over the test set, starting from the second row
for i in range(1, len(test)):
previous_simulated_volume = simulated_volumes[-1] # Get the last simulated volume
pct_change_pr = test['pct_change_pr'].iloc # Current row's price change

# Calculate the new simulated volume based on the previous simulated volume
simulated_volume = previous_simulated_volume + previous_simulated_volume * (elasticity * pct_change_pr)

# Append the calculated volume to the list
simulated_volumes.append(simulated_volume)

# Add the simulated volumes to the test set as a new column
test['simulated_volume'] = simulated_volumes
test['elasticity'] = elasticity

# saving file to check if simulated volume is correctly calculated
test[['ID','updated_WAP','pct_change_pr',target,'elasticity','simulated_volume']].to_csv(f"{model_directory}/{ID}_simulated.csv")

# for plotting the graphs
temp_test_df = pd.DataFrame()

temp_test_df['date']= test[target].index

temp_test_df['actual']= test[target].values
temp_test_df['predicted']= forecast
temp_test_df['ppg_name']= ID
temp_test_df['features used']= [feat_cols]* len(temp_test_df)
temp_test_df['rmse']= rmse
temp_test_df['mse']= mse
# temp_test_df['mae']= mae
temp_test_df['mape']= mape_hj
temp_test_df['wmape']= wmape_hj
temp_test_df['elasticity']= elasticity
temp_test_df['point_wise_error'] = [abs(g - p) / g * 100 if g != 0 else 0 for g, p in zip(test[target], forecast)]
test = test.reset_index(drop=True)

temp_test_df['simulated_volume'] = test['simulated_volume']

train_temp_df = pd.DataFrame()
train_temp_df['date'] = train.index
train_temp_df['actual'] = train[target].values
train_temp_df['predicted'] = y_train_pred
train_temp_df['point_wise_error'] = [abs(g - p) / g * 100 if g != 0 else 0 for g, p in zip(train[target].values, y_train_pred)]
temp_test_df['set'] = 'Test'
train_temp_df['set'] = 'Train'

combined_df = pd.concat([train_temp_df, temp_test_df])

combined_df = combined_df.reset_index()
data = data.reset_index()
combined_df[price_feature] = data[price_feature]
combined_df.to_csv(f'{model_directory}/{ID}_ttPred.csv', index=False)
row_to_add = {
'Experiment':models_name,
'feat_cols':('-').join(feat_cols),
'ID': ID,
'elasticity':elasticity,
'mape':mape_hj,
'wmape':wmape_hj,
'rmse': rmse,
'mse': mse,
'Train':'MODEL TRAINED ON: '+str(len(data))}
except Exception as e:
print(f"Excpetion:-- {str(e)} --:in ID {data}")

# print(data[data['ID']==ppg_unique[combined_id]])
row_to_add = {
'Experiment':models_name,
'feat_cols':('-').join(feat_cols),
'ID': ID,
'elasticity':0,
'mape':0,
'wmape':0,
'rmse': 0,
'mse': 0,
'Train':f"Model not trained error with exception: {str(e)},,, {data}"}

final_df = pd.DataFrame(columns=['Experiment','feat_cols', 'ID','elasticity','mape','wmape','rmse', 'mse','Train'])
new_df = pd.DataFrame([row_to_add], columns=final_df.columns)
filePath = f'{model_directory}/Elasticities_{ID}.csv'
new_df.to_csv(filePath, index = False)
new_df = new_df.astype(str)
result_dicts = new_df.to_dict(orient="records")

return pd.Series(str(result_dicts))

return broadcast_features, train_model_udf
< /code>
Мой код работает нормально для некоторых идентификаторов, но когда я масштабирую его до 1600 идентификаторов, это дает мне эту ошибку:
Проблема не является самим номером 1600. Даже когда я запускаю его на 20 идентификаторах, это дает ошибку. Я включил в шорт -лист один идентификатор, и когда я правильно перераспределял данные, это помогло решить проблему для этого идентификатора, но когда я снова запускаю все идентификаторы, я все еще получаю ошибку. Скорость данных больше не является проблемой, так как я убедился, что мои данные равномерно распределены.
Длина списков зависит. Для некоторых идентификаторов это 41, в течение 65. Я попытался отфильтировать только те, которые меньше 41 длины, но все еще та же проблема. < /P>
pyspark.errors.exceptions.base.PySparkRuntimeError: [SCHEMA_MISMATCH_FOR_PANDAS_UDF] Result vector from pandas_udf was not the required length: expected 21, got 1.


Подробнее здесь: https://stackoverflow.com/questions/795 ... turn-error
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Добавление дополнительных данных заставляет Pandas UDF вернуть ошибку
    Anonymous » » в форуме Python
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Как вернуть сложную структуру из UDF Pandas в PySpark?
    Anonymous » » в форуме Python
    0 Ответы
    12 Просмотры
    Последнее сообщение Anonymous
  • Как вернуть сложную структуру из UDF Pandas в PySpark?
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Как вернуть сложную структуру из UDF Pandas в PySpark?
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Как подвести итог дополнительных сборов дополнительных идентификаторов продукта в корзине Woocommerce
    Anonymous » » в форуме Php
    0 Ответы
    30 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»