Skip to content

Commit 20e61a1

Browse files
committed
Introduce RelBuilder wrapper for dynamic fields
Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent e76030f commit 20e61a1

File tree

16 files changed

+1534
-168
lines changed

16 files changed

+1534
-168
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.calcite.tools.FrameworkConfig;
2323
import org.apache.calcite.tools.RelBuilder;
2424
import org.opensearch.sql.ast.expression.UnresolvedExpression;
25+
import org.opensearch.sql.calcite.rel.RelBuilderWrapper;
26+
import org.opensearch.sql.calcite.rel.RelFieldBuilder;
2527
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
2628
import org.opensearch.sql.common.setting.Settings;
2729
import org.opensearch.sql.executor.QueryType;
@@ -31,7 +33,8 @@ public class CalcitePlanContext {
3133

3234
public FrameworkConfig config;
3335
public final Connection connection;
34-
public final RelBuilder relBuilder;
36+
public final RelBuilderWrapper relBuilder;
37+
public final RelFieldBuilder fieldBuilder;
3538
public final ExtendedRexBuilder rexBuilder;
3639
public final FunctionProperties functionProperties;
3740
public final QueryType queryType;
@@ -66,8 +69,10 @@ private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType
6669
this.sysLimit = sysLimit;
6770
this.queryType = queryType;
6871
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
69-
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
70-
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
72+
RelBuilder rawRelBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
73+
this.relBuilder = new RelBuilderWrapper(rawRelBuilder);
74+
this.rexBuilder = new ExtendedRexBuilder(rawRelBuilder.getRexBuilder());
75+
this.fieldBuilder = new RelFieldBuilder(rawRelBuilder, this.rexBuilder);
7176
this.functionProperties = new FunctionProperties(QueryType.PPL);
7277
this.rexLambdaRefMap = new HashMap<>();
7378
}

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 165 additions & 132 deletions
Large diffs are not rendered by default.

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.opensearch.sql.ast.tree.UnresolvedPlan;
7373
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
7474
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
75+
import org.opensearch.sql.calcite.rel.QualifiedNameResolver;
7576
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
7677
import org.opensearch.sql.calcite.utils.PlanUtils;
7778
import org.opensearch.sql.calcite.utils.SubsearchUtils;
@@ -548,7 +549,7 @@ private RelNode resolveSubqueryPlan(
548549
replacement,
549550
context.relBuilder.literal(context.sysLimit.subsearchLimit()));
550551
}
551-
PlanUtils.replaceTop(context.relBuilder, replacement);
552+
PlanUtils.replaceTop(context.relBuilder.getRawRelBuilder(), replacement);
552553
}
553554
// pop the inner plan
554555
RelNode subqueryRel = context.relBuilder.build();

core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
6464
List<RelNode> renamedNodes = new ArrayList<>();
6565
for (RelNode node : projectedNodes) {
6666
RelNode renamedNode =
67-
context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build();
67+
context
68+
.relBuilder
69+
.push(node)
70+
.project(context.fieldBuilder.staticFields(), uniqueNames)
71+
.build();
6872
renamedNodes.add(renamedNode);
6973
}
7074
return renamedNodes;

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggregateConvertRule.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.calcite.util.mapping.Mappings;
3535
import org.apache.commons.lang3.tuple.Pair;
3636
import org.immutables.value.Value;
37+
import org.opensearch.sql.calcite.rel.RelBuilderWrapper;
38+
import org.opensearch.sql.calcite.rel.RelFieldBuilder;
3739

3840
/**
3941
* Planner rule that converts specific aggCall to a more efficient expressions, which includes:
@@ -74,8 +76,10 @@ public void onMatch(RelOptRuleCall call) {
7476

7577
public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProject project) {
7678

77-
final RelBuilder relBuilder = call.builder();
79+
final RelBuilder rawRelBuilder = call.builder();
80+
final RelBuilderWrapper relBuilder = new RelBuilderWrapper(rawRelBuilder);
7881
final RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder();
82+
final RelFieldBuilder fieldBuilder = new RelFieldBuilder(rawRelBuilder, rexBuilder);
7983
relBuilder.push(project.getInput());
8084

8185
/*
@@ -213,13 +217,14 @@ public void apply(RelOptRuleCall call, LogicalAggregate aggregate, LogicalProjec
213217
distinctAggregateCalls.add(aggregateCall.transform(targetMapping));
214218
}
215219
// Project the used fields
216-
relBuilder.project(relBuilder.fields(fieldsUsed.stream().toList()));
220+
relBuilder.project(fieldBuilder.staticFields(fieldsUsed.stream().toList()));
217221
}
218222

219223
/* Build the final project-aggregate-project after eliminating unused fields */
220224
relBuilder.aggregate(relBuilder.groupKey(newGroupSet, newGroupSets), distinctAggregateCalls);
221225
List<RexNode> parentProjects =
222-
new ArrayList<>(relBuilder.fields(IntStream.range(0, groupSetOffset).boxed().toList()));
226+
new ArrayList<>(
227+
fieldBuilder.staticFields(IntStream.range(0, groupSetOffset).boxed().toList()));
223228
parentProjects.addAll(
224229
newExprOnAggCall.transform(
225230
(constructor, name) ->
@@ -267,7 +272,7 @@ private static RexNode convertToNewOperand(
267272
}
268273
}
269274

270-
private RexNode aliasMaybe(RelBuilder builder, RexNode node, String alias) {
275+
private RexNode aliasMaybe(RelBuilderWrapper builder, RexNode node, String alias) {
271276
return alias == null ? node : builder.alias(node, alias);
272277
}
273278

core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java renamed to core/src/main/java/org/opensearch/sql/calcite/rel/QualifiedNameResolver.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.calcite;
6+
package org.opensearch.sql.calcite.rel;
7+
8+
import static org.opensearch.sql.calcite.plan.DynamicFieldsConstants.DYNAMIC_FIELDS_MAP;
79

810
import java.util.ArrayList;
911
import java.util.List;
@@ -16,7 +18,7 @@
1618
import org.apache.logging.log4j.LogManager;
1719
import org.apache.logging.log4j.Logger;
1820
import org.opensearch.sql.ast.expression.QualifiedName;
19-
import org.opensearch.sql.calcite.plan.DynamicFieldsConstants;
21+
import org.opensearch.sql.calcite.CalcitePlanContext;
2022
import org.opensearch.sql.expression.function.BuiltinFunctionName;
2123
import org.opensearch.sql.expression.function.PPLFuncImpTable;
2224

@@ -29,6 +31,25 @@ public class QualifiedNameResolver {
2931

3032
private static final Logger log = LogManager.getLogger(QualifiedNameResolver.class);
3133

34+
/** Resolve field in a specific input */
35+
public static Optional<RexNode> resolveField(
36+
int inputCount, int inputOrdinal, String fieldName, CalcitePlanContext context) {
37+
List<String> inputFieldNames = context.fieldBuilder.getAllFieldNames(inputCount, inputOrdinal);
38+
if (inputFieldNames.contains(fieldName)) {
39+
return Optional.of(context.fieldBuilder.staticField(inputCount, inputOrdinal, fieldName));
40+
} else if (context.fieldBuilder.isDynamicFieldsExist()) {
41+
return Optional.of(context.fieldBuilder.dynamicField(fieldName));
42+
}
43+
return Optional.empty();
44+
}
45+
46+
public static RexNode resolveFieldOrThrow(
47+
int inputCount, int inputOrdinal, String fieldName, CalcitePlanContext context) {
48+
return resolveField(inputCount, inputOrdinal, fieldName, context)
49+
.orElseThrow(
50+
() -> new IllegalArgumentException(String.format("Field [%s] not found.", fieldName)));
51+
}
52+
3253
/**
3354
* Resolves a qualified name to a RexNode based on the current context.
3455
*
@@ -130,10 +151,9 @@ private static Optional<RexNode> resolveDynamicFields(
130151
List<Set<String>> inputFieldNames = collectInputFieldNames(context, inputCount);
131152

132153
for (int i = 0; i < inputCount; i++) {
133-
if (inputFieldNames.get(i).contains(DynamicFieldsConstants.DYNAMIC_FIELDS_MAP)) {
154+
if (inputFieldNames.get(i).contains(DYNAMIC_FIELDS_MAP)) {
134155
String fieldName = String.join(".", parts);
135-
RexNode dynamicField =
136-
context.relBuilder.field(inputCount, i, DynamicFieldsConstants.DYNAMIC_FIELDS_MAP);
156+
RexNode dynamicField = context.relBuilder.field_(inputCount, i, DYNAMIC_FIELDS_MAP);
137157
RexNode itemAccess = createItemAccess(dynamicField, fieldName, context);
138158
return Optional.of(itemAccess);
139159
}
@@ -149,7 +169,7 @@ private static Optional<RexNode> tryToResolveField(
149169
fieldName,
150170
inputCount);
151171
try {
152-
return Optional.of(context.relBuilder.field(inputCount, alias, fieldName));
172+
return Optional.of(context.relBuilder.field_(inputCount, alias, fieldName));
153173
} catch (IllegalArgumentException e) {
154174
log.debug("tryToResolveField() failed: {}", e.getMessage());
155175
}
@@ -171,7 +191,7 @@ private static Optional<RexNode> resolveFieldWithoutAlias(
171191
int foundInput = findInputContainingFieldName(inputCount, inputFieldNames, fieldName);
172192
log.debug("resolveFieldWithoutAlias() foundInput={}", foundInput);
173193
if (foundInput != -1) {
174-
RexNode fieldNode = context.relBuilder.field(inputCount, foundInput, fieldName);
194+
RexNode fieldNode = context.relBuilder.field_(inputCount, foundInput, fieldName);
175195
return Optional.of(resolveFieldAccess(context, parts, 0, length, fieldNode));
176196
}
177197
}
@@ -219,7 +239,7 @@ private static Optional<RexNode> resolveRenamedField(
219239
String alias = parts.get(0);
220240
for (String candidate : candidates) {
221241
try {
222-
return Optional.of(context.relBuilder.field(alias, candidate));
242+
return Optional.of(context.relBuilder.field_(alias, candidate));
223243
} catch (IllegalArgumentException e1) {
224244
// Indicates the field was not found.
225245
}
@@ -260,7 +280,7 @@ private static Optional<RexNode> resolveCorrelationField(
260280
String fieldName = joinParts(parts, start, length);
261281
log.debug("resolveCorrelationField() trying fieldName={}", fieldName);
262282
if (fieldNameList.contains(fieldName)) {
263-
RexNode field = context.relBuilder.field(correlation, fieldName);
283+
RexNode field = context.relBuilder.field_(correlation, fieldName);
264284
return resolveFieldAccess(context, parts, start, length, field);
265285
}
266286
}

0 commit comments

Comments
 (0)