Код: Выделить всё
# Group by ItemA and collect all ItemB values as a set
item_sets = df.groupby('ItemA').agg(collect_set('ItemB').alias('ItemB_set'))

# Repartition the dataframe to ensure even distribution of data
item_sets = item_sets.repartition(100)

# Rename the columns on each side BEFORE the cross join.  After a
# crossJoin of two aliased DataFrames the row that reaches .rdd is a
# single flat Row with duplicate column names, so row['i']['ItemB_set']
# would raise (there is no nested field 'i').  Unique names avoid that.
left = item_sets.select(
    item_sets['ItemA'].alias('ItemA_i'),
    item_sets['ItemB_set'].alias('ItemB_set_i'),
)
right = item_sets.select(
    item_sets['ItemA'].alias('ItemA_j'),
    item_sets['ItemB_set'].alias('ItemB_set_j'),
)

# Cross join the sets with each other (thus, creating all pairs of ItemA).
# NOTE(review): this is O(n^2) pairs — the task count explosion seen in the
# Spark UI comes from here; MinHashLSH.approxSimilarityJoin is the scalable
# alternative if exact all-pairs output is not required.
cross_item_sets = left.crossJoin(right)


def jaccard_similarity(row):
    """Return a Row(ItemA, ItemB, jaccard_sim) with the Jaccard
    similarity of the two collected ItemB sets of a pair.

    An empty union yields 0.0 instead of dividing by zero.
    """
    set_i = set(row['ItemB_set_i'])
    set_j = set(row['ItemB_set_j'])
    intersection_size = len(set_i & set_j)
    union_size = len(set_i | set_j)
    # Field names match the explicit schema below (the original code used
    # ItemA_i/ItemA_j/M_ij, which disagreed with the schema field names).
    return Row(
        ItemA=row['ItemA_i'],
        ItemB=row['ItemA_j'],
        jaccard_sim=intersection_size / union_size if union_size > 0 else 0.0,
    )


# Apply the function
similarity_rdd = cross_item_sets.rdd.map(jaccard_similarity).repartition(200)

# Specify the schema for the dataframe; names now agree with the Row fields
schema = StructType([
    StructField("ItemA", StringType(), True),
    StructField("ItemB", StringType(), True),
    StructField("jaccard_sim", FloatType(), True),
])

# Convert the RDD back to Dataframe
similarity_df = spark.createDataFrame(similarity_rdd, schema)

# Show results
similarity_df.show(10, truncate=False)
Код: Выделить всё
Stages: Succeeded/Total --> 0/4
Tasks (for all stages): Succeeded/Total --> 0/10155 (14 running)
Я не могу увеличить размер кластера Spark — предоставленные мне ресурсы ограничены.
Как мне запустить код?
Подробнее здесь: https://stackoverflow.com/questions/791 ... ge-dataset