Pyspark создает логику сопряженияPython

Программы на Python
Ответить
Anonymous
 Pyspark создает логику сопряжения

Сообщение Anonymous »

Я работаю в Azure Synapse и привыкаю к ​​работе с pyspark. Я хочу создать логику сопоставления между строками в моем df, но не могу заставить ее работать. У меня есть столбец идентификатора и порядковый номер. Например:



ID
seqNum




100
3609


100
3610

< tr>
100
3616


100< /td>
3617


100
3622

< tr>
100
3623


100< /td>
3634


100
3642

< tr>
100
3443



Вот что должен вывести код:



ID
seqNum
pairID




100
3609
1


100
36101


100
3616
2


100
3617
2

100
3622
3


100
3623
3


1003634
Null


100
3642
4


100
34434



Строка с 3634 должна не быть парными, поскольку разница между порядковыми номерами должна быть равна единице.
I есть логика в Python, которая, кажется, работает, но тогда я не могу использовать возможности обработки из искры. Может ли кто-нибудь помочь мне создать логику в pyspark?

# window specification
windowSpec = Window.orderBy("BRIM_ZZ_APPL_TR_SEQ")

# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))

# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))

#make PairID
df = df.withColumn("PairID", lit(None).cast("int"))

# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect() # Collect rows for iterative processing
paired_indices = set() # Track already paired rows
result = []

for i, row in enumerate(rows):
if i in paired_indices:
continue # Skip already paired rows

current = row["seqNum"]
prev_diff = row["diff_prev"]
next_diff = row["diff_next"]

# Pair with the row above if diff_prev == 1 and it is not already paired
if prev_diff == 1 and (i - 1) not in paired_indices:
result.append((current, pair_id, rows["seqNum"]))
result.append((rows["seqNum"], pair_id, current))
paired_indices.update([i, i - 1])
pair_id += 1

# Pair with the row below if diff_next == 1 and it is not already paired
elif next_diff == 1 and (i + 1) not in paired_indices:
result.append((current, pair_id, rows[i + 1]["seqNum"]))
result.append((rows[i + 1]["seqNum"], pair_id, current))
paired_indices.update([i, i + 1])
pair_id += 1

else:
result.append((current, None, None))

# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])


Подробнее здесь: https://stackoverflow.com/questions/793 ... ring-logic
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»