Я работаю в Azure Synapse и привыкаю к работе с pyspark. Я хочу создать логику сопоставления между строками в моем df, но не могу заставить ее работать. У меня есть столбец идентификатора и порядковый номер. Например:
ID
seqNum
100
3609
100
3610
< tr>
100
3616
100< /td>
3617
100
3622
< tr>
100
3623
100< /td>
3634
100
3642
< tr>
100
3643
Вот что должен вывести код:
ID
seqNum
pairID
100
3609
1
100
36101
100
3616
2
100
3617
2
100
3622
3
100
3623
3
1003634
Null
100
3642
4
100
36434
Строка с 3634 должна не быть парными, поскольку разница между порядковыми номерами должна быть равна единице.
I есть логика в Python, которая, кажется, работает, но тогда я не могу использовать возможности обработки из искры. Может ли кто-нибудь помочь мне создать логику в pyspark?
# window specification
windowSpec = Window.orderBy("seqNum")
# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))
# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))
#make PairID
df = df.withColumn("PairID", lit(None).cast("int"))
# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect() # Collect rows for iterative processing
paired_indices = set() # Track already paired rows
result = []
for i, row in enumerate(rows):
if i in paired_indices:
continue # Skip already paired rows
current = row["seqNum"]
prev_diff = row["diff_prev"]
next_diff = row["diff_next"]
# Pair with the row above if diff_prev == 1 and it is not already paired
if prev_diff == 1 and (i - 1) not in paired_indices:
result.append((current, pair_id, rows["seqNum"]))
result.append((rows["seqNum"], pair_id, current))
paired_indices.update([i, i - 1])
pair_id += 1
# Pair with the row below if diff_next == 1 and it is not already paired
elif next_diff == 1 and (i + 1) not in paired_indices:
result.append((current, pair_id, rows[i + 1]["seqNum"]))
result.append((rows[i + 1]["seqNum"], pair_id, current))
paired_indices.update([i, i + 1])
pair_id += 1
else:
result.append((current, None, None))
# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])
Подробнее здесь: https://stackoverflow.com/questions/793 ... ring-logic
Pyspark создает логику сопряжения ⇐ Python
Программы на Python
1736509528
Anonymous
Я работаю в Azure Synapse и привыкаю к работе с pyspark. Я хочу создать логику сопоставления между строками в моем df, но не могу заставить ее работать. У меня есть столбец идентификатора и порядковый номер. Например:
ID
seqNum
100
3609
100
3610
< tr>
100
3616
100< /td>
3617
100
3622
< tr>
100
3623
100< /td>
3634
100
3642
< tr>
100
3643
Вот что должен вывести код:
ID
seqNum
pairID
100
3609
1
100
36101
100
3616
2
100
3617
2
100
3622
3
100
3623
3
1003634
Null
100
3642
4
100
36434
Строка с 3634 должна не быть парными, поскольку разница между порядковыми номерами должна быть равна единице.
I есть логика в Python, которая, кажется, работает, но тогда я не могу использовать возможности обработки из искры. Может ли кто-нибудь помочь мне создать логику в pyspark?
# window specification
windowSpec = Window.orderBy("seqNum")
# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))
# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))
#make PairID
df = df.withColumn("PairID", lit(None).cast("int"))
# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect() # Collect rows for iterative processing
paired_indices = set() # Track already paired rows
result = []
for i, row in enumerate(rows):
if i in paired_indices:
continue # Skip already paired rows
current = row["seqNum"]
prev_diff = row["diff_prev"]
next_diff = row["diff_next"]
# Pair with the row above if diff_prev == 1 and it is not already paired
if prev_diff == 1 and (i - 1) not in paired_indices:
result.append((current, pair_id, rows[i - 1]["seqNum"]))
result.append((rows[i - 1]["seqNum"], pair_id, current))
paired_indices.update([i, i - 1])
pair_id += 1
# Pair with the row below if diff_next == 1 and it is not already paired
elif next_diff == 1 and (i + 1) not in paired_indices:
result.append((current, pair_id, rows[i + 1]["seqNum"]))
result.append((rows[i + 1]["seqNum"], pair_id, current))
paired_indices.update([i, i + 1])
pair_id += 1
else:
result.append((current, None, None))
# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])
Подробнее здесь: [url]https://stackoverflow.com/questions/79345157/pyspark-creating-paring-logic[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия