data < /p>
Код: Выделить всё
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd
# Sample data
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)
< /code>
исходная функция < /p>
def check_fun(df,
a_input: str = None,
b_input: str = None,
c_input: str = None,
d_input: str = None):
columns = ['check', 'description', 'count']
all_res = pd.DataFrame(columns = columns)
# check1
if a_input is not None and b_input is not None:
check1_count = df.filter(col(a_input) > col(b_input)).count()
check_1_res = pd.DataFrame([['check1', 'a > b', check1_count]],
columns = columns)
all_res = all_res.append(check_1_res)
# check2
if a_input is not None and c_input is not None:
check2_count = df.filter(col(a_input) > col(c_input)).count()
check_2_res = pd.DataFrame([['check2', 'a > c', check2_count]],
columns = columns)
all_res = all_res.append(check_2_res)
# check3
if a_input is not None and d_input is not None:
check3_count = df.filter(col(a_input) > col(d_input)).count()
check_3_res = pd.DataFrame([['check3', 'a > d', check3_count]],
columns=columns)
all_res = all_res.append(check_3_res)
# check4
if b_input is not None and c_input is not None:
check4_count = df.filter(col(a_input) < col(d_input)).count()
check_4_res = pd.DataFrame([['check4', 'a < d', check4_count]],
columns=columns)
all_res = all_res.append(check_4_res)
return(all_res)
< /code>
Как я пытался его решить: < /p>
a = "a"
b = "b"
c = "c"
d = "d"
df.agg(
when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()
Подробнее здесь: https://stackoverflow.com/questions/793 ... inside-agg