@@ -1926,7 +1926,7 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
}

/** Helper method to get the function name for proper column naming */
private String getValueFunctionName(UnresolvedExpression aggregateFunction) {
private String getAggFieldAlias(UnresolvedExpression aggregateFunction) {
Member commented:
getAggFieldAlias lacks precision. How about changing it to getMeasureAlias?

count(balance) as cnt

balance: aggregate Field
count(): aggregate function
count(balance): measure
cnt: alias of measure

Do not forget to update the Javadoc as well.

@yuancu (Collaborator, Author) replied on Oct 28, 2025:
Thanks for pointing this out! I updated the name to getMetricAlias, since "metric" conforms to how it is named in other parts of the codebase.
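For reference, a minimal sketch of the resolved helper, assuming the getMetricAlias name from the reply above; the Javadoc wording and the non-alias fallback branch are illustrative assumptions, not the PR's actual code:

/** Returns the alias for the metric produced by an aggregation, e.g. "cnt" for "count(balance) as cnt". */
private String getMetricAlias(UnresolvedExpression aggregateFunction) {
  if (aggregateFunction instanceof Alias) {
    return ((Alias) aggregateFunction).getName();
  }
  // Illustrative fallback only: derive the alias from the expression's textual form.
  return aggregateFunction.toString();
}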

if (aggregateFunction instanceof Alias) {
return ((Alias) aggregateFunction).getName();
}
@@ -1976,15 +1976,15 @@ public RelNode visitTimechart(

// Handle no by field case
if (node.getByField() == null) {
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());

// Create group expression list with just the timestamp span but use a different alias
// to avoid @timestamp naming conflict
List<UnresolvedExpression> simpleGroupExprList = new ArrayList<>();
simpleGroupExprList.add(new Alias("timestamp", spanExpr));
// Create agg expression list with the aggregate function
List<UnresolvedExpression> simpleAggExprList =
List.of(new Alias(valueFunctionName, node.getAggregateFunction()));
List.of(new Alias(aggFieldAlias, node.getAggregateFunction()));
// Create an Aggregation object
Aggregation aggregation =
new Aggregation(
@@ -1999,9 +1999,9 @@ public RelNode visitTimechart(
context.relBuilder.push(result);
// Reorder fields: timestamp first, then count
context.relBuilder.project(
context.relBuilder.field("timestamp"), context.relBuilder.field(valueFunctionName));
context.relBuilder.field("timestamp"), context.relBuilder.field(aggFieldAlias));
// Rename timestamp to @timestamp
context.relBuilder.rename(List.of("@timestamp", valueFunctionName));
context.relBuilder.rename(List.of("@timestamp", aggFieldAlias));

context.relBuilder.sort(context.relBuilder.field(0));
return context.relBuilder.peek();
@@ -2010,7 +2010,7 @@ public RelNode visitTimechart(
// Extract parameters for byField case
UnresolvedExpression byField = node.getByField();
String byFieldName = ((Field) byField).getField().toString();
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());

int limit = Optional.ofNullable(node.getLimit()).orElse(10);
boolean useOther = Optional.ofNullable(node.getUseOther()).orElse(true);
@@ -2037,11 +2037,11 @@ public RelNode visitTimechart(

// Handle no limit case - just sort and return with proper field aliases
if (limit == 0) {
// Add final projection with proper aliases: [@timestamp, byField, valueFunctionName]
// Add final projection with proper aliases: [@timestamp, byField, aggFieldAlias]
context.relBuilder.project(
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
context.relBuilder.sort(context.relBuilder.field(0), context.relBuilder.field(1));
return context.relBuilder.peek();
}
@@ -2051,32 +2051,61 @@ public RelNode visitTimechart(

// Step 2: Find top N categories using window function approach (more efficient than separate
// aggregation)
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, context);
String aggFunctionName = getAggFunctionName(node.getAggregateFunction());
Optional<BuiltinFunctionName> aggFuncNameOptional = BuiltinFunctionName.of(aggFunctionName);
if (aggFuncNameOptional.isEmpty()) {
throw new IllegalArgumentException(
StringUtils.format("Unrecognized aggregation function: %s", aggFunctionName));
}
BuiltinFunctionName aggFunction = aggFuncNameOptional.get();
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, aggFunction, context);

// Step 3: Apply OTHER logic with single pass
return buildFinalResultWithOther(
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
completeResults,
topCategories,
byFieldName,
aggFunction,
aggFieldAlias,
useOther,
limit,
context);

} catch (Exception e) {
throw new RuntimeException("Error in visitTimechart: " + e.getMessage(), e);
}
}

private String getAggFunctionName(UnresolvedExpression aggregateFunction) {
if (aggregateFunction instanceof Alias alias) {
return getAggFunctionName(alias.getDelegated());
}
return ((AggregateFunction) aggregateFunction).getFuncName();
}

/** Build top categories query - simpler approach that works better with OTHER handling */
private RelNode buildTopCategoriesQuery(
RelNode completeResults, int limit, CalcitePlanContext context) {
RelNode completeResults,
int limit,
BuiltinFunctionName aggFunction,
CalcitePlanContext context) {
context.relBuilder.push(completeResults);

// Filter out null values when determining top categories - null should not count towards limit
context.relBuilder.filter(context.relBuilder.isNotNull(context.relBuilder.field(1)));

// Get totals for non-null categories - field positions: 0=@timestamp, 1=byField, 2=value
RexInputRef valueField = context.relBuilder.field(2);
AggCall call = buildAggCall(context.relBuilder, aggFunction, valueField);

context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(1)),
context.relBuilder.sum(context.relBuilder.field(2)).as("grand_total"));
context.relBuilder.groupKey(context.relBuilder.field(1)), call.as("grand_total"));

// Apply sorting and limit to non-null categories only
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field("grand_total")));
RexNode sortField = context.relBuilder.field("grand_total");
sortField =
aggFunction == BuiltinFunctionName.MIN ? sortField : context.relBuilder.desc(sortField);
Collaborator commented:

What's the reason for handling MIN separately? Will it meet our expected behavior?

Collaborator (Author) replied:

It is because limit is supposed to keep the top categories, yet what counts as a top category varies with the actual aggregation function -- for MIN and EARLIEST, the smallest values are supposed to form the top categories, while for most other aggregations (SUM, MAX, AVG, COUNT, etc.), the greatest values are.
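A minimal sketch of the sort-direction choice described here, for illustration only; the diff special-cases MIN, so extending the ascending order to EARLIEST is an assumption drawn from this reply rather than from the PR's code:

private void sortTopCategories(RelBuilder relBuilder, BuiltinFunctionName aggFunction) {
  RexNode sortField = relBuilder.field("grand_total");
  // For MIN/EARLIEST the smallest values define the top categories, so sort ascending;
  // for SUM, MAX, AVG, COUNT, etc. the greatest values do, so sort descending.
  boolean smallestOnTop =
      aggFunction == BuiltinFunctionName.MIN || aggFunction == BuiltinFunctionName.EARLIEST;
  relBuilder.sort(smallestOnTop ? sortField : relBuilder.desc(sortField));
}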

context.relBuilder.sort(sortField);
if (limit > 0) {
context.relBuilder.limit(0, limit);
}
@@ -2089,18 +2118,25 @@ private RelNode buildFinalResultWithOther(
RelNode completeResults,
RelNode topCategories,
String byFieldName,
String valueFunctionName,
BuiltinFunctionName aggFunction,
String aggFieldAlias,
boolean useOther,
int limit,
CalcitePlanContext context) {

// Use zero-filling for count aggregations, standard result for others
if (valueFunctionName.equals("count")) {
if (aggFieldAlias.equals("count")) {
return buildZeroFilledResult(
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
completeResults, topCategories, byFieldName, aggFieldAlias, useOther, limit, context);
} else {
return buildStandardResult(
completeResults, topCategories, byFieldName, valueFunctionName, useOther, context);
completeResults,
topCategories,
byFieldName,
aggFunction,
aggFieldAlias,
useOther,
context);
}
}

@@ -2109,7 +2145,8 @@ private RelNode buildStandardResult(
RelNode completeResults,
RelNode topCategories,
String byFieldName,
String valueFunctionName,
BuiltinFunctionName aggFunctionName,
String aggFieldAlias,
boolean useOther,
CalcitePlanContext context) {

Expand All @@ -2132,11 +2169,13 @@ private RelNode buildStandardResult(
context.relBuilder.project(
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
context.relBuilder.alias(categoryExpr, byFieldName),
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));

RexInputRef valueField = context.relBuilder.field(2);
AggCall aggCall = buildAggCall(context.relBuilder, aggFunctionName, valueField);
context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
aggCall.as(aggFieldAlias));

applyFiltersAndSort(useOther, context);
return context.relBuilder.peek();
@@ -2171,7 +2210,7 @@ private RelNode buildZeroFilledResult(
RelNode completeResults,
RelNode topCategories,
String byFieldName,
String valueFunctionName,
String aggFieldAlias,
boolean useOther,
int limit,
CalcitePlanContext context) {
@@ -2210,7 +2249,7 @@ private RelNode buildZeroFilledResult(
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
"@timestamp"),
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
context.relBuilder.alias(context.relBuilder.literal(0), valueFunctionName));
context.relBuilder.alias(context.relBuilder.literal(0), aggFieldAlias));
RelNode zeroFilledCombinations = context.relBuilder.build();

// Get actual results with OTHER logic applied
@@ -2232,7 +2271,7 @@ private RelNode buildZeroFilledResult(
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
"@timestamp"),
context.relBuilder.alias(actualCategoryExpr, byFieldName),
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));

context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
@@ -2247,12 +2286,30 @@ private RelNode buildZeroFilledResult(
// Aggregate to combine actual and zero-filled data
context.relBuilder.aggregate(
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
context.relBuilder.sum(context.relBuilder.field(2)).as(aggFieldAlias));

applyFiltersAndSort(useOther, context);
return context.relBuilder.peek();
}

/**
* Aggregate a field based on a given built-in aggregation function name.
*
* <p>It is intended for secondary aggregations in timechart and chart commands. Using it
* elsewhere may lead to unintended results. It handles explicitly only MIN, MAX, AVG, COUNT,
* DISTINCT_COUNT, EARLIEST, and LATEST. It sums the results for the remaining aggregation types,
* assuming them to be accumulative.
*/
private AggCall buildAggCall(
RelBuilder relBuilder, BuiltinFunctionName aggFunction, RexNode node) {
return switch (aggFunction) {
case MIN, EARLIEST -> relBuilder.min(node);
case MAX, LATEST -> relBuilder.max(node);
case AVG -> relBuilder.avg(node);
default -> relBuilder.sum(node);
};
}

@Override
public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
visitChildren(node, context);
@@ -422,10 +422,7 @@ public void testExplainWithReverse() throws IOException {
@Test
public void testExplainWithTimechartAvg() throws IOException {
var result = explainQueryToString("source=events | timechart span=1m avg(cpu_usage) by host");
String expected =
!isPushdownDisabled()
? loadFromFile("expectedOutput/calcite/explain_timechart.yaml")
: loadFromFile("expectedOutput/calcite/explain_timechart_no_pushdown.yaml");
String expected = loadExpectedPlan("explain_timechart.yaml");
assertYamlEqualsJsonIgnoreId(expected, result);
}

@@ -183,27 +183,13 @@ public void testTimechartWithLimit() throws IOException {
schema("host", "string"),
schema("avg(cpu_usage)", "double"));

// Verify we have rows for web-01, web-02, and OTHER
boolean foundWeb01 = false;
boolean foundWeb02 = false;
boolean foundOther = false;

for (int i = 0; i < result.getJSONArray("datarows").length(); i++) {
Object[] row = result.getJSONArray("datarows").getJSONArray(i).toList().toArray();
String label = (String) row[1];

if ("web-01".equals(label)) {
foundWeb01 = true;
} else if ("web-02".equals(label)) {
foundWeb02 = true;
} else if ("OTHER".equals(label)) {
foundOther = true;
}
}

assertTrue("web-01 not found in results", foundWeb01);
assertTrue("web-02 not found in results", foundWeb02);
assertTrue("OTHER category not found in results", foundOther);
verifyDataRows(
result,
rows("2024-07-01 00:00:00", "web-01", 45.2),
rows("2024-07-01 00:01:00", "OTHER", 38.7),
rows("2024-07-01 00:02:00", "web-01", 55.3),
rows("2024-07-01 00:03:00", "db-01", 42.1),
rows("2024-07-01 00:04:00", "OTHER", 41.8));
}

@Test
@@ -383,7 +369,7 @@ public void testTimechartWithLimitAndUseOther() throws IOException {

if ("OTHER".equals(host)) {
foundOther = true;
assertEquals(330.4, cpuUsage, 0.1);
assertEquals(41.3, cpuUsage, 0.1);
} else if ("web-03".equals(host)) {
foundWeb03 = true;
assertEquals(55.3, cpuUsage, 0.1);
@@ -2,15 +2,15 @@ calcite:
logical: |
LogicalSystemLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)])
LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)])
LogicalProject(@timestamp=[$0], host=[CASE(IS NOT NULL($3), $1, CASE(IS NULL($1), null:NULL, 'OTHER'))], avg(cpu_usage)=[$2])
LogicalJoin(condition=[=($1, $3)], joinType=[left])
LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2])
LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)])
LogicalProject(host=[$4], cpu_usage=[$7], $f3=[SPAN($1, 1, 'm')])
CalciteLogicalIndexScan(table=[[OpenSearch, events]])
LogicalSort(sort0=[$1], dir0=[DESC], fetch=[10])
LogicalAggregate(group=[{1}], grand_total=[SUM($2)])
LogicalAggregate(group=[{1}], grand_total=[AVG($2)])
LogicalFilter(condition=[IS NOT NULL($1)])
LogicalProject(@timestamp=[$1], host=[$0], $f2=[$2])
LogicalAggregate(group=[{0, 2}], agg#0=[AVG($1)])
@@ -19,19 +19,21 @@ calcite:
physical: |
EnumerableLimit(fetch=[10000])
EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
EnumerableAggregate(group=[{0, 1}], avg(cpu_usage)=[SUM($2)])
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2])
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
EnumerableSort(sort0=[$1], dir0=[ASC])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8])
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$0], dir0=[ASC])
EnumerableLimit(fetch=[10])
EnumerableSort(sort0=[$1], dir0=[DESC])
EnumerableAggregate(group=[{0}], grand_total=[SUM($1)])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], host=[$t0], $f2=[$t8])
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp], FILTER->IS NOT NULL($0)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"host","boost":1.0}},"_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], proj#0..1=[{exprs}], avg(cpu_usage)=[$t8])
EnumerableAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)])
EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT NULL($t3)], expr#6=[IS NULL($t1)], expr#7=[null:NULL], expr#8=['OTHER'], expr#9=[CASE($t6, $t7, $t8)], expr#10=[CASE($t5, $t1, $t9)], @timestamp=[$t0], host=[$t10], avg(cpu_usage)=[$t2])
EnumerableMergeJoin(condition=[=($1, $3)], joinType=[left])
EnumerableSort(sort0=[$1], dir0=[ASC])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], @timestamp=[$t1], host=[$t0], $f2=[$t8])
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$0], dir0=[ASC])
EnumerableLimit(fetch=[10])
EnumerableSort(sort0=[$1], dir0=[DESC])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[=($t2, $t3)], expr#5=[null:DOUBLE], expr#6=[CASE($t4, $t5, $t1)], expr#7=[/($t6, $t2)], host=[$t0], grand_total=[$t7])
EnumerableAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[0], expr#5=[=($t3, $t4)], expr#6=[null:DOUBLE], expr#7=[CASE($t5, $t6, $t2)], expr#8=[/($t7, $t3)], host=[$t0], $f2=[$t8])
EnumerableAggregate(group=[{0, 2}], agg#0=[$SUM0($1)], agg#1=[COUNT($1)])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=['m'], expr#5=[SPAN($t2, $t3, $t4)], proj#0..1=[{exprs}], $f2=[$t5])
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[PROJECT->[host, cpu_usage, @timestamp], FILTER->IS NOT NULL($0)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"exists":{"field":"host","boost":1.0}},"_source":{"includes":["host","cpu_usage","@timestamp"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])