Skip to content

Commit a003e8c

Browse files
authored
Support serializing external OpenSearch UDFs at pushdown time (#4618)
* Support serializing external OpenSearch UDFs Signed-off-by: Yuanchun Shen <[email protected]> * Correct subfield access logic when calling ITEM Signed-off-by: Yuanchun Shen <[email protected]> * Resolve types of generated structs based on their values because their types are UNDEFINED Signed-off-by: Yuanchun Shen <[email protected]> * Add explain and integration tests for geoip Signed-off-by: Yuanchun Shen <[email protected]> --------- Signed-off-by: Yuanchun Shen <[email protected]>
1 parent 234f608 commit a003e8c

File tree

9 files changed

+141
-18
lines changed

9 files changed

+141
-18
lines changed

core/src/main/java/org/opensearch/sql/ast/expression/QualifiedName.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
@Getter
2323
@EqualsAndHashCode(callSuper = false)
2424
public class QualifiedName extends UnresolvedExpression {
25+
public static final String DELIMITER = ".";
2526
private final List<String> parts;
2627

2728
public QualifiedName(String name) {
@@ -94,7 +95,7 @@ public QualifiedName rest() {
9495
}
9596

9697
public String toString() {
97-
return String.join(".", this.parts);
98+
return String.join(DELIMITER, this.parts);
9899
}
99100

100101
@Override

core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,10 @@ private static RexNode resolveFieldAccess(
270270
if (length == parts.size() - start) {
271271
return field;
272272
} else {
273-
String itemName = joinParts(parts, length + start, parts.size() - 1 - length);
274-
return createItemAccess(field, itemName, context);
273+
String itemName = joinParts(parts, length + start, parts.size() - length);
274+
return context.relBuilder.alias(
275+
createItemAccess(field, itemName, context),
276+
String.join(QualifiedName.DELIMITER, parts.subList(start, parts.size())));
275277
}
276278
}
277279

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public void init() throws Exception {
3939
loadIndex(Index.LOGS);
4040
loadIndex(Index.WORKER);
4141
loadIndex(Index.WORK_INFORMATION);
42+
loadIndex(Index.WEBLOG);
4243
}
4344

4445
@Override
@@ -1471,4 +1472,15 @@ public void testTopKThenSortExplain() throws IOException {
14711472
+ "| sort age "
14721473
+ "| fields age"));
14731474
}
1475+
1476+
@Test
1477+
public void testGeoIpPushedInAgg() throws IOException {
1478+
// This explain IT verifies that externally registered UDF can be properly pushed down
1479+
assertYamlEqualsIgnoreId(
1480+
loadExpectedPlan("udf_geoip_in_agg_pushed.yaml"),
1481+
explainQueryYaml(
1482+
String.format(
1483+
"source=%s | eval info = geoip('my-datasource', host) | stats count() by info.city",
1484+
TEST_INDEX_WEBLOGS)));
1485+
}
14741486
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteGeoIpFunctionsIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,35 @@ public void testGeoIpEnrichmentWithIpFieldAsInput() throws IOException {
5353
rows("10.0.0.1", Map.of("country", "USA")),
5454
rows("fd12:2345:6789:1:a1b2:c3d4:e5f6:789a", Map.of("country", "India")));
5555
}
56+
57+
@Test
58+
public void testGeoIpInAggregation() throws IOException {
59+
JSONObject result1 =
60+
executeQuery(
61+
String.format(
62+
"source=%s | where method='POST' | eval info = geoip('%s', host) | eval"
63+
+ " date=DATE('2020-12-10') | stats count() by info.city, method, span(date,"
64+
+ " 1month) as month",
65+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
66+
verifySchema(
67+
result1,
68+
schema("count()", "bigint"),
69+
schema("month", "date"),
70+
schema("info.city", "string"),
71+
schema("method", "string"));
72+
verifyDataRows(
73+
result1,
74+
rows(1, "2020-12-01", "Seattle", "POST"),
75+
rows(1, "2020-12-01", "Bengaluru", "POST"));
76+
77+
// This case is pushed down into DSL with scripts
78+
JSONObject result2 =
79+
executeQuery(
80+
String.format(
81+
"source=%s | where method='POST' | eval info = geoip('%s', host) | stats count() by"
82+
+ " info.city",
83+
TEST_INDEX_WEBLOGS, DATASOURCE_NAME));
84+
verifySchema(result2, schema("count()", "bigint"), schema("info.city", "string"));
85+
verifyDataRows(result2, rows(1, "Seattle"), rows(1, "Bengaluru"));
86+
}
5687
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(count()=[$1], info.city=[$0])
5+
LogicalAggregate(group=[{0}], count()=[COUNT()])
6+
LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')])
7+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
8+
physical: |
9+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), info.city], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"info.city":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAknsKICAiZmllbGRzIjogWwogICAgewogICAgICAidWR0IjogIkVYUFJfSVAiLAogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQETnsKICAib3AiOiB7CiAgICAibmFtZSI6ICJJVEVNIiwKICAgICJraW5kIjogIklURU0iLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiR0VPSVAiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImxpdGVyYWwiOiAibXktZGF0YXNvdXJjZSIsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgIH0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJpbnB1dCI6IDAsCiAgICAgICAgICAibmFtZSI6ICIkMCIKICAgICAgICB9CiAgICAgIF0sCiAgICAgICJjbGFzcyI6ICJvcmcub3BlbnNlYXJjaC5zcWwuZXhwcmVzc2lvbi5mdW5jdGlvbi5Vc2VyRGVmaW5lZEZ1bmN0aW9uQnVpbGRlciQxIiwKICAgICAgInR5cGUiOiB7CiAgICAgICAgInR5cGUiOiAiTUFQIiwKICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAia2V5IjogewogICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgIH0sCiAgICAgICAgInZhbHVlIjogewogICAgICAgICAgInR5cGUiOiAiQU5ZIiwKICAgICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICAgInByZWNpc2lvbiI6IC0xLAogICAgICAgICAgInNjYWxlIjogLTIxNDc0ODM2NDgKICAgICAgICB9CiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1
ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfSwKICAgIHsKICAgICAgImxpdGVyYWwiOiAiY2l0eSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIkNIQVIiLAogICAgICAgICJudWxsYWJsZSI6IGZhbHNlLAogICAgICAgICJwcmVjaXNpb24iOiA0CiAgICAgIH0KICAgIH0KICBdCn10AApmaWVsZFR5cGVzc3IAEWphdmEudXRpbC5IYXNoTWFwBQfawcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAF0AARob3N0fnIAKW9yZy5vcGVuc2VhcmNoLnNxbC5kYXRhLnR5cGUuRXhwckNvcmVUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAACSVB4eA==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(count()=[$1], info.city=[$0])
5+
LogicalAggregate(group=[{0}], count()=[COUNT()])
6+
LogicalProject(info.city=[ITEM(GEOIP('my-datasource':VARCHAR, $0), 'city')])
7+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])
8+
physical: |
9+
EnumerableLimit(fetch=[10000])
10+
EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], info.city=[$t0])
11+
EnumerableAggregate(group=[{0}], count()=[COUNT()])
12+
EnumerableCalc(expr#0..11=[{inputs}], expr#12=['my-datasource':VARCHAR], expr#13=[GEOIP($t12, $t0)], expr#14=['city'], expr#15=[ITEM($t13, $t14)], info.city=[$t15])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])

opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,12 @@ private ExprValue parse(
190190

191191
// Field type may be not defined in mapping if users have disabled dynamic mapping.
192192
// Then try to parse content directly based on the value itself
193-
if (fieldType.isEmpty()) {
193+
// Besides, sub-fields of generated objects are also of type UNDEFINED. We parse the content
194+
// directly on the value itself for this case as well.
195+
// TODO: Remove the second condition once https://github.com/opensearch-project/sql/issues/3751
196+
// is resolved
197+
if (fieldType.isEmpty()
198+
|| fieldType.get().equals(OpenSearchDataType.of(ExprCoreType.UNDEFINED))) {
194199
return parseContent(content);
195200
}
196201

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55

66
package org.opensearch.sql.opensearch.executor;
77

8-
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DISTINCT_COUNT_APPROX;
9-
8+
import com.google.common.base.Suppliers;
109
import java.security.AccessController;
1110
import java.security.PrivilegedAction;
1211
import java.sql.PreparedStatement;
@@ -17,16 +16,21 @@
1716
import java.util.LinkedHashMap;
1817
import java.util.List;
1918
import java.util.Map;
19+
import java.util.concurrent.ConcurrentHashMap;
2020
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.Supplier;
2122
import org.apache.calcite.plan.RelOptUtil;
2223
import org.apache.calcite.rel.RelNode;
2324
import org.apache.calcite.rel.RelRoot;
2425
import org.apache.calcite.rel.type.RelDataType;
2526
import org.apache.calcite.rel.type.RelDataTypeField;
2627
import org.apache.calcite.runtime.Hook;
2728
import org.apache.calcite.sql.SqlExplainLevel;
29+
import org.apache.calcite.sql.SqlOperator;
30+
import org.apache.calcite.sql.SqlOperatorTable;
2831
import org.apache.calcite.sql.type.ReturnTypes;
2932
import org.apache.calcite.sql.type.SqlTypeName;
33+
import org.apache.calcite.sql.util.ListSqlOperatorTable;
3034
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
3135
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
3236
import org.apache.logging.log4j.LogManager;
@@ -273,8 +277,9 @@ private void buildResultSet(
273277
private void registerOpenSearchFunctions() {
274278
if (client instanceof OpenSearchNodeClient) {
275279
SqlUserDefinedFunction geoIpFunction =
276-
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
280+
new GeoIpFunction(client.getNodeClient()).toUDF(BuiltinFunctionName.GEOIP.name());
277281
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
282+
OperatorTable.addOperator(BuiltinFunctionName.GEOIP.name(), geoIpFunction);
278283
} else {
279284
logger.info(
280285
"Function [GEOIP] not registered: incompatible client type {}",
@@ -284,10 +289,37 @@ private void registerOpenSearchFunctions() {
284289
SqlUserDefinedAggFunction approxDistinctCountFunction =
285290
UserDefinedFunctionUtils.createUserDefinedAggFunction(
286291
DistinctCountApproxAggFunction.class,
287-
DISTINCT_COUNT_APPROX.toString(),
292+
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(),
288293
ReturnTypes.BIGINT_FORCE_NULLABLE,
289294
null);
290295
PPLFuncImpTable.INSTANCE.registerExternalAggOperator(
291-
DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
296+
BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
297+
OperatorTable.addOperator(
298+
BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction);
299+
}
300+
301+
/**
302+
* Dynamic SqlOperatorTable that allows adding operators after initialization. Similar to
303+
* PPLBuiltinOperator.instance() or SqlStdOperatorTable.instance().
304+
*/
305+
public static class OperatorTable extends ListSqlOperatorTable {
306+
private static final Supplier<OperatorTable> INSTANCE =
307+
Suppliers.memoize(() -> (OperatorTable) new OperatorTable().init());
308+
// Use map instead of list to avoid duplicated elements if the class is initialized multiple
309+
// times
310+
private static final Map<String, SqlOperator> operators = new ConcurrentHashMap<>();
311+
312+
public static SqlOperatorTable instance() {
313+
return INSTANCE.get();
314+
}
315+
316+
private ListSqlOperatorTable init() {
317+
setOperators(buildIndex(operators.values()));
318+
return this;
319+
}
320+
321+
public static synchronized void addOperator(String name, SqlOperator operator) {
322+
operators.put(name, operator);
323+
}
292324
}
293325
}

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/serde/RelJsonSerializer.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.ByteArrayOutputStream;
1313
import java.io.ObjectInputStream;
1414
import java.io.ObjectOutputStream;
15+
import java.io.Serializable;
1516
import java.util.Base64;
1617
import java.util.HashMap;
1718
import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@
3132
import org.opensearch.sql.calcite.CalcitePlanContext;
3233
import org.opensearch.sql.data.type.ExprType;
3334
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
35+
import org.opensearch.sql.opensearch.executor.OpenSearchExecutionEngine;
3436
import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil;
3537

3638
/**
@@ -39,7 +41,7 @@
3941
* <p>This serializer:
4042
* <li>Uses Calcite's RelJson class to convert RexNode and RelDataType to/from JSON string
4143
* <li>Manages required OpenSearch field mapping information Note: OpenSearch ExprType subclasses
42-
* implement {@link java.io.Serializable} and are handled through standard Java serialization.
44+
* implement {@link Serializable} and are handled through standard Java serialization.
4345
*/
4446
@Getter
4547
public class RelJsonSerializer {
@@ -52,13 +54,7 @@ public class RelJsonSerializer {
5254
private static final ObjectMapper mapper = new ObjectMapper();
5355
private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF =
5456
new TypeReference<>() {};
55-
private static final SqlOperatorTable pplSqlOperatorTable =
56-
SqlOperatorTables.chain(
57-
PPLBuiltinOperators.instance(),
58-
SqlStdOperatorTable.instance(),
59-
// Add a list of necessary SqlLibrary if needed
60-
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
61-
SqlLibrary.MYSQL, SqlLibrary.BIG_QUERY, SqlLibrary.SPARK, SqlLibrary.POSTGRESQL));
57+
private static volatile SqlOperatorTable pplSqlOperatorTable;
6258

6359
static {
6460
mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
@@ -68,6 +64,27 @@ public RelJsonSerializer(RelOptCluster cluster) {
6864
this.cluster = cluster;
6965
}
7066

67+
private static SqlOperatorTable getPplSqlOperatorTable() {
68+
if (pplSqlOperatorTable == null) {
69+
synchronized (RelJsonSerializer.class) {
70+
if (pplSqlOperatorTable == null) {
71+
pplSqlOperatorTable =
72+
SqlOperatorTables.chain(
73+
PPLBuiltinOperators.instance(),
74+
SqlStdOperatorTable.instance(),
75+
OpenSearchExecutionEngine.OperatorTable.instance(),
76+
// Add a list of necessary SqlLibrary if needed
77+
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(
78+
SqlLibrary.MYSQL,
79+
SqlLibrary.BIG_QUERY,
80+
SqlLibrary.SPARK,
81+
SqlLibrary.POSTGRESQL));
82+
}
83+
}
84+
}
85+
return pplSqlOperatorTable;
86+
}
87+
7188
/**
7289
* Serializes Calcite expressions and field types into a map object string.
7390
*
@@ -136,7 +153,8 @@ public Map<String, Object> deserialize(String struct) {
136153
Map<String, Object> rowTypeMap = mapper.readValue((String) objectMap.get(ROW_TYPE), TYPE_REF);
137154
RelDataType rowType = relJson.toType(cluster.getTypeFactory(), rowTypeMap);
138155
OpenSearchRelInputTranslator inputTranslator = new OpenSearchRelInputTranslator(rowType);
139-
relJson = relJson.withInputTranslator(inputTranslator).withOperatorTable(pplSqlOperatorTable);
156+
relJson =
157+
relJson.withInputTranslator(inputTranslator).withOperatorTable(getPplSqlOperatorTable());
140158
Map<String, Object> exprMap = mapper.readValue((String) objectMap.get(EXPR), TYPE_REF);
141159
RexNode rexNode = relJson.toRex(cluster, exprMap);
142160

0 commit comments

Comments
 (0)