Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/sql/optimizer/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.qe.ConnectContext;
Expand Down Expand Up @@ -1081,4 +1082,18 @@ public static Pair<Map<ColumnRefOperator, ConstantOperator>, List<ScalarOperator

return new Pair<>(columnConstMap, otherPredicates);
}

/**
* If there's only one BE node, splitting into multi-phase has no benefit but only overhead
*/
public static boolean isSingleNodeExecution(ConnectContext context) {
return context.getAliveExecutionNodesNumber() == 1;
}

/**
* Check whether the FE is running in unit test mode
*/
public static boolean isRunningInUnitTest() {
return FeConstants.runningUnitTest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.starrocks.catalog.AggregateFunction;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.Type;
import com.starrocks.common.FeConstants;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.AggType;
import com.starrocks.sql.optimizer.operator.OperatorType;
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
Expand All @@ -39,6 +39,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.starrocks.qe.SessionVariableConstants.AggregationStage.AUTO;
import static com.starrocks.qe.SessionVariableConstants.AggregationStage.TWO_STAGE;
Expand Down Expand Up @@ -100,19 +101,14 @@ protected Type getIntermediateType(CallOperator aggregation) {
return af.getIntermediateType() == null ? af.getReturnType() : af.getIntermediateType();
}

/**
* If there's only one BE node, splitting into multi-phase has no benefit but only overhead
*/
private static boolean isSingleNodeExecution(ConnectContext context) {
return context.getAliveExecutionNodesNumber() == 1;
}

protected boolean isSuitableForTwoStageDistinct(OptExpression input, LogicalAggregationOperator operator,
protected boolean isSuitableForTwoStageDistinct(ConnectContext connectContext,
OptExpression input,
LogicalAggregationOperator operator,
List<ColumnRefOperator> distinctColumns) {
if (distinctColumns.isEmpty()) {
return true;
}
int aggMode = ConnectContext.get().getSessionVariable().getNewPlannerAggStage();
int aggMode = connectContext.getSessionVariable().getNewPlannerAggStage();
for (CallOperator callOperator : operator.getAggregations().values()) {
if (callOperator.isDistinct() && !canGenerateTwoStageAggregate(callOperator)) {
return false;
Expand All @@ -126,11 +122,29 @@ protected boolean isSuitableForTwoStageDistinct(OptExpression input, LogicalAggr
// 1. Only single node in the cluster
// 2. With GROUP-BY clause, otherwise the second-stage cannot be parallelized
// 3. CBO_ENABLE_SINGLE_NODE_PREFER_TWO_STAGE_AGGREGATE is enabled
if (aggMode == AUTO.ordinal()
&& isSingleNodeExecution(ConnectContext.get())
&& !FeConstants.runningUnitTest
if (!Utils.isRunningInUnitTest()
&& aggMode == AUTO.ordinal()
&& Utils.isSingleNodeExecution(connectContext)
&& CollectionUtils.isNotEmpty(operator.getGroupingKeys())
&& ConnectContext.get().getSessionVariable().isCboEnableSingleNodePreferTwoStageAggregate()) {
&& connectContext.getSessionVariable().isCboEnableSingleNodePreferTwoStageAggregate()) {
// for single node's distinct, we prefer two phase agg when partition by column in exchange is not skew.
// since in such case, there is no scale problem.
// otherwise prefer four phase agg in isThreeStageMoreEfficient() since it has the best scalability.
List<ColumnRefOperator> partitionByColumns = operator.getPartitionByColumns();
Statistics inputStatistics = input.getGroupExpression().inputAt(0).getStatistics();
List<ColumnStatistic> partitionByColumnStatistics = partitionByColumns
.stream()
.map(inputStatistics::getColumnStatistic)
.collect(Collectors.toList());
if (partitionByColumnStatistics.stream().anyMatch(ColumnStatistic::isUnknown)) {
return true;
}
double aggOutputRow =
StatisticsCalculator.computeGroupByStatistics(partitionByColumns, inputStatistics, Maps.newHashMap());
// if partition by column's ndv is too small, two phase agg may not scale
if (aggOutputRow <= LOW_AGGREGATE_EFFECT_COEFFICIENT * 10) {
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.starrocks.catalog.Function;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.Type;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.StarRocksPlannerException;
Expand Down Expand Up @@ -98,15 +99,15 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
"each can't have multi columns.", ErrorType.USER_ERROR);
}

if (isSuitableForTwoStageDistinct(input, aggOp, distinctCols.get())) {
if (isSuitableForTwoStageDistinct(context.getConnectContext(), input, aggOp, distinctCols.get())) {
return List.of();
}

if (aggOp.getGroupingKeys().isEmpty()) {
return implementDistinctWithoutGroupByAgg(context.getColumnRefFactory(),
input, aggOp, distinctCols.get());
} else {
return implementDistinctWithGroupByAgg(context.getSessionVariable(), context.getColumnRefFactory(), input, aggOp);
return implementDistinctWithGroupByAgg(context.getConnectContext(), context.getColumnRefFactory(), input, aggOp);
}

}
Expand Down Expand Up @@ -178,7 +179,7 @@ private OptExpression connectThreeStageAgg(OptExpression localExpr,
// For SQL: select count(distinct k1) from test_basic group by v1;
// Local Agg -> Distinct global Agg -> Global Agg
private List<OptExpression> implementDistinctWithGroupByAgg(
SessionVariable sv,
ConnectContext connectContext,
ColumnRefFactory columnRefFactory,
OptExpression input,
LogicalAggregationOperator oldAgg) {
Expand Down Expand Up @@ -206,13 +207,14 @@ private List<OptExpression> implementDistinctWithGroupByAgg(
connectThreeStageAgg(localExpr, distinctGlobal, global, oldAgg.getGroupingKeys(), true));
}

if (!isThreeStageMoreEfficient(sv, input, distinctGlobal.getGroupingKeys(), local.getPartitionByColumns())) {
if (!isThreeStageMoreEfficient(connectContext, input, distinctGlobal.getGroupingKeys(), local.getPartitionByColumns())) {
// 4-stage: Local(GB k1,v1) -> Distinct Global(GB k1,v1; PB k1,v1) -> Local(GB v1) -> Global(GB v1; PB v1)
// Use grouping keys and distinct cols to distribute data, we need to continue split the global agg.
return Lists.newArrayList(
connectThreeStageAgg(localExpr, distinctGlobal, global, distinctGlobal.getGroupingKeys(), false));
}

final SessionVariable sv = connectContext.getSessionVariable();
if (!sv.isEnableCostBasedMultiStageAgg()) {
// 3-stage: Local(GB k1,v1) -> Distinct Global(GB k1,v1; PB v1) -> Global(GB v1; PB v1)
return Lists.newArrayList(
Expand Down Expand Up @@ -390,15 +392,25 @@ private ScalarOperator createCountDistinctAggParam(List<ScalarOperator> children
return elseOperator;
}

private boolean isThreeStageMoreEfficient(SessionVariable sv, OptExpression input, List<ColumnRefOperator> groupKeys,
private boolean isThreeStageMoreEfficient(ConnectContext connectContext, OptExpression input,
List<ColumnRefOperator> groupKeys,
List<ColumnRefOperator> partitionByColumns) {
final SessionVariable sv = connectContext.getSessionVariable();
if (sv.getNewPlannerAggStage() == FOUR_STAGE.ordinal()) {
return false;
}
if (sv.getNewPlannerAggStage() == THREE_STAGE.ordinal()) {
return true;
}

// for a single node, we prefer multi-phases agg when partitionByColumns between 1/2 agg is not skew.
// and prefer four phase agg when partitionByColumns is skew
if (!Utils.isRunningInUnitTest()
&& sv.isCboEnableSingleNodePreferTwoStageAggregate()
&& Utils.isSingleNodeExecution(connectContext)) {
return false;
}

Statistics inputStatistics = input.getGroupExpression().inputAt(0).getStatistics();
Collection<ColumnStatistic> inputsColumnStatistics = inputStatistics.getColumnStatistics().values();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
"each can't have multi columns.", ErrorType.USER_ERROR);
}

if (!isSuitableForTwoStageDistinct(input, aggOp, distinctCols.get())) {
if (!isSuitableForTwoStageDistinct(context.getConnectContext(), input, aggOp, distinctCols.get())) {
return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.profile.Tracers;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.common.QueryDebugOptions;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.base.CTEProperty;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.optimizer.rule.RuleSet;
Expand Down Expand Up @@ -1206,4 +1208,27 @@ public boolean isEnableViewBasedMVRewrite(View view) {
String plan = replayPair.second;
PlanTestBase.assertContains(plan, "single_mv_ads_biz_customer_combine_td_for_task_2y");
}

@Test
public void testSingleNodePlanWithMultiAggStage1() throws Exception {
new MockUp<Utils>() {
@Mock
public static boolean isSingleNodeExecution(ConnectContext context) {
return true;
}
@Mock
public static boolean isRunningInUnitTest() {
return false;
}
};
String plan = getPlanFragment("query_dump/single_node_plan1", TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " 2:AGGREGATE (update serialize)\n" +
" | STREAMING\n" +
" | output: sum(106: sum), count(107: count), avg(108: avg), count(3: UserID)\n" +
" | group by: 10: RegionID\n" +
" | \n" +
" 1:AGGREGATE (update serialize)\n" +
" | output: sum(41: AdvEngineID), count(*), avg(21: ResolutionWidth)\n" +
" | group by: 3: UserID, 10: RegionID");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ public static Pair<String, ExecPlan> getNewPlanAndFragmentFromDump(ConnectContex
} else if (statementBase instanceof InsertStmt) {
return getInsertExecPlan((InsertStmt) statementBase, connectContext);
} else {
Preconditions.checkState(false, "Do not support the statement");
Preconditions.checkState(false, "Do not support the statement:" + statementBase);
return null;
}
} finally {
Expand Down
Loading
Loading