Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,19 @@ public void setMaintenancePlan(ExecPlan maintenancePlan) {
@Override
public void inferDistribution(DistributionInfo info) throws DdlException {
if (info.getBucketNum() == 0) {
// if mv has been already refreshed, deduce bucket num from existing tablets
boolean hasRefreshed = getVisiblePartitions().stream().anyMatch(Partition::hasData);
if (hasRefreshed) {
int numBucket = CatalogUtils.calAvgBucketNumOfRecentPartitions(this,
FeConstants.DEFAULT_INFER_BUCKET_NUM_RECENT_PARTITION_NUM,
Config.enable_auto_tablet_distribution);
// use the numBucket only when it's greater than 1, otherwise skip
if (numBucket > 1) {
info.setBucketNum(numBucket);
return;
}
}

int inferredBucketNum = 0;
for (BaseTableInfo base : getBaseTableInfos()) {
Optional<Table> optTable = MvUtils.getTable(base);
Expand All @@ -1853,8 +1866,11 @@ public void inferDistribution(DistributionInfo info) throws DdlException {
Table table = optTable.get();
if (table.isNativeTableOrMaterializedView()) {
OlapTable olapTable = (OlapTable) table;
DistributionInfo dist = olapTable.getDefaultDistributionInfo();
inferredBucketNum = Math.max(inferredBucketNum, dist.getBucketNum());
// deduce bucket num from base table rather than use its distribution info
int numBucket = CatalogUtils.calAvgBucketNumOfRecentPartitions(olapTable,
FeConstants.DEFAULT_INFER_BUCKET_NUM_RECENT_PARTITION_NUM,
Config.enable_auto_tablet_distribution);
inferredBucketNum = Math.max(inferredBucketNum, numBucket);
}
}
if (inferredBucketNum == 0) {
Expand Down
11 changes: 8 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.InvalidOlapTableStateException;
import com.starrocks.common.Pair;
import com.starrocks.common.io.DeepCopy;
Expand Down Expand Up @@ -1367,15 +1368,19 @@ public DistributionInfo getDefaultDistributionInfo() {
return defaultDistributionInfo;
}

/*
* Infer the distribution info based on partitions and cluster status
/**
* Infer the distribution info based on partitions and cluster status:
* - For hash distribution, if bucket num is not set, we will set it to
* average bucket num of recent partitions.
* - For random distribution, if mutable bucket num is not set, we will set it with a deduced value.
*/
public void inferDistribution(DistributionInfo info) throws DdlException {
if (info.getType() == DistributionInfo.DistributionInfoType.HASH) {
// infer bucket num
if (info.getBucketNum() == 0) {
int numBucket = CatalogUtils.calAvgBucketNumOfRecentPartitions(this,
5, Config.enable_auto_tablet_distribution);
FeConstants.DEFAULT_INFER_BUCKET_NUM_RECENT_PARTITION_NUM,
Config.enable_auto_tablet_distribution);
info.setBucketNum(numBucket);
}
} else if (info.getType() == DistributionInfo.DistributionInfoType.RANDOM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class FeConstants {
public static final int MAX_COUNTER_NUM_OF_TOP_K = 100000;

public static final int DEFAULT_UNPARTITIONED_TABLE_BUCKET_NUM = 16;
// When inferring bucket num from recent partitions, we will check at most 5 recent partitions
public static final int DEFAULT_INFER_BUCKET_NUM_RECENT_PARTITION_NUM = 5;

public static final int MAX_LIST_PARTITION_NAME_LENGTH = 50;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,7 @@ public void testJoin2() {
PlanTestBase.assertContains(plan, " TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 = 1\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=1/3");
" partitions=1/1\n");
}
}
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,20 +141,17 @@ public void testTransparentRewriteWithScanMv() {
" PREAGGREGATION: ON\n" +
" PREDICATES: 5: k1 = 1\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=1/3",
" rollup: mv0\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 5: k1 < 3\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3",
" rollup: mv0\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 5: k1 < 2\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3",
" rollup: mv0\n"
};
for (int i = 0; i < sqls.length; i++) {
String query = sqls[i];
Expand Down Expand Up @@ -216,33 +213,22 @@ public void testTransparentRewriteWithScanMv() {
String[] expectPlans = {
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 < 6, 14: k2 LIKE 'a%'\n" +
" partitions=1/3",
" PREDICATES: 13: k1 < 6, 14: k2 LIKE 'a%'\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 < 6, 10: k2 LIKE 'a%'\n" +
" partitions=1/1", // case 1
" PREDICATES: 9: k1 < 6, 10: k2 LIKE 'a%'\n",
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 > 0, 13: k1 < 6, 14: k2 LIKE 'a%'\n" +
" partitions=1/3",
" PREDICATES: 13: k1 > 0, 13: k1 < 6, 14: k2 LIKE 'a%'\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 > 0, 9: k1 < 6, 10: k2 LIKE 'a%'\n" +
" partitions=1/1\n" +
" rollup: mv0", // case 2
" PREDICATES: 9: k1 > 0, 9: k1 < 6, 10: k2 LIKE 'a%'\n",
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 > 1, 13: k1 < 6, 14: k2 LIKE 'a%'\n" +
" partitions=1/3\n" +
" rollup: m1\n" +
" tabletRatio=3/3",
" PREDICATES: 13: k1 > 1, 13: k1 < 6, 14: k2 LIKE 'a%'\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 > 1, 9: k1 < 6, 10: k2 LIKE 'a%'\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3", // case 3
" PREDICATES: 9: k1 > 1, 9: k1 < 6, 10: k2 LIKE 'a%'\n"
};
for (int i = 0; i < sqls.length; i++) {
System.out.println("start to test " + i);
Expand Down Expand Up @@ -270,50 +256,35 @@ public void testTransparentRewriteWithScanMv() {
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 != 3, 10: k2 LIKE 'a%'\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3",
" rollup: mv0\n",
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 != 3, 14: k2 LIKE 'a%'\n" +
" partitions=2/3\n" +
" rollup: m1\n" +
" tabletRatio=6/6",
" partitions=2/3\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 > 0, 10: k2 LIKE 'a%'\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3",
" partitions=1/1\n",
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 > 0, 14: k2 LIKE 'a%'\n" +
" partitions=2/3\n" +
" rollup: m1\n" +
" tabletRatio=6/6",
" partitions=2/3\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 > 1, 10: k2 LIKE 'a%'\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3",
" partitions=1/1\n",
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 > 1, 14: k2 LIKE 'a%'\n" +
" partitions=2/3\n" +
" rollup: m1\n" +
" tabletRatio=6/6",
" partitions=2/3\n",
" TABLE: mv0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: k1 > 0, 10: k2 LIKE 'a%'\n" +
" partitions=1/1\n" +
" rollup: mv0\n" +
" tabletRatio=3/3",
" partitions=1/1\n",
" TABLE: m1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 13: k1 > 0, 14: k2 LIKE 'a%'\n" +
" partitions=2/3\n" +
" rollup: m1\n" +
" tabletRatio=6/6",
" partitions=2/3\n",
};
for (int i = 0; i < sqls.length; i++) {
String query = sqls[i];
Expand Down
Loading