Code: Select all
df = spark.createDataFrame(
    [
        ['A', 'A06', 'B', 'B02', '202412'],
        ['A', 'A04', 'B', 'B03', '202501'],
        ['B', 'B01', 'C', 'C02', '202411'],
        ['B', 'B03', 'A', 'A06', '202502'],
    ],
    'entity_code: string, entity_rollup: string, '
    'target_entity_code: string, target_entity_rollup: string, period: string'
)
df.createOrReplaceTempView('v1')
df.show()
+-----------+-------------+------------------+--------------------+------+
|entity_code|entity_rollup|target_entity_code|target_entity_rollup|period|
+-----------+-------------+------------------+--------------------+------+
|          A|          A06|                 B|                 B02|202412|
|          A|          A04|                 B|                 B03|202501|
|          B|          B01|                 C|                 C02|202411|
|          B|          B03|                 A|                 A06|202502|
+-----------+-------------+------------------+--------------------+------+
Code: Select all
SELECT DISTINCT
STACK(
2
, entity_code, entity_rollup
, target_entity_code, target_entity_rollup
) AS (entity_code, entity_rollup)
, period
FROM v1
Running EXPLAIN on the query above gives:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[entity_code#22625, entity_rollup#22626, period#21414], functions=[])
+- Exchange hashpartitioning(entity_code#22625, entity_rollup#22626, period#21414, 200), ENSURE_REQUIREMENTS, [plan_id=12977]
+- HashAggregate(keys=[entity_code#22625, entity_rollup#22626, period#21414], functions=[])
+- Project [entity_code#22625, entity_rollup#22626, period#21414]
+- Generate stack(2, entity_code#21410, entity_rollup#21411, target_entity_code#21412, target_entity_rollup#21413), [period#21414], false, [entity_code#22625, entity_rollup#22626]
+- LocalTableScan [entity_code#21410, entity_rollup#21411, target_entity_code#21412, target_entity_rollup#21413, period#21414]
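For a more readable comparison of the two plans, Spark 3.0+ also supports EXPLAIN FORMATTED, which separates the operator tree from the per-node details (a sketch against the same view v1 created above):

```sql
-- EXPLAIN FORMATTED (Spark 3.0+) prints the operator tree first,
-- then a numbered breakdown of each node's inputs and outputs,
-- which makes the Generate/HashAggregate/Exchange nodes easier to compare.
EXPLAIN FORMATTED
SELECT DISTINCT
    STACK(
        2
        , entity_code, entity_rollup
        , target_entity_code, target_entity_rollup
    ) AS (entity_code, entity_rollup)
    , period
FROM v1;
```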
Code: Select all
SELECT entity_code, entity_rollup, period
FROM v1
UNION
SELECT target_entity_code, target_entity_rollup, period
FROM v1
Running EXPLAIN on the query above (built on the in-memory view, so it shows a LocalTableScan) did not contain a FileScan, so the FileScan node below was added from the actual production query instead:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[entity_source_id#24129, entity_rollup#24130, entity#24131, period#24175], functions=[])
+- Exchange hashpartitioning(entity_source_id#24129, entity_rollup#24130, entity#24131, period#24175, 200), ENSURE_REQUIREMENTS, [plan_id=14134]
+- HashAggregate(keys=[entity_source_id#24129, entity_rollup#24130, entity#24131, period#24175], functions=[])
+- Union
:- Project [billing_entity_code#24170 AS entity_source_id#24129, billing_entity_region#24171 AS entity_rollup#24130, billing_entity_name#24169 AS entity#24131, period#24175]
: +- FileScan ... Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
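One point worth noting when comparing the two plans: in Spark SQL, UNION deduplicates its result, which is what introduces the HashAggregate/Exchange pair in the plan above, just as DISTINCT does for the STACK variant. If duplicates are acceptable, UNION ALL avoids the shuffle entirely (a sketch against the same view v1):

```sql
-- UNION ALL keeps duplicates, so no HashAggregate/Exchange is needed;
-- the plan is just a Union over two scans of the source.
SELECT entity_code, entity_rollup, period
FROM v1
UNION ALL
SELECT target_entity_code, target_entity_rollup, period
FROM v1;
```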
More details here: [url]https://stackoverflow.com/questions/79576707/performance-comparison-between-union-and-stack-in-spark[/url]