
Commit 350898d (parent: e23179f)

branch-4.0: [fix](nereids) fix broker load planner don't support multi file group #56372 (#56514)

Cherry-picked from #56372
Co-authored-by: Xin Liao <[email protected]>
7 files changed: +204 -65 lines

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
Lines changed: 14 additions & 1 deletion

@@ -69,7 +69,7 @@ public class PlanTranslatorContext {
     private final ConnectContext connectContext;
     private final List<PlanFragment> planFragments = Lists.newArrayList();
 
-    private final DescriptorTable descTable = new DescriptorTable();
+    private DescriptorTable descTable;
 
     private final RuntimeFilterTranslator translator;
 

@@ -124,15 +124,28 @@ public PlanTranslatorContext(CascadesContext ctx) {
         this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
         this.topnFilterContext = ctx.getTopnFilterContext();
         this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
+        this.descTable = new DescriptorTable();
     }
 
+    public PlanTranslatorContext(CascadesContext ctx, DescriptorTable descTable) {
+        this.connectContext = ctx.getConnectContext();
+        this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
+        this.topnFilterContext = ctx.getTopnFilterContext();
+        this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
+        this.descTable = descTable;
+    }
+
+    /**
+     * Constructor for testing purposes with default values.
+     */
    @VisibleForTesting
    public PlanTranslatorContext() {
        this.connectContext = null;
        this.translator = null;
        this.topnFilterContext = new TopnFilterContext();
        IdGenerator<RuntimeFilterId> runtimeFilterIdGen = RuntimeFilterId.createGenerator();
        this.runtimeFilterV2Context = new RuntimeFilterContextV2(runtimeFilterIdGen);
+        this.descTable = new DescriptorTable();
    }
 
    /**
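The new two-argument constructor is what lets several translation passes share one DescriptorTable: the caller owns the table, and every PlanTranslatorContext built from it registers its tuple and slot descriptors there. Below is a minimal sketch of the two construction paths, assuming CascadesContext instances are already available; package names are taken from this commit's file paths and import lists where shown, otherwise assumed.

    import org.apache.doris.analysis.DescriptorTable;
    import org.apache.doris.nereids.CascadesContext;
    import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;

    class SharedDescriptorTableSketch {
        // Hypothetical helper: translate several plans while accumulating all
        // tuple/slot descriptors in one caller-owned DescriptorTable.
        static DescriptorTable buildSharedTable(CascadesContext firstCtx, CascadesContext secondCtx) {
            // new PlanTranslatorContext(ctx) would still give each context its own
            // private DescriptorTable; the new overload injects a shared one instead.
            DescriptorTable shared = new DescriptorTable();
            PlanTranslatorContext first = new PlanTranslatorContext(firstCtx, shared);
            PlanTranslatorContext second = new PlanTranslatorContext(secondCtx, shared);
            // ... translate one plan with each context; both write into `shared` ...
            return shared;
        }
    }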

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadPlanInfoCollector.java
Lines changed: 34 additions & 26 deletions

@@ -96,12 +96,12 @@ public class NereidsLoadPlanInfoCollector extends DefaultPlanVisitor<Void, PlanT
     * store OlapTableSink and required information for FileLoadScanNode
     */
    public static class LoadPlanInfo {
-        private DescriptorTable descriptorTable;
        // the file source tuple's id, the tuple represents original columns reading from file
        private TupleId srcTupleId;
        // the file source tuple's slots
        private List<SlotId> srcSlotIds;
        // FileLoadScanNode's output tuple, represents remapped columns
+        // For multiple file groups in Broker load, this destTuple is shared across all file groups
        private TupleDescriptor destTuple;
        // the map of slots in destTuple and its mapping expression
        private Map<SlotId, Expr> destSlotIdToExprMap;

@@ -119,10 +119,6 @@ public static class LoadPlanInfo {
        // OlapTableSink for dest table
        private OlapTableSink olapTableSink;
 
-        public DescriptorTable getDescriptorTable() {
-            return descriptorTable;
-        }
-
        public TupleDescriptor getDestTuple() {
            return destTuple;
        }

@@ -264,13 +260,14 @@ public NereidsLoadPlanInfoCollector(OlapTable destTable, NereidsLoadTaskInfo tas
    /**
     * visit logical plan tree and create a LoadPlanInfo
     */
-    public LoadPlanInfo collectLoadPlanInfo(LogicalPlan logicalPlan) {
+    public LoadPlanInfo collectLoadPlanInfo(LogicalPlan logicalPlan, DescriptorTable descTable,
+            TupleDescriptor scanDescriptor) {
        this.logicalPlan = logicalPlan;
        CascadesContext cascadesContext = CascadesContext.initContext(new StatementContext(),
                logicalPlan, PhysicalProperties.ANY);
-        PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext);
+        PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext, descTable);
+        loadPlanInfo.destTuple = scanDescriptor;
        logicalPlan.accept(this, context);
-        loadPlanInfo.descriptorTable = context.getDescTable();
        return loadPlanInfo;
    }

@@ -344,29 +341,40 @@ public Void visitLogicalProject(LogicalProject<? extends Plan> logicalProject, P
 
        List<Expr> projectList = outputs.stream().map(e -> ExpressionTranslator.translate(e, context))
                .collect(Collectors.toList());
-        List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
-
-        // ignore projectList's nullability and set the expr's nullable info same as dest table column
-        // why do this? looks like be works in this way...
-        // and we have to do some extra work in visitLogicalFilter because this ood behavior
-        int size = slotList.size();
-        List<Slot> newSlotList = new ArrayList<>(size);
-        for (int i = 0; i < size; ++i) {
-            SlotReference slot = (SlotReference) slotList.get(i);
-            Column col = destTable.getColumn(slot.getName());
-            if (col != null) {
-                slot = slot.withColumn(col);
-                if (col.isAutoInc()) {
-                    newSlotList.add(slot.withNullable(true));
+
+        // For Broker load with multiple file groups, all file groups share the same destTuple.
+        // Create slots for destTuple only when processing the first file group (when slots are empty).
+        // Subsequent file groups will reuse the slots created by the first file group.
+        if (loadPlanInfo.destTuple.getSlots().isEmpty()) {
+            List<Slot> slotList = outputs.stream().map(NamedExpression::toSlot).collect(Collectors.toList());
+
+            // ignore projectList's nullability and set the expr's nullable info same as
+            // dest table column
+            // why do this? looks like be works in this way...
+            // and we have to do some extra work in visitLogicalFilter because this ood
+            // behavior
+            int size = slotList.size();
+            List<Slot> newSlotList = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                SlotReference slot = (SlotReference) slotList.get(i);
+                Column col = destTable.getColumn(slot.getName());
+                if (col != null) {
+                    slot = slot.withColumn(col);
+                    if (col.isAutoInc()) {
+                        newSlotList.add(slot.withNullable(true));
+                    } else {
+                        newSlotList.add(slot.withNullable(col.isAllowNull()));
+                    }
                } else {
-                    newSlotList.add(slot.withNullable(col.isAllowNull()));
+                    newSlotList.add(slot);
                }
-            } else {
-                newSlotList.add(slot);
+            }
+
+            for (Slot slot : newSlotList) {
+                context.createSlotDesc(loadPlanInfo.destTuple, (SlotReference) slot, destTable);
            }
        }
 
-        loadPlanInfo.destTuple = generateTupleDesc(newSlotList, destTable, context);
        loadPlanInfo.destSlotIdToExprMap = Maps.newHashMap();
        List<SlotDescriptor> slotDescriptorList = loadPlanInfo.destTuple.getSlots();
        for (int i = 0; i < slotDescriptorList.size(); ++i) {
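With the DescriptorTable and the destination tuple now supplied by the caller, the collector's contract is: the first plan collected for a load fills the shared tuple's slots, and every later plan reuses them through the getSlots().isEmpty() guard above. A hedged sketch of that calling pattern follows; the helper and its parameters are illustrative, and the package names for LogicalPlan and the load classes are assumed.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.doris.analysis.DescriptorTable;
    import org.apache.doris.analysis.TupleDescriptor;
    import org.apache.doris.catalog.OlapTable;
    import org.apache.doris.nereids.load.NereidsLoadPlanInfoCollector;
    import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;

    class SharedDestTupleSketch {
        // Hypothetical helper: collect one LoadPlanInfo per file group against a
        // single DescriptorTable and a single destination tuple.
        static List<NereidsLoadPlanInfoCollector.LoadPlanInfo> collectAll(OlapTable destTable,
                List<NereidsLoadPlanInfoCollector> collectors, List<LogicalPlan> plans) {
            DescriptorTable descTable = new DescriptorTable();
            TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
            scanTupleDesc.setTable(destTable);

            List<NereidsLoadPlanInfoCollector.LoadPlanInfo> infos = new ArrayList<>(plans.size());
            for (int i = 0; i < plans.size(); ++i) {
                // The first call finds scanTupleDesc empty and creates its slot descriptors;
                // later calls reuse those slots, so every LoadPlanInfo shares the same destTuple.
                infos.add(collectors.get(i).collectLoadPlanInfo(plans.get(i), descTable, scanTupleDesc));
            }
            return infos;
        }
    }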

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
Lines changed: 37 additions & 26 deletions

@@ -20,6 +20,7 @@
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.PartitionNames;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;

@@ -74,7 +75,7 @@ public class NereidsLoadingTaskPlanner {
    private final boolean singleTabletLoadPerSink;
    private final boolean enableMemtableOnSinkNode;
    private UserIdentity userInfo;
-    private DescriptorTable descTable;
+    private final DescriptorTable descTable = new DescriptorTable();
 
    // Output params
    private List<PlanFragment> fragments = Lists.newArrayList();

@@ -144,49 +145,59 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
        }
 
        Preconditions.checkState(!fileGroups.isEmpty() && fileGroups.size() == fileStatusesList.size());
-        NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
-                fileGroups.get(0), fileStatusesList.get(0), filesAdded, strictMode, loadParallelism);
-        NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo,
-                partialUpdateInputColumns);
-        NereidsParamCreateContext context = loadScanProvider.createLoadContext();
+
        PartitionNames partitionNames = getPartitionNames();
-        LogicalPlan streamLoadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context,
-                isPartialUpdate, partialUpdateNewKeyPolicy);
        long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeoutS() : timeoutS;
        if (txnTimeout > Integer.MAX_VALUE) {
            txnTimeout = Integer.MAX_VALUE;
        }
        NereidsBrokerLoadTask nereidsBrokerLoadTask = new NereidsBrokerLoadTask(txnId, (int) txnTimeout,
                sendBatchParallelism,
                strictMode, enableMemtableOnSinkNode, partitionNames);
-        NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table, nereidsBrokerLoadTask,
-                loadId, dbId, isPartialUpdate ? TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT,
-                partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap);
-        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan);
-        descTable = loadPlanInfo.getDescriptorTable();
-        FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple());
+
+        TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor();
+        scanTupleDesc.setTable(table);
+        // Collect all file group infos, contexts, and load plan infos
        List<NereidsFileGroupInfo> fileGroupInfos = new ArrayList<>(fileGroups.size());
        List<NereidsParamCreateContext> contexts = new ArrayList<>(fileGroups.size());
-        fileGroupInfos.add(fileGroupInfo);
-        contexts.add(context);
-        for (int i = 1; i < fileGroups.size(); ++i) {
-            fileGroupInfos.add(new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
-                    fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism));
-            NereidsParamCreateContext paramCreateContext = new NereidsParamCreateContext();
-            paramCreateContext.fileGroup = fileGroups.get(i);
-            contexts.add(paramCreateContext);
+        List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos =
+                new ArrayList<>(fileGroups.size());
+
+        // Create a separate plan for each file group
+        for (int i = 0; i < fileGroups.size(); ++i) {
+            NereidsFileGroupInfo fileGroupInfo = new NereidsFileGroupInfo(loadJobId, txnId, table, brokerDesc,
+                    fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism);
+            NereidsLoadScanProvider loadScanProvider = new NereidsLoadScanProvider(fileGroupInfo,
+                    partialUpdateInputColumns);
+            NereidsParamCreateContext context = loadScanProvider.createLoadContext();
+            LogicalPlan loadPlan = NereidsLoadUtils.createLoadPlan(fileGroupInfo, partitionNames, context,
+                    isPartialUpdate, partialUpdateNewKeyPolicy);
+
+            NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(table,
+                    nereidsBrokerLoadTask, loadId, dbId,
+                    isPartialUpdate ? TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS : TUniqueKeyUpdateMode.UPSERT,
+                    partialUpdateNewKeyPolicy, partialUpdateInputColumns, context.exprMap);
+            NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(loadPlan,
+                    descTable, scanTupleDesc);
+
+            fileGroupInfos.add(fileGroupInfo);
+            contexts.add(context);
+            loadPlanInfos.add(loadPlanInfo);
        }
-        fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfo);
+
+        // Create a single FileLoadScanNode for all file groups
+        FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
+        fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, loadPlanInfos);
        scanNodes.add(fileScanNode);
 
-        // 3. Plan fragment
+        // Create plan fragment
        PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), fileScanNode, DataPartition.RANDOM);
        sinkFragment.setParallelExecNum(loadParallelism);
-        sinkFragment.setSink(loadPlanInfo.getOlapTableSink());
+        sinkFragment.setSink(loadPlanInfos.get(0).getOlapTableSink());
 
        fragments.add(sinkFragment);
 
-        // 4. finalize
+        // finalize
        for (PlanFragment fragment : fragments) {
            fragment.finalize(null);
        }
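Even with N per-group load plans, only one FileLoadScanNode and one OlapTableSink are created. That works because every LoadPlanInfo produced in the loop above points at the same shared destination tuple, so element 0 of loadPlanInfos is representative. A minimal sketch of that assumption; the helper name is invented and the PlanNodeId package is assumed.

    import java.util.List;

    import org.apache.doris.analysis.TupleDescriptor;
    import org.apache.doris.nereids.load.NereidsLoadPlanInfoCollector;
    import org.apache.doris.planner.FileLoadScanNode;
    import org.apache.doris.planner.PlanNodeId;

    class SingleScanNodeSketch {
        // Hypothetical helper: build the one scan node that serves every file group.
        static FileLoadScanNode buildScanNode(
                List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos) {
            // All LoadPlanInfos share one destTuple, so get(0) is as good as any other index;
            // the same reasoning applies to taking the OlapTableSink from get(0).
            TupleDescriptor destTuple = loadPlanInfos.get(0).getDestTuple();
            return new FileLoadScanNode(new PlanNodeId(0), destTuple);
        }
    }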

fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
Lines changed: 9 additions & 3 deletions

@@ -18,6 +18,8 @@
 package org.apache.doris.nereids.load;
 
 import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;

@@ -245,10 +247,14 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
        NereidsLoadPlanInfoCollector planInfoCollector = new NereidsLoadPlanInfoCollector(destTable, taskInfo, loadId,
                db.getId(), uniquekeyUpdateMode, partialUpdateNewRowPolicy, partialUpdateInputColumns,
                context.exprMap);
-        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan);
+        DescriptorTable descriptorTable = new DescriptorTable();
+        TupleDescriptor scanTupleDesc = descriptorTable.createTupleDescriptor();
+        scanTupleDesc.setTable(destTable);
+        NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = planInfoCollector.collectLoadPlanInfo(streamLoadPlan,
+                descriptorTable, scanTupleDesc);
        FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), loadPlanInfo.getDestTuple());
        fileScanNode.finalizeForNereids(loadId, Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context),
-                loadPlanInfo);
+                Lists.newArrayList(loadPlanInfo));
        scanNode = fileScanNode;
 
        // for stream load, we only need one fragment, ScanNode -> DataSink.

@@ -262,7 +268,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
        params.setProtocolVersion(PaloInternalServiceVersion.V1);
        params.setFragment(fragment.toThrift());
 
-        params.setDescTbl(loadPlanInfo.getDescriptorTable().toThrift());
+        params.setDescTbl(descriptorTable.toThrift());
        params.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
        params.setCurrentConnectFe(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
Lines changed: 14 additions & 9 deletions

@@ -64,18 +64,13 @@ public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc) {
    }
 
    public void finalizeForNereids(TUniqueId loadId, List<NereidsFileGroupInfo> fileGroupInfos,
-            List<NereidsParamCreateContext> contexts, NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo)
+            List<NereidsParamCreateContext> contexts, List<NereidsLoadPlanInfoCollector.LoadPlanInfo> loadPlanInfos)
            throws UserException {
        Preconditions.checkState(contexts.size() == fileGroupInfos.size(),
                contexts.size() + " vs. " + fileGroupInfos.size());
-        List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
-        if (preFilterList != null) {
-            addPreFilterConjuncts(preFilterList);
-        }
-        List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
-        if (postFilterList != null) {
-            addConjuncts(postFilterList);
-        }
+        Preconditions.checkState(loadPlanInfos.size() == fileGroupInfos.size(),
+                loadPlanInfos.size() + " vs. " + fileGroupInfos.size());
+
        // ATTN: for load scan node, do not use backend policy in ExternalScanNode.
        // Because backend policy in ExternalScanNode may only contain compute backend.
        // But for load job, we should select backends from all backends, both compute and mix.

@@ -88,6 +83,16 @@ public void finalizeForNereids(TUniqueId loadId, List<NereidsFileGroupInfo> file
        for (int i = 0; i < contexts.size(); ++i) {
            NereidsParamCreateContext context = contexts.get(i);
            NereidsFileGroupInfo fileGroupInfo = fileGroupInfos.get(i);
+            NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = loadPlanInfos.get(i);
+            // Add filters for each file group's load plan info
+            List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
+            if (preFilterList != null) {
+                addPreFilterConjuncts(preFilterList);
+            }
+            List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
+            if (postFilterList != null) {
+                addConjuncts(postFilterList);
+            }
            context.params = loadPlanInfo.toFileScanRangeParams(loadId, fileGroupInfo);
            createScanRangeLocations(context, fileGroupInfo, localBackendPolicy);
            this.selectedSplitNum += fileGroupInfo.getFileStatuses().size();
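Both planners now pass lists of LoadPlanInfos into finalizeForNereids: the stream-load path wraps its single plan in one-element lists, while the broker-load path passes one entry per file group, and the two Preconditions above require the three lists to stay parallel. A small sketch of the two call shapes; the wrapper methods are illustrative, and package names for the load classes are assumed where not shown in the diff.

    import java.util.List;

    import com.google.common.collect.Lists;

    import org.apache.doris.common.UserException;
    import org.apache.doris.nereids.load.NereidsFileGroupInfo;
    import org.apache.doris.nereids.load.NereidsLoadPlanInfoCollector.LoadPlanInfo;
    import org.apache.doris.nereids.load.NereidsParamCreateContext;
    import org.apache.doris.planner.FileLoadScanNode;
    import org.apache.doris.thrift.TUniqueId;

    class FinalizeCallShapes {
        // Stream load: exactly one file group, so singleton lists.
        static void finalizeSingleGroup(FileLoadScanNode node, TUniqueId loadId,
                NereidsFileGroupInfo group, NereidsParamCreateContext ctx, LoadPlanInfo info)
                throws UserException {
            node.finalizeForNereids(loadId,
                    Lists.newArrayList(group), Lists.newArrayList(ctx), Lists.newArrayList(info));
        }

        // Broker load: parallel lists, one entry per file group, all the same size.
        static void finalizeMultiGroup(FileLoadScanNode node, TUniqueId loadId,
                List<NereidsFileGroupInfo> groups, List<NereidsParamCreateContext> ctxs,
                List<LoadPlanInfo> infos) throws UserException {
            node.finalizeForNereids(loadId, groups, ctxs, infos);
        }
    }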
(file path not shown)
Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !pr22666_1 --
+99510
+
+-- !pr22666_2 --
+100490
+
