Код: Выделить всё
+-------+-------+
| level | value |
+-------+-------+
| 1 | 4 |
| 1 | 5 |
| 2 | 2 |
| 2 | 6 |
| 2 | 3 |
+-------+-------+
Код: Выделить всё
+-------+--------+-------+
| level | lable| value |
+-------+--------+-------+
| 1 | bb76 | 4 |
| 1 | bb76 | 5 |
| 2 | cv86 | 2 |
| 2 | cv86 | 6 |
| 2 | cv86 | 3 |
+-------+--------+-------+
Код: Выделить всё
+-------+-------+
| lable | value |
+-------+-------+
| bb76 | 9 |
| cv86 | 11 |
+-------+-------+
Код: Выделить всё
def create_objectid():
a = str(ObjectId())
return a
def add_lable(df):
df = df.cache()
df.count()
grouped_df = df.groupby('level').agg(sum(df.value).alias('temp'))
grouped_df = grouped_df.withColumnRenamed('level', 'level_temp')
grouped_df = grouped_df.withColumn('lable', udf_create_objectid())
grouped_df = grouped_df.drop('temp')
df = df.join(grouped_df.select('level_temp','lable'), col('level') == col('level_temp'), how="left").drop(grouped_df.level_temp)
return df
Я искал и обнаружил, что Spark Window имеет более высокую производительность. Затем я изменил последнюю функцию на эту. Поскольку pandas_udf нужен arg, поэтому я просто передаю его и печатаю:
Код: Выделить всё
@f.pandas_udf("string")
def create_objectid_on_window(v: pd.Series) -> str:
print('v:',v)
return str(ObjectId())
def add_lable(df):
w = Window.partitionBy('level')
df = df.withColumn('lable', create_objectid_on_window('level').over(w))
return df
Код: Выделить всё
AttributeError: 'NoneType' object has no attribute '_jvm'
Я прочитал этот вопрос и ответы; Я знаю, что это из-за функции UDF pandas. Как я могу это изменить?
Подробнее здесь: https://stackoverflow.com/questions/763 ... ame-window