Skip to content

Commit a64326e

Browse files
committed
Handle common agg functions for OTHER category for timechart
Signed-off-by: Yuanchun Shen <[email protected]>
1 parent 0257aa5 commit a64326e

File tree

2 files changed

+202
-25
lines changed

2 files changed

+202
-25
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1926,7 +1926,7 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) {
19261926
}
19271927

19281928
/** Helper method to get the function name for proper column naming */
1929-
private String getValueFunctionName(UnresolvedExpression aggregateFunction) {
1929+
private String getAggFieldAlias(UnresolvedExpression aggregateFunction) {
19301930
if (aggregateFunction instanceof Alias) {
19311931
return ((Alias) aggregateFunction).getName();
19321932
}
@@ -1976,15 +1976,15 @@ public RelNode visitTimechart(
19761976

19771977
// Handle no by field case
19781978
if (node.getByField() == null) {
1979-
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
1979+
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());
19801980

19811981
// Create group expression list with just the timestamp span but use a different alias
19821982
// to avoid @timestamp naming conflict
19831983
List<UnresolvedExpression> simpleGroupExprList = new ArrayList<>();
19841984
simpleGroupExprList.add(new Alias("timestamp", spanExpr));
19851985
// Create agg expression list with the aggregate function
19861986
List<UnresolvedExpression> simpleAggExprList =
1987-
List.of(new Alias(valueFunctionName, node.getAggregateFunction()));
1987+
List.of(new Alias(aggFieldAlias, node.getAggregateFunction()));
19881988
// Create an Aggregation object
19891989
Aggregation aggregation =
19901990
new Aggregation(
@@ -1999,9 +1999,9 @@ public RelNode visitTimechart(
19991999
context.relBuilder.push(result);
20002000
// Reorder fields: timestamp first, then count
20012001
context.relBuilder.project(
2002-
context.relBuilder.field("timestamp"), context.relBuilder.field(valueFunctionName));
2002+
context.relBuilder.field("timestamp"), context.relBuilder.field(aggFieldAlias));
20032003
// Rename timestamp to @timestamp
2004-
context.relBuilder.rename(List.of("@timestamp", valueFunctionName));
2004+
context.relBuilder.rename(List.of("@timestamp", aggFieldAlias));
20052005

20062006
context.relBuilder.sort(context.relBuilder.field(0));
20072007
return context.relBuilder.peek();
@@ -2010,7 +2010,7 @@ public RelNode visitTimechart(
20102010
// Extract parameters for byField case
20112011
UnresolvedExpression byField = node.getByField();
20122012
String byFieldName = ((Field) byField).getField().toString();
2013-
String valueFunctionName = getValueFunctionName(node.getAggregateFunction());
2013+
String aggFieldAlias = getAggFieldAlias(node.getAggregateFunction());
20142014

20152015
int limit = Optional.ofNullable(node.getLimit()).orElse(10);
20162016
boolean useOther = Optional.ofNullable(node.getUseOther()).orElse(true);
@@ -2037,11 +2037,11 @@ public RelNode visitTimechart(
20372037

20382038
// Handle no limit case - just sort and return with proper field aliases
20392039
if (limit == 0) {
2040-
// Add final projection with proper aliases: [@timestamp, byField, valueFunctionName]
2040+
// Add final projection with proper aliases: [@timestamp, byField, aggFieldAlias]
20412041
context.relBuilder.project(
20422042
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
20432043
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
2044-
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
2044+
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
20452045
context.relBuilder.sort(context.relBuilder.field(0), context.relBuilder.field(1));
20462046
return context.relBuilder.peek();
20472047
}
@@ -2051,32 +2051,61 @@ public RelNode visitTimechart(
20512051

20522052
// Step 2: Find top N categories using window function approach (more efficient than separate
20532053
// aggregation)
2054-
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, context);
2054+
String aggFunctionName = getAggFunctionName(node.getAggregateFunction());
2055+
Optional<BuiltinFunctionName> aggFuncNameOptional = BuiltinFunctionName.of(aggFunctionName);
2056+
if (aggFuncNameOptional.isEmpty()) {
2057+
throw new IllegalArgumentException(
2058+
StringUtils.format("Unrecognized aggregation function: %s", aggFunctionName));
2059+
}
2060+
BuiltinFunctionName aggFunction = aggFuncNameOptional.get();
2061+
RelNode topCategories = buildTopCategoriesQuery(completeResults, limit, aggFunction, context);
20552062

20562063
// Step 3: Apply OTHER logic with single pass
20572064
return buildFinalResultWithOther(
2058-
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
2065+
completeResults,
2066+
topCategories,
2067+
byFieldName,
2068+
aggFunction,
2069+
aggFieldAlias,
2070+
useOther,
2071+
limit,
2072+
context);
20592073

20602074
} catch (Exception e) {
20612075
throw new RuntimeException("Error in visitTimechart: " + e.getMessage(), e);
20622076
}
20632077
}
20642078

2079+
private String getAggFunctionName(UnresolvedExpression aggregateFunction) {
2080+
if (aggregateFunction instanceof Alias alias) {
2081+
return getAggFunctionName(alias.getDelegated());
2082+
}
2083+
return ((AggregateFunction) aggregateFunction).getFuncName();
2084+
}
2085+
20652086
/** Build top categories query - simpler approach that works better with OTHER handling */
20662087
private RelNode buildTopCategoriesQuery(
2067-
RelNode completeResults, int limit, CalcitePlanContext context) {
2088+
RelNode completeResults,
2089+
int limit,
2090+
BuiltinFunctionName aggFunction,
2091+
CalcitePlanContext context) {
20682092
context.relBuilder.push(completeResults);
20692093

20702094
// Filter out null values when determining top categories - null should not count towards limit
20712095
context.relBuilder.filter(context.relBuilder.isNotNull(context.relBuilder.field(1)));
20722096

20732097
// Get totals for non-null categories - field positions: 0=@timestamp, 1=byField, 2=value
2098+
RexInputRef valueField = context.relBuilder.field(2);
2099+
AggCall call = buildAggCall(context.relBuilder, aggFunction, valueField);
2100+
20742101
context.relBuilder.aggregate(
2075-
context.relBuilder.groupKey(context.relBuilder.field(1)),
2076-
context.relBuilder.sum(context.relBuilder.field(2)).as("grand_total"));
2102+
context.relBuilder.groupKey(context.relBuilder.field(1)), call.as("grand_total"));
20772103

20782104
// Apply sorting and limit to non-null categories only
2079-
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field("grand_total")));
2105+
RexNode sortField = context.relBuilder.field("grand_total");
2106+
sortField =
2107+
aggFunction == BuiltinFunctionName.MIN ? sortField : context.relBuilder.desc(sortField);
2108+
context.relBuilder.sort(sortField);
20802109
if (limit > 0) {
20812110
context.relBuilder.limit(0, limit);
20822111
}
@@ -2089,18 +2118,25 @@ private RelNode buildFinalResultWithOther(
20892118
RelNode completeResults,
20902119
RelNode topCategories,
20912120
String byFieldName,
2092-
String valueFunctionName,
2121+
BuiltinFunctionName aggFunction,
2122+
String aggFieldAlias,
20932123
boolean useOther,
20942124
int limit,
20952125
CalcitePlanContext context) {
20962126

20972127
// Use zero-filling for count aggregations, standard result for others
2098-
if (valueFunctionName.equals("count")) {
2128+
if (aggFieldAlias.equals("count")) {
20992129
return buildZeroFilledResult(
2100-
completeResults, topCategories, byFieldName, valueFunctionName, useOther, limit, context);
2130+
completeResults, topCategories, byFieldName, aggFieldAlias, useOther, limit, context);
21012131
} else {
21022132
return buildStandardResult(
2103-
completeResults, topCategories, byFieldName, valueFunctionName, useOther, context);
2133+
completeResults,
2134+
topCategories,
2135+
byFieldName,
2136+
aggFunction,
2137+
aggFieldAlias,
2138+
useOther,
2139+
context);
21042140
}
21052141
}
21062142

@@ -2109,7 +2145,8 @@ private RelNode buildStandardResult(
21092145
RelNode completeResults,
21102146
RelNode topCategories,
21112147
String byFieldName,
2112-
String valueFunctionName,
2148+
BuiltinFunctionName aggFunctionName,
2149+
String aggFieldAlias,
21132150
boolean useOther,
21142151
CalcitePlanContext context) {
21152152

@@ -2132,11 +2169,13 @@ private RelNode buildStandardResult(
21322169
context.relBuilder.project(
21332170
context.relBuilder.alias(context.relBuilder.field(0), "@timestamp"),
21342171
context.relBuilder.alias(categoryExpr, byFieldName),
2135-
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
2172+
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
21362173

2174+
RexInputRef valueField = context.relBuilder.field(2);
2175+
AggCall aggCall = buildAggCall(context.relBuilder, aggFunctionName, valueField);
21372176
context.relBuilder.aggregate(
21382177
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
2139-
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
2178+
aggCall.as(aggFieldAlias));
21402179

21412180
applyFiltersAndSort(useOther, context);
21422181
return context.relBuilder.peek();
@@ -2171,7 +2210,7 @@ private RelNode buildZeroFilledResult(
21712210
RelNode completeResults,
21722211
RelNode topCategories,
21732212
String byFieldName,
2174-
String valueFunctionName,
2213+
String aggFieldAlias,
21752214
boolean useOther,
21762215
int limit,
21772216
CalcitePlanContext context) {
@@ -2210,7 +2249,7 @@ private RelNode buildZeroFilledResult(
22102249
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
22112250
"@timestamp"),
22122251
context.relBuilder.alias(context.relBuilder.field(1), byFieldName),
2213-
context.relBuilder.alias(context.relBuilder.literal(0), valueFunctionName));
2252+
context.relBuilder.alias(context.relBuilder.literal(0), aggFieldAlias));
22142253
RelNode zeroFilledCombinations = context.relBuilder.build();
22152254

22162255
// Get actual results with OTHER logic applied
@@ -2232,7 +2271,7 @@ private RelNode buildZeroFilledResult(
22322271
context.relBuilder.cast(context.relBuilder.field(0), SqlTypeName.TIMESTAMP),
22332272
"@timestamp"),
22342273
context.relBuilder.alias(actualCategoryExpr, byFieldName),
2235-
context.relBuilder.alias(context.relBuilder.field(2), valueFunctionName));
2274+
context.relBuilder.alias(context.relBuilder.field(2), aggFieldAlias));
22362275

22372276
context.relBuilder.aggregate(
22382277
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
@@ -2247,12 +2286,30 @@ private RelNode buildZeroFilledResult(
22472286
// Aggregate to combine actual and zero-filled data
22482287
context.relBuilder.aggregate(
22492288
context.relBuilder.groupKey(context.relBuilder.field(0), context.relBuilder.field(1)),
2250-
context.relBuilder.sum(context.relBuilder.field(2)).as(valueFunctionName));
2289+
context.relBuilder.sum(context.relBuilder.field(2)).as(aggFieldAlias));
22512290

22522291
applyFiltersAndSort(useOther, context);
22532292
return context.relBuilder.peek();
22542293
}
22552294

2295+
/**
2296+
* Aggregate a field based on a given built-in aggregation function name.
2297+
*
2298+
* <p>It is intended for secondary aggregations in timechart and chart commands. Using it
2299+
* elsewhere may lead to unintended results. It handles explicitly only MIN, MAX, AVG, COUNT,
2300+
* DISTINCT_COUNT, EARLIEST, and LATEST. It sums the results for the rest aggregation types,
2301+
* assuming them to be accumulative.
2302+
*/
2303+
private AggCall buildAggCall(
2304+
RelBuilder relBuilder, BuiltinFunctionName aggFunction, RexNode node) {
2305+
return switch (aggFunction) {
2306+
case MIN, EARLIEST -> relBuilder.min(node);
2307+
case MAX, LATEST -> relBuilder.max(node);
2308+
case AVG -> relBuilder.avg(node);
2309+
default -> relBuilder.sum(node);
2310+
};
2311+
}
2312+
22562313
@Override
22572314
public RelNode visitTrendline(Trendline node, CalcitePlanContext context) {
22582315
visitChildren(node, context);
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled : true
7+
- do:
8+
indices.create:
9+
index: test_timechart_4582
10+
body:
11+
mappings:
12+
properties:
13+
"@timestamp":
14+
type: date_nanos
15+
severityNumber:
16+
type: long
17+
severityText:
18+
type: keyword
19+
body:
20+
type: text
21+
- do:
22+
bulk:
23+
index: test_timechart_4582
24+
refresh: true
25+
body:
26+
- '{"index": {}}'
27+
- '{"@timestamp": "2024-01-15T10:30:04.567890123Z", "severityNumber": 9, "severityText": "INFO", "body": "Info message"}'
28+
- '{"index": {}}'
29+
- '{"@timestamp": "2024-01-15T10:30:05.567890123Z", "severityNumber": 13, "severityText": "WARN", "body": "Warning message"}'
30+
- '{"index": {}}'
31+
- '{"@timestamp": "2024-01-15T10:30:06.567890123Z", "severityNumber": 17, "severityText": "ERROR", "body": "Error message"}'
32+
- '{"index": {}}'
33+
- '{"@timestamp": "2024-01-15T10:30:07.567890123Z", "severityNumber": 21, "severityText": "FATAL", "body": "Fatal message"}'
34+
- '{"index": {}}'
35+
- '{"@timestamp": "2024-01-15T10:30:08.567890123Z", "severityNumber": 24, "severityText": "FATAL4", "body": "Fatal4 message"}'
36+
- '{"index": {}}'
37+
- '{"@timestamp": "2024-01-15T10:30:09.567890123Z", "severityNumber": 23, "severityText": "DEBUG", "body": "Debug message"}'
38+
- '{"index": {}}'
39+
- '{"@timestamp": "2024-01-15T10:30:10.567890123Z", "severityNumber": 20, "severityText": "TRACE", "body": "Trace message"}'
40+
- '{"index": {}}'
41+
- '{"@timestamp": "2024-01-15T10:30:11.567890123Z", "severityNumber": 22, "severityText": "CUSTOM", "body": "Custom message"}'
42+
43+
---
44+
teardown:
45+
- do:
46+
query.settings:
47+
body:
48+
transient:
49+
plugins.calcite.enabled : false
50+
51+
---
52+
"timechart max aggregation with limit should not sum OTHER values":
53+
- skip:
54+
features:
55+
- headers
56+
- allowed_warnings
57+
- do:
58+
headers:
59+
Content-Type: 'application/json'
60+
ppl:
61+
body:
62+
query: source=test_timechart_4582 | timechart limit=1 span=10seconds max(severityNumber) by severityText
63+
64+
- match: { total: 3 }
65+
- match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "max(severityNumber)", "type": "bigint"}] }
66+
- match: { "datarows": [["2024-01-15 10:30:00", "FATAL4", 24], ["2024-01-15 10:30:00", "OTHER", 23], ["2024-01-15 10:30:10", "OTHER",22]] }
67+
68+
---
69+
"timechart min aggregation with limit should not sum OTHER values":
70+
- skip:
71+
features:
72+
- headers
73+
- allowed_warnings
74+
- do:
75+
headers:
76+
Content-Type: 'application/json'
77+
ppl:
78+
body:
79+
query: source=test_timechart_4582 | timechart limit=2 span=1d min(severityNumber) by severityText
80+
81+
- match: { total: 3 }
82+
- match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "min(severityNumber)", "type": "bigint"}] }
83+
- match: { "datarows": [["2024-01-15 00:00:00", "INFO", 9], ["2024-01-15 00:00:00", "OTHER", 17], ["2024-01-15 00:00:00", "WARN", 13]] }
84+
85+
---
86+
"timechart earliest aggregation with limit should not sum OTHER values":
87+
- skip:
88+
features:
89+
- headers
90+
- allowed_warnings
91+
- do:
92+
headers:
93+
Content-Type: 'application/json'
94+
ppl:
95+
body:
96+
query: source=test_timechart_4582 | timechart limit=2 span=30seconds earliest(@timestamp) by severityText
97+
98+
- match: { total: 3 }
99+
- match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "earliest(@timestamp)", "type": "timestamp"}] }
100+
- match: { "datarows": [
101+
["2024-01-15 10:30:00", "CUSTOM", "2024-01-15 10:30:11.567890123"],
102+
["2024-01-15 10:30:00", "OTHER", "2024-01-15 10:30:04.567890123"],
103+
["2024-01-15 10:30:00", "TRACE", "2024-01-15 10:30:10.567890123"]] }
104+
105+
---
106+
"timechart count aggregation with limit should sum OTHER values":
107+
- skip:
108+
features:
109+
- headers
110+
- allowed_warnings
111+
- do:
112+
headers:
113+
Content-Type: 'application/json'
114+
ppl:
115+
body:
116+
query: source=test_timechart_4582 | timechart limit=3 span=1min count() by severityText
117+
118+
- match: { total: 4 }
119+
- match: { "schema": [{"name": "@timestamp", "type": "timestamp"}, {"name": "severityText", "type": "string"}, {"name": "count", "type": "bigint"}] }
120+
- match: { "datarows": [["2024-01-15 10:30:00", "CUSTOM", 1], ["2024-01-15 10:30:00", "DEBUG", 1], ["2024-01-15 10:30:00", "ERROR", 1], ["2024-01-15 10:30:00", "OTHER", 5]] }

0 commit comments

Comments
 (0)