Skip to content

Commit bb78f22

Browse files
committed
Fix sub-fields accessing of generated structs (#4683)
* Correct subfield access logic when calling ITEM Signed-off-by: Yuanchun Shen <[email protected]> * Add explain and integration tests Signed-off-by: Yuanchun Shen <[email protected]> --------- Signed-off-by: Yuanchun Shen <[email protected]> (cherry picked from commit 3a42c51)
1 parent ac44f8b commit bb78f22

File tree

4 files changed

+90
-0
lines changed

4 files changed

+90
-0
lines changed

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,4 +1462,27 @@ public void testTopKThenSortExplain() throws IOException {
14621462
+ "| sort age "
14631463
+ "| fields age"));
14641464
}
1465+
1466+
@Test
1467+
public void testGeoIpPushedInAgg() throws IOException {
1468+
// This explain IT verifies that an externally registered UDF can be properly pushed down
1469+
assertYamlEqualsIgnoreId(
1470+
loadExpectedPlan("udf_geoip_in_agg_pushed.yaml"),
1471+
explainQueryYaml(
1472+
String.format(
1473+
"source=%s | eval info = geoip('my-datasource', host) | stats count() by info.city",
1474+
TEST_INDEX_WEBLOGS)));
1475+
}
1476+
1477+
@Test
1478+
public void testInternalItemAccessOnStructs() throws IOException {
1479+
String expected = loadExpectedPlan("access_struct_subfield_with_item.yaml");
1480+
assertYamlEqualsIgnoreId(
1481+
expected,
1482+
explainQueryYaml(
1483+
String.format(
1484+
"source=%s | eval info = geoip('dummy-datasource', host) | fields host, info,"
1485+
+ " info.dummy_sub_field",
1486+
TEST_INDEX_WEBLOGS)));
1487+
}
14651488
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,54 @@ public void testGeoIpEnrichmentWithIpFieldAsInput() throws IOException {
5555
rows("10.0.0.1", Map.of("country", "USA")),
5656
rows("fd12:2345:6789:1:a1b2:c3d4:e5f6:789a", Map.of("country", "India")));
5757
}
58+
59+
@Test
60+
public void testGeoIpInAggregation() throws IOException {
61+
JSONObject result1 =
62+
executeQuery(
63+
String.format(
64+
"source=%s | where method='POST' | eval info = geoip('%s', host) | eval"
65+
+ " date=DATE('2020-12-10') | stats count() by info.city, method, span(date,"
66+
+ " 1month) as month",
67+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
68+
verifySchema(
69+
result1,
70+
schema("count()", "bigint"),
71+
schema("month", "date"),
72+
schema("info.city", "string"),
73+
schema("method", "string"));
74+
verifyDataRows(
75+
result1,
76+
rows(1, "2020-12-01", "Seattle", "POST"),
77+
rows(1, "2020-12-01", "Bengaluru", "POST"));
78+
79+
// This case is pushed down into DSL with scripts
80+
JSONObject result2 =
81+
executeQuery(
82+
String.format(
83+
"source=%s | where method='POST' | eval info = geoip('%s', host) | stats count() by"
84+
+ " info.city",
85+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
86+
verifySchema(result2, schema("count()", "bigint"), schema("info.city", "string"));
87+
verifyDataRows(result2, rows(1, "Seattle"), rows(1, "Bengaluru"));
88+
}
89+
90+
@Test
91+
public void testGeoIpEnrichmentAccessingSubField() throws IOException {
92+
JSONObject result =
93+
executeQuery(
94+
String.format(
95+
"source=%s | where method='POST' | eval info = geoip('%s', host) | fields host,"
96+
+ " info, info.country",
97+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
98+
verifySchema(
99+
result, schema("host", "ip"), schema("info", "struct"), schema("info.country", "string"));
100+
verifyDataRows(
101+
result,
102+
rows("10.0.0.1", Map.of("country", "USA", "city", "Seattle"), "USA"),
103+
rows(
104+
"fd12:2345:6789:1:a1b2:c3d4:e5f6:789a",
105+
Map.of("country", "India", "city", "Bengaluru"),
106+
"India"));
107+
}
58108
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(host=[$0], info=[GEOIP('dummy-datasource':VARCHAR, $0)], info.dummy_sub_field=[ITEM(GEOIP('dummy-datasource':VARCHAR, $0), 'dummy_sub_field')])
5+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
6+
physical: |
7+
EnumerableCalc(expr#0=[{inputs}], expr#1=['dummy-datasource':VARCHAR], expr#2=[GEOIP($t1, $t0)], expr#3=['dummy_sub_field'], expr#4=[ITEM($t2, $t3)], host=[$t0], $f1=[$t2], $f2=[$t4])
8+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["host"],"excludes":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(host=[$0], info=[GEOIP('dummy-datasource':VARCHAR, $0)], info.dummy_sub_field=[ITEM(GEOIP('dummy-datasource':VARCHAR, $0), 'dummy_sub_field')])
5+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
6+
physical: |
7+
EnumerableLimit(fetch=[10000])
8+
EnumerableCalc(expr#0..11=[{inputs}], expr#12=['dummy-datasource':VARCHAR], expr#13=[GEOIP($t12, $t0)], expr#14=['dummy_sub_field'], expr#15=[ITEM($t13, $t14)], host=[$t0], info=[$t13], info.dummy_sub_field=[$t15])
9+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])

0 commit comments

Comments
 (0)