Skip to content

Conversation

@erenavsarogullari
Copy link
Member

@erenavsarogullari erenavsarogullari commented Feb 10, 2026

What changes were proposed in this pull request?

Spark Adaptive Query Execution(AQE) framework coalesces the eligible shuffle partitions (e.g: empty/small-sized shuffle partitions) during the query execution. This optimization requires to be created coalesce groups for related Shuffle Stages (e.g: SortMergeJoin can have 1 ShuffleQueryStage per join-leg) to guarantee that both SMJ legs having the same num of partitions. To create coalesce groups for related ShuffleStages, Spark Plan Tree needs to be traversed by finding ShuffleQueryStages. SPARK-46590 has fixed incorrect coalesce grouping problem by adding BinaryExecNode Support for SparkPlan Tree traversal. This PR aims to introduce following complementary improvements on the top of SPARK-46590:
1- Adding warning log message to ShufflePartitionsUtil.coalescePartitionsWithoutSkew() when numOfPartitions of ShuffleStages in the same coalesce group are not equal. This is required for the consistency because ShufflePartitionsUtil.coalescePartitionsWithSkew() logs warning message for the same case,

2- Adding problematic shuffleStageIds to warning messages when numOfPartitions of ShuffleStages in the same coalesce group are not equal. This info can help for troubleshooting.

3- Aligning the warning logs for specially for both ShufflePartitionsUtil.coalescePartitionsWithoutSkew() and coalescePartitionsWithSkew() cases

4- 2 new UT cases are being added:
Current UT Cases cover following use cases:

skewed SortMergeJoin under Union under BroadcastNestedLoopJoin (BNLJ),
skewed SortMergeJoin under Union under CartesianProduct

This PR also adds following new UT cases:
4.1- skewed SortMergeJoin under Union under BroadcastHashJoin,
4.2- non-skewed SortMergeJoin under Union under BroadcastHashJoin

5- private def coalescePartitions() needs to be renamed because Scala does not allow the existence of default values in multiple overloaded methods. This causes following Scala compile-time problem:

[ERROR] [Error] /Users/eren.avsarogullari/Development/OSS/ossspark/sql/core/src/main/scala/
org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:26: 
in object ShufflePartitionsUtil, multiple overloaded alternatives of method coalescePartitions define default arguments.

Why are the changes needed?

  • Warning message is useful for troubleshooting when numOfPartitions of ShuffleStages in the same coalesce group are not equal,
  • Problematic ShuffleStageIds can also help for the trouble shooting,
  • Additional UT coverage is also useful to verify coalesce grouping logic to avoid SPARK-46590 kinds of issues.

Does this PR introduce any user-facing change?

Yes, adding new warning message when numOfPartitions of ShuffleStages in the same coalesce group are not equal

How was this patch tested?

Added 2 new UT cases for existing use-cases to test coalesce grouping logic such as:

  • skewed SortMergeJoin under Union under BroadcastHashJoin,
  • non-skewed SortMergeJoin under Union under BroadcastHashJoin

Was this patch authored or co-authored using generative AI tooling?

No

…n numOfPartitions of ShuffleStages in the same coalesce group are not equal
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant