Skip to content

Commit 3c26a9d

Browse files
[Enhancement] prefer four-phase plan for distinct aggregation in single node env when group by column is skew (backport #64668) (#64731)
Signed-off-by: shuming.li <[email protected]> Co-authored-by: shuming.li <[email protected]>
1 parent fbaf85e commit 3c26a9d

File tree

7 files changed

+118
-21
lines changed

7 files changed

+118
-21
lines changed

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Utils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.starrocks.catalog.ScalarType;
2828
import com.starrocks.catalog.Table;
2929
import com.starrocks.catalog.Type;
30+
import com.starrocks.common.FeConstants;
3031
import com.starrocks.common.Pair;
3132
import com.starrocks.common.util.DebugUtil;
3233
import com.starrocks.qe.ConnectContext;
@@ -1081,4 +1082,18 @@ public static Pair<Map<ColumnRefOperator, ConstantOperator>, List<ScalarOperator
10811082

10821083
return new Pair<>(columnConstMap, otherPredicates);
10831084
}
1085+
1086+
/**
1087+
* If there's only one BE node, splitting into multi-phase has no benefit but only overhead
1088+
*/
1089+
public static boolean isSingleNodeExecution(ConnectContext context) {
1090+
return context.getAliveExecutionNodesNumber() == 1;
1091+
}
1092+
1093+
/**
1094+
* Check whether the FE is running in unit test mode
1095+
*/
1096+
public static boolean isRunningInUnitTest() {
1097+
return FeConstants.runningUnitTest;
1098+
}
10841099
}

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/SplitAggregateRule.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import com.starrocks.catalog.AggregateFunction;
2121
import com.starrocks.catalog.FunctionSet;
2222
import com.starrocks.catalog.Type;
23-
import com.starrocks.common.FeConstants;
2423
import com.starrocks.qe.ConnectContext;
2524
import com.starrocks.sql.optimizer.OptExpression;
25+
import com.starrocks.sql.optimizer.Utils;
2626
import com.starrocks.sql.optimizer.operator.AggType;
2727
import com.starrocks.sql.optimizer.operator.OperatorType;
2828
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
@@ -39,6 +39,7 @@
3939
import java.util.Collection;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.stream.Collectors;
4243

4344
import static com.starrocks.qe.SessionVariableConstants.AggregationStage.AUTO;
4445
import static com.starrocks.qe.SessionVariableConstants.AggregationStage.TWO_STAGE;
@@ -100,19 +101,14 @@ protected Type getIntermediateType(CallOperator aggregation) {
100101
return af.getIntermediateType() == null ? af.getReturnType() : af.getIntermediateType();
101102
}
102103

103-
/**
104-
* If there's only one BE node, splitting into multi-phase has no benefit but only overhead
105-
*/
106-
private static boolean isSingleNodeExecution(ConnectContext context) {
107-
return context.getAliveExecutionNodesNumber() == 1;
108-
}
109-
110-
protected boolean isSuitableForTwoStageDistinct(OptExpression input, LogicalAggregationOperator operator,
104+
protected boolean isSuitableForTwoStageDistinct(ConnectContext connectContext,
105+
OptExpression input,
106+
LogicalAggregationOperator operator,
111107
List<ColumnRefOperator> distinctColumns) {
112108
if (distinctColumns.isEmpty()) {
113109
return true;
114110
}
115-
int aggMode = ConnectContext.get().getSessionVariable().getNewPlannerAggStage();
111+
int aggMode = connectContext.getSessionVariable().getNewPlannerAggStage();
116112
for (CallOperator callOperator : operator.getAggregations().values()) {
117113
if (callOperator.isDistinct() && !canGenerateTwoStageAggregate(callOperator)) {
118114
return false;
@@ -126,11 +122,29 @@ protected boolean isSuitableForTwoStageDistinct(OptExpression input, LogicalAggr
126122
// 1. Only single node in the cluster
127123
// 2. With GROUP-BY clause, otherwise the second-stage cannot be parallelized
128124
// 3. CBO_ENABLE_SINGLE_NODE_PREFER_TWO_STAGE_AGGREGATE is enabled
129-
if (aggMode == AUTO.ordinal()
130-
&& isSingleNodeExecution(ConnectContext.get())
131-
&& !FeConstants.runningUnitTest
125+
if (!Utils.isRunningInUnitTest()
126+
&& aggMode == AUTO.ordinal()
127+
&& Utils.isSingleNodeExecution(connectContext)
132128
&& CollectionUtils.isNotEmpty(operator.getGroupingKeys())
133-
&& ConnectContext.get().getSessionVariable().isCboEnableSingleNodePreferTwoStageAggregate()) {
129+
&& connectContext.getSessionVariable().isCboEnableSingleNodePreferTwoStageAggregate()) {
130+
// for single node's distinct, we prefer two phase agg when partition by column in exchange is not skew.
131+
// since in such case, there is no scale problem.
132+
// otherwise prefer four phase agg in isThreeStageMoreEfficient() since it has the best scalability.
133+
List<ColumnRefOperator> partitionByColumns = operator.getPartitionByColumns();
134+
Statistics inputStatistics = input.getGroupExpression().inputAt(0).getStatistics();
135+
List<ColumnStatistic> partitionByColumnStatistics = partitionByColumns
136+
.stream()
137+
.map(inputStatistics::getColumnStatistic)
138+
.collect(Collectors.toList());
139+
if (partitionByColumnStatistics.stream().anyMatch(ColumnStatistic::isUnknown)) {
140+
return true;
141+
}
142+
double aggOutputRow =
143+
StatisticsCalculator.computeGroupByStatistics(partitionByColumns, inputStatistics, Maps.newHashMap());
144+
// if partition by column's ndv is too small, two phase agg may not scale
145+
if (aggOutputRow <= LOW_AGGREGATE_EFFECT_COEFFICIENT * 10) {
146+
return false;
147+
}
134148
return true;
135149
}
136150

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/SplitMultiPhaseAggRule.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.starrocks.catalog.Function;
2323
import com.starrocks.catalog.FunctionSet;
2424
import com.starrocks.catalog.Type;
25+
import com.starrocks.qe.ConnectContext;
2526
import com.starrocks.qe.SessionVariable;
2627
import com.starrocks.sql.common.ErrorType;
2728
import com.starrocks.sql.common.StarRocksPlannerException;
@@ -98,15 +99,15 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
9899
"each can't have multi columns.", ErrorType.USER_ERROR);
99100
}
100101

101-
if (isSuitableForTwoStageDistinct(input, aggOp, distinctCols.get())) {
102+
if (isSuitableForTwoStageDistinct(context.getConnectContext(), input, aggOp, distinctCols.get())) {
102103
return List.of();
103104
}
104105

105106
if (aggOp.getGroupingKeys().isEmpty()) {
106107
return implementDistinctWithoutGroupByAgg(context.getColumnRefFactory(),
107108
input, aggOp, distinctCols.get());
108109
} else {
109-
return implementDistinctWithGroupByAgg(context.getSessionVariable(), context.getColumnRefFactory(), input, aggOp);
110+
return implementDistinctWithGroupByAgg(context.getConnectContext(), context.getColumnRefFactory(), input, aggOp);
110111
}
111112

112113
}
@@ -178,7 +179,7 @@ private OptExpression connectThreeStageAgg(OptExpression localExpr,
178179
// For SQL: select count(distinct k1) from test_basic group by v1;
179180
// Local Agg -> Distinct global Agg -> Global Agg
180181
private List<OptExpression> implementDistinctWithGroupByAgg(
181-
SessionVariable sv,
182+
ConnectContext connectContext,
182183
ColumnRefFactory columnRefFactory,
183184
OptExpression input,
184185
LogicalAggregationOperator oldAgg) {
@@ -206,13 +207,14 @@ private List<OptExpression> implementDistinctWithGroupByAgg(
206207
connectThreeStageAgg(localExpr, distinctGlobal, global, oldAgg.getGroupingKeys(), true));
207208
}
208209

209-
if (!isThreeStageMoreEfficient(sv, input, distinctGlobal.getGroupingKeys(), local.getPartitionByColumns())) {
210+
if (!isThreeStageMoreEfficient(connectContext, input, distinctGlobal.getGroupingKeys(), local.getPartitionByColumns())) {
210211
// 4-stage: Local(GB k1,v1) -> Distinct Global(GB k1,v1; PB k1,v1) -> Local(GB v1) -> Global(GB v1; PB v1)
211212
// Use grouping keys and distinct cols to distribute data, we need to continue split the global agg.
212213
return Lists.newArrayList(
213214
connectThreeStageAgg(localExpr, distinctGlobal, global, distinctGlobal.getGroupingKeys(), false));
214215
}
215216

217+
final SessionVariable sv = connectContext.getSessionVariable();
216218
if (!sv.isEnableCostBasedMultiStageAgg()) {
217219
// 3-stage: Local(GB k1,v1) -> Distinct Global(GB k1,v1; PB v1) -> Global(GB v1; PB v1)
218220
return Lists.newArrayList(
@@ -390,15 +392,25 @@ private ScalarOperator createCountDistinctAggParam(List<ScalarOperator> children
390392
return elseOperator;
391393
}
392394

393-
private boolean isThreeStageMoreEfficient(SessionVariable sv, OptExpression input, List<ColumnRefOperator> groupKeys,
395+
private boolean isThreeStageMoreEfficient(ConnectContext connectContext, OptExpression input,
396+
List<ColumnRefOperator> groupKeys,
394397
List<ColumnRefOperator> partitionByColumns) {
398+
final SessionVariable sv = connectContext.getSessionVariable();
395399
if (sv.getNewPlannerAggStage() == FOUR_STAGE.ordinal()) {
396400
return false;
397401
}
398402
if (sv.getNewPlannerAggStage() == THREE_STAGE.ordinal()) {
399403
return true;
400404
}
401405

406+
// for a single node, we prefer multi-phases agg when partitionByColumns between 1/2 agg is not skew.
407+
// and prefer four phase agg when partitionByColumns is skew
408+
if (!Utils.isRunningInUnitTest()
409+
&& sv.isCboEnableSingleNodePreferTwoStageAggregate()
410+
&& Utils.isSingleNodeExecution(connectContext)) {
411+
return false;
412+
}
413+
402414
Statistics inputStatistics = input.getGroupExpression().inputAt(0).getStatistics();
403415
Collection<ColumnStatistic> inputsColumnStatistics = inputStatistics.getColumnStatistics().values();
404416

fe/fe-core/src/main/java/com/starrocks/sql/optimizer/rule/transformation/SplitTwoPhaseAggRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
7878
"each can't have multi columns.", ErrorType.USER_ERROR);
7979
}
8080

81-
if (!isSuitableForTwoStageDistinct(input, aggOp, distinctCols.get())) {
81+
if (!isSuitableForTwoStageDistinct(context.getConnectContext(), input, aggOp, distinctCols.get())) {
8282
return List.of();
8383
}
8484

fe/fe-core/src/test/java/com/starrocks/sql/plan/ReplayFromDumpTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
import com.starrocks.common.FeConstants;
1919
import com.starrocks.common.Pair;
2020
import com.starrocks.common.profile.Tracers;
21+
import com.starrocks.qe.ConnectContext;
2122
import com.starrocks.qe.SessionVariable;
2223
import com.starrocks.server.GlobalStateMgr;
2324
import com.starrocks.sql.common.QueryDebugOptions;
2425
import com.starrocks.sql.common.StarRocksPlannerException;
2526
import com.starrocks.sql.optimizer.OptExpression;
2627
import com.starrocks.sql.optimizer.OptimizerContext;
28+
import com.starrocks.sql.optimizer.Utils;
2729
import com.starrocks.sql.optimizer.base.CTEProperty;
2830
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
2931
import com.starrocks.sql.optimizer.rule.RuleSet;
@@ -1206,4 +1208,27 @@ public boolean isEnableViewBasedMVRewrite(View view) {
12061208
String plan = replayPair.second;
12071209
PlanTestBase.assertContains(plan, "single_mv_ads_biz_customer_combine_td_for_task_2y");
12081210
}
1211+
1212+
@Test
1213+
public void testSingleNodePlanWithMultiAggStage1() throws Exception {
1214+
new MockUp<Utils>() {
1215+
@Mock
1216+
public static boolean isSingleNodeExecution(ConnectContext context) {
1217+
return true;
1218+
}
1219+
@Mock
1220+
public static boolean isRunningInUnitTest() {
1221+
return false;
1222+
}
1223+
};
1224+
String plan = getPlanFragment("query_dump/single_node_plan1", TExplainLevel.NORMAL);
1225+
PlanTestBase.assertContains(plan, " 2:AGGREGATE (update serialize)\n" +
1226+
" | STREAMING\n" +
1227+
" | output: sum(106: sum), count(107: count), avg(108: avg), count(3: UserID)\n" +
1228+
" | group by: 10: RegionID\n" +
1229+
" | \n" +
1230+
" 1:AGGREGATE (update serialize)\n" +
1231+
" | output: sum(41: AdvEngineID), count(*), avg(21: ResolutionWidth)\n" +
1232+
" | group by: 3: UserID, 10: RegionID");
1233+
}
12091234
}

fe/fe-core/src/test/java/com/starrocks/utframe/UtFrameUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,7 @@ public static Pair<String, ExecPlan> getNewPlanAndFragmentFromDump(ConnectContex
10001000
} else if (statementBase instanceof InsertStmt) {
10011001
return getInsertExecPlan((InsertStmt) statementBase, connectContext);
10021002
} else {
1003-
Preconditions.checkState(false, "Do not support the statement");
1003+
Preconditions.checkState(false, "Do not support the statement:" + statementBase);
10041004
return null;
10051005
}
10061006
} finally {

0 commit comments

Comments
 (0)