Skip to content

Commit 9657a6a

Browse files
ishaoxyyuancu
authored andcommitted
Support Streamstats command with calcite (opensearch-project#4297)
* support streamstats simply Signed-off-by: Xinyu Hao <[email protected]> * add some tests Signed-off-by: Xinyu Hao <[email protected]> * add UT Signed-off-by: Xinyu Hao <[email protected]> * fix some error Signed-off-by: Xinyu Hao <[email protected]> * add global Signed-off-by: Xinyu Hao <[email protected]> * implement global Signed-off-by: Xinyu Hao <[email protected]> * implement reset Signed-off-by: Xinyu Hao <[email protected]> * implement all the arguments Signed-off-by: Xinyu Hao <[email protected]> * fix test Signed-off-by: Xinyu Hao <[email protected]> * add all IT, UT and rst doc Signed-off-by: Xinyu Hao <[email protected]> * fix anonymizer test Signed-off-by: Xinyu Hao <[email protected]> * fix doctest Signed-off-by: Xinyu Hao <[email protected]> * modify doc and IT Signed-off-by: Xinyu Hao <[email protected]> * add explainIT Signed-off-by: Xinyu Hao <[email protected]> * fix import Signed-off-by: Xinyu Hao <[email protected]> * fix typo Signed-off-by: Xinyu Hao <[email protected]> * fix doctest Signed-off-by: Xinyu Hao <[email protected]> * fix explainIT yaml format Signed-off-by: Xinyu Hao <[email protected]> * fix dc nopushdown explainIT Signed-off-by: Xinyu Hao <[email protected]> * add explainIT for path2 and path3 Signed-off-by: Xinyu Hao <[email protected]> * typo error Signed-off-by: Xinyu Hao <[email protected]> * handle resort case Signed-off-by: Xinyu Hao <[email protected]> * fix IT Signed-off-by: Xinyu Hao <[email protected]> * change row_num Signed-off-by: Xinyu Hao <[email protected]> * Rule out aggregator from PPLAggregateMergeRule Signed-off-by: Xinyu Hao <[email protected]> * Rule out aggregator from PPLAggregateMergeRule Signed-off-by: Xinyu Hao <[email protected]> * fix explainIT Signed-off-by: Xinyu Hao <[email protected]> --------- Signed-off-by: Xinyu Hao <[email protected]> Signed-off-by: Yuanchun Shen <[email protected]> Co-authored-by: Yuanchun Shen <[email protected]>
1 parent 93da3e6 commit 9657a6a

34 files changed

+2504
-26
lines changed

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import org.opensearch.sql.ast.tree.Search;
9393
import org.opensearch.sql.ast.tree.Sort;
9494
import org.opensearch.sql.ast.tree.Sort.SortOption;
95+
import org.opensearch.sql.ast.tree.StreamWindow;
9596
import org.opensearch.sql.ast.tree.SubqueryAlias;
9697
import org.opensearch.sql.ast.tree.TableFunction;
9798
import org.opensearch.sql.ast.tree.Timechart;
@@ -748,6 +749,11 @@ public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
748749
computationsAndTypes.build());
749750
}
750751

752+
@Override
753+
public LogicalPlan visitStreamWindow(StreamWindow node, AnalysisContext context) {
754+
throw getOnlyForCalciteException("Streamstats");
755+
}
756+
751757
@Override
752758
public LogicalPlan visitFlatten(Flatten node, AnalysisContext context) {
753759
throw getOnlyForCalciteException("Flatten");

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.opensearch.sql.ast.tree.SPath;
8080
import org.opensearch.sql.ast.tree.Search;
8181
import org.opensearch.sql.ast.tree.Sort;
82+
import org.opensearch.sql.ast.tree.StreamWindow;
8283
import org.opensearch.sql.ast.tree.SubqueryAlias;
8384
import org.opensearch.sql.ast.tree.TableFunction;
8485
import org.opensearch.sql.ast.tree.Timechart;
@@ -410,6 +411,10 @@ public T visitWindow(Window window, C context) {
410411
return visitChildren(window, context);
411412
}
412413

414+
public T visitStreamWindow(StreamWindow node, C context) {
415+
return visitChildren(node, context);
416+
}
417+
413418
public T visitJoin(Join node, C context) {
414419
return visitChildren(node, context);
415420
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import com.google.common.collect.ImmutableList;
9+
import java.util.List;
10+
import lombok.EqualsAndHashCode;
11+
import lombok.Getter;
12+
import lombok.ToString;
13+
import org.opensearch.sql.ast.AbstractNodeVisitor;
14+
import org.opensearch.sql.ast.expression.UnresolvedExpression;
15+
16+
@Getter
17+
@ToString
18+
@EqualsAndHashCode(callSuper = false)
19+
public class StreamWindow extends UnresolvedPlan {
20+
21+
private final List<UnresolvedExpression> windowFunctionList;
22+
private final List<UnresolvedExpression> groupList;
23+
private final boolean current;
24+
private final int window;
25+
private final boolean global;
26+
private final UnresolvedExpression resetBefore;
27+
private final UnresolvedExpression resetAfter;
28+
@ToString.Exclude private UnresolvedPlan child;
29+
30+
/** StreamWindow Constructor. */
31+
public StreamWindow(
32+
List<UnresolvedExpression> windowFunctionList,
33+
List<UnresolvedExpression> groupList,
34+
boolean current,
35+
int window,
36+
boolean global,
37+
UnresolvedExpression resetBefore,
38+
UnresolvedExpression resetAfter) {
39+
this.windowFunctionList = windowFunctionList;
40+
this.groupList = groupList;
41+
this.current = current;
42+
this.window = window;
43+
this.global = global;
44+
this.resetBefore = resetBefore;
45+
this.resetAfter = resetAfter;
46+
}
47+
48+
public boolean isCurrent() {
49+
return current;
50+
}
51+
52+
public boolean isGlobal() {
53+
return global;
54+
}
55+
56+
@Override
57+
public StreamWindow attach(UnresolvedPlan child) {
58+
this.child = child;
59+
return this;
60+
}
61+
62+
@Override
63+
public List<UnresolvedPlan> getChild() {
64+
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
65+
}
66+
67+
@Override
68+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
69+
return nodeVisitor.visitStreamWindow(this, context);
70+
}
71+
}

0 commit comments

Comments
 (0)