diff --git a/core/src/main/java/org/opensearch/sql/expression/DSL.java b/core/src/main/java/org/opensearch/sql/expression/DSL.java index dc819c8163d..2e3bdf7e223 100644 --- a/core/src/main/java/org/opensearch/sql/expression/DSL.java +++ b/core/src/main/java/org/opensearch/sql/expression/DSL.java @@ -973,6 +973,10 @@ public static FunctionExpression utc_timestamp( return compile(functionProperties, BuiltinFunctionName.UTC_TIMESTAMP, args); } + public static FunctionExpression geoip(Expression... args) { + return compile(FunctionProperties.None, BuiltinFunctionName.GEOIP, args); + } + @SuppressWarnings("unchecked") private static T compile( FunctionProperties functionProperties, BuiltinFunctionName bfn, Expression... args) { diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index 43fdbf2eb75..3cc9569a1ef 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -207,6 +207,9 @@ public enum BuiltinFunctionName { /** Json Functions. */ JSON_VALID(FunctionName.of("json_valid")), + /** GEOSPATIAL Functions. */ + GEOIP(FunctionName.of("geoip")), + /** NULL Test. */ IS_NULL(FunctionName.of("is null")), IS_NOT_NULL(FunctionName.of("is not null")), diff --git a/core/src/main/java/org/opensearch/sql/expression/function/OpenSearchFunctions.java b/core/src/main/java/org/opensearch/sql/expression/function/OpenSearchFunctions.java index 8d8928c16ae..e2e56f0041a 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/OpenSearchFunctions.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/OpenSearchFunctions.java @@ -173,4 +173,54 @@ public String toString() { return String.format("%s(%s)", functionName, String.join(", ", args)); } } + + /** + * Static class to identify functional Expression which specifically designed for OpenSearch + * storage runtime + */ + public static class OpenSearchExecutableFunction extends FunctionExpression { + private final FunctionName functionName; + private final List arguments; + private final ExprType returnType; + + public OpenSearchExecutableFunction( + FunctionName functionName, List arguments, ExprType returnType) { + super(functionName, arguments); + this.functionName = functionName; + this.arguments = arguments; + this.returnType = returnType; + } + + @Override + public ExprValue valueOf(Environment valueEnv) { + throw new UnsupportedOperationException( + String.format( + "OpenSearch defined function [%s] is only supported in Eval operation.", + functionName)); + } + + @Override + public ExprType type() { + return returnType; + } + + /** + * Util method to generate probe implementation with given list of argument types, with marker + * class `OpenSearchFunction` to annotate this is an OpenSearch specific expression. + * + * @param returnType return type. + * @return Binary Function Implementation. + */ + public static SerializableFunction> + openSearchImpl(ExprType returnType, List args) { + return functionName -> { + FunctionSignature functionSignature = new FunctionSignature(functionName, args); + FunctionBuilder functionBuilder = + (functionProperties, arguments) -> + new OpenSearchFunctions.OpenSearchExecutableFunction( + functionName, arguments, returnType); + return Pair.of(functionSignature, functionBuilder); + }; + } + } } diff --git a/core/src/main/java/org/opensearch/sql/expression/ip/IPFunctions.java b/core/src/main/java/org/opensearch/sql/expression/ip/IPFunctions.java index 8b3ee230147..11442b67dd9 100644 --- a/core/src/main/java/org/opensearch/sql/expression/ip/IPFunctions.java +++ b/core/src/main/java/org/opensearch/sql/expression/ip/IPFunctions.java @@ -11,8 +11,10 @@ import static org.opensearch.sql.expression.function.FunctionDSL.define; import static org.opensearch.sql.expression.function.FunctionDSL.impl; import static org.opensearch.sql.expression.function.FunctionDSL.nullMissingHandling; +import static org.opensearch.sql.expression.function.OpenSearchFunctions.OpenSearchExecutableFunction.openSearchImpl; import inet.ipaddr.IPAddress; +import java.util.Arrays; import lombok.experimental.UtilityClass; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -28,6 +30,7 @@ public class IPFunctions { public void register(BuiltinFunctionRepository repository) { repository.register(cidrmatch()); + repository.register(geoIp()); } private DefaultFunctionResolver cidrmatch() { @@ -57,4 +60,16 @@ private ExprValue exprCidrMatch(ExprValue addressExprValue, ExprValue rangeExprV ? ExprValueUtils.LITERAL_FALSE : ExprValueUtils.LITERAL_TRUE; } + + /** + * To register all method signatures related to geoip( ) expression under eval. + * + * @return Resolver for geoip( ) expression. + */ + private DefaultFunctionResolver geoIp() { + return define( + BuiltinFunctionName.GEOIP.getName(), + openSearchImpl(BOOLEAN, Arrays.asList(STRING, STRING)), + openSearchImpl(BOOLEAN, Arrays.asList(STRING, STRING, STRING))); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/EvalOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/EvalOperator.java index ac62fe1b867..4bcb07e2982 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/EvalOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/EvalOperator.java @@ -87,7 +87,7 @@ public ExprValue next() { * @param env {@link Environment} * @return The mapping of reference and {@link ExprValue} for each expression. */ - private Map eval(Environment env) { + protected Map eval(Environment env) { Map evalResultMap = new LinkedHashMap<>(); for (Pair pair : expressionList) { ReferenceExpression var = pair.getKey(); diff --git a/core/src/test/java/org/opensearch/sql/expression/function/OpenSearchFunctionsTest.java b/core/src/test/java/org/opensearch/sql/expression/function/OpenSearchFunctionsTest.java index 168b73acc43..eaaa243e16a 100644 --- a/core/src/test/java/org/opensearch/sql/expression/function/OpenSearchFunctionsTest.java +++ b/core/src/test/java/org/opensearch/sql/expression/function/OpenSearchFunctionsTest.java @@ -22,6 +22,7 @@ import org.opensearch.sql.expression.ExpressionTestBase; import org.opensearch.sql.expression.FunctionExpression; import org.opensearch.sql.expression.NamedArgumentExpression; +import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.env.Environment; public class OpenSearchFunctionsTest extends ExpressionTestBase { @@ -309,4 +310,17 @@ void nested_query() { assertEquals(expr.valueOf(nestedTuple), ExprValueUtils.stringValue("result")); assertEquals(expr.type(), STRING); } + + @Test + void opensearchExecutableFunction_valueOf() { + var ipInStr = + new OpenSearchFunctions.OpenSearchExecutableFunction( + BuiltinFunctionName.GEOIP.getName(), + List.of(DSL.literal("my-datasource"), new ReferenceExpression("ipInStr", STRING)), + BOOLEAN); + assertThrows( + UnsupportedOperationException.class, + () -> ipInStr.valueOf(valueEnv()), + "OpenSearch defined function [geoip] is only supported in Eval operation."); + } } diff --git a/core/src/test/java/org/opensearch/sql/expression/ip/GeoIPFunctionTest.java b/core/src/test/java/org/opensearch/sql/expression/ip/GeoIPFunctionTest.java new file mode 100644 index 00000000000..6d61ee142d3 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/expression/ip/GeoIPFunctionTest.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.ip; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.env.Environment; + +@ExtendWith(MockitoExtension.class) +public class GeoIPFunctionTest { + + // Mock value environment for testing. + @Mock private Environment env; + + @Test + public void testGeoipFunctionSignature() { + var geoip = DSL.geoip(DSL.literal("HARDCODED_DATASOURCE_NAME"), DSL.ref("ip_address", STRING)); + assertEquals(BOOLEAN, geoip.type()); + } + + /** To make sure no logic being evaluated when no environment being passed. */ + @Test + public void testDefaultValueOf() { + UnsupportedOperationException exception = + assertThrows( + UnsupportedOperationException.class, + () -> + DSL.geoip(DSL.literal("HARDCODED_DATASOURCE_NAME"), DSL.ref("ip_address", STRING)) + .valueOf(env)); + assertTrue( + exception + .getMessage() + .contains("OpenSearch defined function [geoip] is only supported in Eval operation.")); + } +} diff --git a/docs/user/ppl/functions/ip.rst b/docs/user/ppl/functions/ip.rst index 30cb9020b01..c4f6191ac84 100644 --- a/docs/user/ppl/functions/ip.rst +++ b/docs/user/ppl/functions/ip.rst @@ -36,3 +36,34 @@ Note: - `cidr` can be an IPv4 or IPv6 block - `ip` and `cidr` must both be valid and non-missing/non-null + +GEOIP +--------- + +Description +>>>>>>>>>>> + +Usage: `geoip(dataSourceName, ipAddress[, options])` to lookup location information from given IP addresses via OpenSearch GeoSpatial plugin API. + +Argument type: STRING, STRING, STRING + +Return type: OBJECT + +.. The execution of below example is being excluded, as this requires a standalone Geo-Spatial dataSource setup, which is not yet supported by docTest. + +Example: + + > source=weblogs | eval LookupResult = geoip("dataSourceName", "50.68.18.229", "country_iso_code,city_name") + fetched rows / total rows = 1/1 + +-------------------------------------------------------------+ + | LookupResult | + |-------------------------------------------------------------| + | {'city_name': 'Vancouver', 'country_iso_code': 'CA'} | + +-------------------------------------------------------------+ + + +Note: + - `dataSourceName` must be an established dataSource on OpenSearch GeoSpatial plugin, detail of configuration can be found: https://opensearch.org/docs/latest/ingest-pipelines/processors/ip2geo/ + - `ip` can be an IPv4 or an IPv6 address + - `options` is an optional String of comma separated fields to output: the selection of fields is subject to dataSourceProvider's schema. For example, the list of fields in the provided `geolite2-city` dataset includes: "country_iso_code", "country_name", "continent_name", "region_iso_code", "region_name", "city_name", "time_zone", "location" + diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 02e05854683..328e76013f4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -21,6 +21,7 @@ import static org.opensearch.sql.legacy.TestUtils.getDogs3IndexMapping; import static org.opensearch.sql.legacy.TestUtils.getEmployeeNestedTypeIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getGameOfThronesIndexMapping; +import static org.opensearch.sql.legacy.TestUtils.getGeoIpIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getGeopointIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getJoinTypeIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getJsonTestIndexMapping; @@ -627,6 +628,11 @@ public enum Index { "unexpandedObject", getUnexpandedObjectIndexMapping(), "src/test/resources/unexpanded_objects.json"), + GEOIP( + TestsConstants.TEST_INDEX_GEOIP, + "geoip", + getGeoIpIndexMapping(), + "src/test/resources/geoip.json"), BANK( TestsConstants.TEST_INDEX_BANK, "account", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index b1b09a0997f..27963e3787f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -195,6 +195,11 @@ public static String getBankIndexMapping() { return getMappingFile(mappingFile); } + public static String getGeoIpIndexMapping() { + String mappingFile = "geoip_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static String getBankWithNullValuesIndexMapping() { String mappingFile = "bank_with_null_values_index_mapping.json"; return getMappingFile(mappingFile); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 292acbc50b0..4601aadf7f4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -60,6 +60,7 @@ public class TestsConstants { public static final String TEST_INDEX_GEOPOINT = TEST_INDEX + "_geopoint"; public static final String TEST_INDEX_JSON_TEST = TEST_INDEX + "_json_test"; public static final String TEST_INDEX_ALIAS = TEST_INDEX + "_alias"; + public static final String TEST_INDEX_GEOIP = TEST_INDEX + "_geoip"; public static final String DATASOURCES = ".ql-datasources"; public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/GeoIpFunctionsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/GeoIpFunctionsIT.java index 3eb17387adf..c169a18b265 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/GeoIpFunctionsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/GeoIpFunctionsIT.java @@ -8,28 +8,57 @@ package org.opensearch.sql.ppl; import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GEOIP; +import static org.opensearch.sql.util.MatcherUtils.columnName; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.verifyColumn; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Locale; +import java.util.Map; import lombok.SneakyThrows; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.json.JSONObject; import org.junit.Assert; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentBuilder; /** IP enrichment PPL request with OpenSearch Geo-sptial plugin */ public class GeoIpFunctionsIT extends PPLIntegTestCase { private static boolean initialized = false; + private static Map MANIFEST_LOCATION = + Map.of( + "endpoint", + "https://raw.githubusercontent.com/opensearch-project/geospatial/main/src/test/resources/ip2geo/server/city/manifest.json"); + + private static String DATASOURCE_NAME = "dummycityindex"; + private static String PLUGIN_NAME = "opensearch-geospatial"; + private static String GEO_SPATIAL_DATASOURCE_PATH = "/_plugins/geospatial/ip2geo/datasource/"; + + private static int CREATE_DATASOURCE_TIMEOUT = 10; + @SneakyThrows - @BeforeEach - public void initialize() { + @Override + public void init() throws IOException { + loadIndex(Index.GEOIP); if (!initialized) { - setUpIndices(); + // Create a new dataSource + createDatasource(); + waitForDatasourceToBeAvailable( + DATASOURCE_NAME, Duration.ofSeconds(CREATE_DATASOURCE_TIMEOUT)); initialized = true; } } @@ -45,4 +74,113 @@ public void testGeoPluginInstallation() throws IOException { Assert.assertEquals(200, response.getStatusLine().getStatusCode()); Assert.assertTrue(getResponseBody(response, true).contains(PLUGIN_NAME)); } + + @SneakyThrows + @Test + public void testGeoIpEnrichment() { + + JSONObject resultGeoIp = + executeQuery( + String.format( + "search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s)", + TEST_INDEX_GEOIP, "dummycityindex", "ip")); + + verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult")); + verifyDataRows( + resultGeoIp, + rows("Test user - USA", "10.1.1.1", Map.of("country", "USA", "city", "Seattle")), + rows("Test user - Canada", "127.1.1.1", Map.of("country", "Canada", "city", "Vancouver"))); + } + + @SneakyThrows + @Test + public void testGeoIpEnrichmentWithSingleOption() { + + JSONObject resultGeoIp = + executeQuery( + String.format( + "search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s,\\\"%s\\\")", + TEST_INDEX_GEOIP, "dummycityindex", "ip", "city")); + + verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult")); + verifyDataRows( + resultGeoIp, + rows("Test user - USA", "10.1.1.1", Map.of("city", "Seattle")), + rows("Test user - Canada", "127.1.1.1", Map.of("city", "Vancouver"))); + } + + @SneakyThrows + @Test + public void testGeoIpEnrichmentWithSpaceSeparatedMultipleOptions() { + + JSONObject resultGeoIp = + executeQuery( + String.format( + "search source=%s | eval enrichmentResult = geoip(\\\"%s\\\",%s,\\\"%s\\\")", + TEST_INDEX_GEOIP, "dummycityindex", "ip", "city , country")); + + verifyColumn(resultGeoIp, columnName("name"), columnName("ip"), columnName("enrichmentResult")); + verifyDataRows( + resultGeoIp, + rows("Test user - USA", "10.1.1.1", Map.of("country", "USA", "city", "Seattle")), + rows("Test user - Canada", "127.1.1.1", Map.of("country", "Canada", "city", "Vancouver"))); + } + + /** + * Helper method to send a PUT request to create a dummy dataSource with provided endpoint for + * integration test. + * + * @return Response for the create dataSource request. + * @throws IOException In case of network failure + */ + private Response createDatasource() throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + for (Map.Entry config : MANIFEST_LOCATION.entrySet()) { + builder.field(config.getKey(), config.getValue()); + } + builder.endObject(); + Request request = new Request("PUT", GEO_SPATIAL_DATASOURCE_PATH + DATASOURCE_NAME); + request.setJsonEntity(builder.toString()); + return client().performRequest(request); + } + + /** + * Helper method check the status of dataSource creation within the specific timeframe. + * + * @param name The name of the dataSource to assert + * @param timeout The timeout value in seconds + * @throws Exception Exception + */ + private void waitForDatasourceToBeAvailable(final String name, final Duration timeout) + throws Exception { + Instant start = Instant.now(); + while (!"AVAILABLE".equals(getDatasourceState(name))) { + if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) { + throw new RuntimeException( + String.format( + Locale.ROOT, + "Datasource state didn't change to %s after %d seconds", + "AVAILABLE", + timeout.toSeconds())); + } + Thread.sleep(1000); + } + } + + /** + * Helper method to fetch the DataSource creation status via REST client. + * + * @param name dataSource name + * @return Status in String + * @throws Exception IO. + */ + private String getDatasourceState(final String name) throws Exception { + Request request = new Request("GET", GEO_SPATIAL_DATASOURCE_PATH + name); + Response response = client().performRequest(request); + var responseInMap = + createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())) + .map(); + var datasources = (List>) responseInMap.get("datasources"); + return (String) datasources.get(0).get("state"); + } } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_fillnull_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_fillnull_push.json index 7e5e1c1c200..ac7697ebf1d 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_fillnull_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_fillnull_push.json @@ -6,7 +6,7 @@ }, "children": [ { - "name": "EvalOperator", + "name": "OpenSearchEvalOperator", "description": { "expressions": { "balance": "ifnull(balance, -1)", diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json index 0a0b58f17db..6568cc7fa76 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json @@ -6,7 +6,7 @@ }, "children": [ { - "name": "EvalOperator", + "name": "OpenSearchEvalOperator", "description": { "expressions": { "ageMinus": "-(age, 30)" diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json index bd7310810e4..04b9eef43f6 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_output.json @@ -15,7 +15,7 @@ }, "children": [ { - "name": "EvalOperator", + "name": "OpenSearchEvalOperator", "description": { "expressions": { "age2": "+(avg_age, 2)" diff --git a/integ-test/src/test/resources/geoip.json b/integ-test/src/test/resources/geoip.json new file mode 100644 index 00000000000..83e1cc72d60 --- /dev/null +++ b/integ-test/src/test/resources/geoip.json @@ -0,0 +1,4 @@ +{"index":{"_id":"1"}} +{"name":"Test user - USA","ip":"10.1.1.1"} +{"index":{"_id":"6"}} +{"name":"Test user - Canada","ip": "127.1.1.1"} diff --git a/integ-test/src/test/resources/indexDefinitions/geoip_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/geoip_index_mapping.json new file mode 100644 index 00000000000..aa059567896 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/geoip_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "ip": { + "type": "text" + } + } + } +} diff --git a/opensearch/build.gradle b/opensearch/build.gradle index c47806b6bbf..97d61486417 100644 --- a/opensearch/build.gradle +++ b/opensearch/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation group: 'org.json', name: 'json', version:'20231013' compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}" implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" + implementation group: 'org.opensearch', name:'geospatial-client', version: "${opensearch_build}" testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3') testImplementation('org.junit.jupiter:junit-jupiter-params:5.9.3') diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index 358bc10ab4b..235ddc00752 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -10,6 +10,7 @@ import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchEvalOperator; import org.opensearch.sql.planner.physical.AggregationOperator; import org.opensearch.sql.planner.physical.CursorCloseOperator; import org.opensearch.sql.planner.physical.DedupeOperator; @@ -96,6 +97,13 @@ public PhysicalPlan visitRemove(RemoveOperator node, Object context) { @Override public PhysicalPlan visitEval(EvalOperator node, Object context) { + if (node instanceof OpenSearchEvalOperator evalOperator) { + return doProtect( + new OpenSearchEvalOperator( + visitInput(evalOperator.getInput(), context), + evalOperator.getExpressionList(), + evalOperator.getNodeClient())); + } return new EvalOperator(visitInput(node.getInput(), context), node.getExpressionList()); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperator.java new file mode 100644 index 00000000000..c76ba76a793 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperator.java @@ -0,0 +1,65 @@ +/* + * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import static org.opensearch.sql.expression.env.Environment.extendEnv; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import lombok.Getter; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.client.node.NodeClient; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.function.OpenSearchFunctions; +import org.opensearch.sql.planner.physical.EvalOperator; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * OpenSearch version of eval operator, which contains nodeClient, in order to perform OpenSearch + * specific operations during the eval process. + */ +public class OpenSearchEvalOperator extends EvalOperator { + + @Getter private final NodeClient nodeClient; + + public OpenSearchEvalOperator( + PhysicalPlan input, + List> expressionList, + NodeClient nodeClient) { + super(input, expressionList); + this.nodeClient = nodeClient; + } + + /** + * Evaluate the expression in the {@link EvalOperator} with {@link Environment}. + * + * @param env {@link Environment} + * @return The mapping of reference and {@link ExprValue} for each expression. + */ + @Override + protected Map eval(Environment env) { + Map evalResultMap = new LinkedHashMap<>(); + for (Pair pair : this.getExpressionList()) { + ExprValue value; + if (pair.getValue() + instanceof OpenSearchFunctions.OpenSearchExecutableFunction openSearchExpr) { + value = OpenSearchEvalProcessor.process(openSearchExpr, env, nodeClient); + } else { + value = pair.getValue().valueOf(env); + } + ReferenceExpression var = pair.getKey(); + env = extendEnv(env, var, value); + evalResultMap.put(var.toString(), value); + } + return evalResultMap; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalProcessor.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalProcessor.java new file mode 100644 index 00000000000..3bd48405b8f --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalProcessor.java @@ -0,0 +1,76 @@ +/* + * + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.opensearch.client.node.NodeClient; +import org.opensearch.geospatial.action.IpEnrichmentActionClient; +import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.data.model.ExprStringValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.env.Environment; +import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.function.OpenSearchFunctions; + +/** Class to centralise all OpenSearch specific eval operations. */ +public class OpenSearchEvalProcessor { + + /** + * Static method to read an incoming OpenSearchFunction evaluation instruction, process it + * accordingly with nodeClient and return the result. + * + * @param funcExpression Eval operation which is OpenSearch storage engine specific. + * @param env {@link Environment} + * @param nodeClient NodeClient for OpenSearch cluster RPC. + * @return evaluation result. + */ + public static ExprValue process( + OpenSearchFunctions.OpenSearchExecutableFunction funcExpression, + Environment env, + NodeClient nodeClient) { + + if (BuiltinFunctionName.GEOIP.getName().equals(funcExpression.getFunctionName())) { + return fetchIpEnrichment(funcExpression.getArguments(), env, nodeClient); + } else { + throw new IllegalArgumentException("Unsupported OpenSearch specific expression."); + } + } + + private static ExprValue fetchIpEnrichment( + List arguments, Environment env, NodeClient nodeClient) { + IpEnrichmentActionClient ipClient = new IpEnrichmentActionClient(nodeClient); + String dataSource = StringUtils.unquoteText(arguments.get(0).toString()); + String ipAddress = arguments.get(1).valueOf(env).stringValue(); + final Set options = new HashSet<>(); + if (arguments.size() > 2) { + String option = StringUtils.unquoteText(arguments.get(2).toString()); + // Convert the option into a set. + options.addAll( + Arrays.stream(option.split(",")).map(String::trim).collect(Collectors.toSet())); + } + try { + Map geoLocationData = ipClient.getGeoLocationData(ipAddress, dataSource); + Map enrichmentResult = + geoLocationData.entrySet().stream() + .filter(entry -> options.isEmpty() || options.contains(entry.getKey())) + .collect( + Collectors.toMap( + Map.Entry::getKey, v -> new ExprStringValue(v.getValue().toString()))); + return ExprTupleValue.fromExprValueMap(enrichmentResult); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index b8822cd1e81..f93389c6657 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -21,6 +21,7 @@ import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchEvalOperator; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; @@ -28,6 +29,7 @@ import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScanBuilder; import org.opensearch.sql.planner.DefaultImplementor; import org.opensearch.sql.planner.logical.LogicalAD; +import org.opensearch.sql.planner.logical.LogicalEval; import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -209,5 +211,11 @@ public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) { public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); } + + @Override + public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) { + return new OpenSearchEvalOperator( + visitChild(node, context), node.getExpressions(), client.getNodeClient()); + } } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 724178bd346..18958c74dbc 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -47,6 +47,7 @@ import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprBooleanValue; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.NamedExpression; @@ -58,10 +59,12 @@ import org.opensearch.sql.expression.window.ranking.RankFunction; import org.opensearch.sql.monitor.ResourceMonitor; import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.planner.physical.ADOperator; import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; +import org.opensearch.sql.opensearch.planner.physical.OpenSearchEvalOperator; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; @@ -339,6 +342,27 @@ public void test_visitTrendline() { executionProtector.visitTrendline(trendlineOperator, null)); } + /** + * To ensure the original Eval functionality continue to work after the OpenSearchEvalOperator + * wrapper. + */ + @Test + void test_visitOpenSearchEval() { + NodeClient nodeClient = mock(NodeClient.class); + OpenSearchEvalOperator evalOperator = + new OpenSearchEvalOperator( + values(emptyList()), + List.of( + ImmutablePair.of( + new ReferenceExpression("ageInAbs", OpenSearchTextType.of()), + DSL.abs(DSL.abs(new ReferenceExpression("age", ExprCoreType.LONG))))), + nodeClient); + + assertEquals( + executionProtector.doProtect(evalOperator), + executionProtector.visitEval(evalOperator, null)); + } + PhysicalPlan resourceMonitor(PhysicalPlan input) { return new ResourceMonitorPlan(input, resourceMonitor); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperatorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperatorTest.java new file mode 100644 index 00000000000..9cd6a98f20d --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchEvalOperatorTest.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.BOOLEAN; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.geospatial.action.IpEnrichmentAction; +import org.opensearch.geospatial.action.IpEnrichmentRequest; +import org.opensearch.geospatial.action.IpEnrichmentResponse; +import org.opensearch.sql.data.model.ExprLongValue; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.function.OpenSearchFunctions; +import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; +import org.opensearch.sql.opensearch.data.value.OpenSearchExprTextValue; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** To assert the original behaviour of eval operator. */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +@RunWith(MockitoJUnitRunner.Silent.class) +public class OpenSearchEvalOperatorTest { + + @Mock private PhysicalPlan input; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private NodeClient nodeClient; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ActionFuture actionFuture; + + private final ExprTupleValue DATE_ROW = + new ExprTupleValue( + new LinkedHashMap<>( + Map.of( + "firstname", + new OpenSearchExprTextValue("Amber"), + "age", + new ExprLongValue(32), + "email", + new OpenSearchExprTextValue("amberduke@pyrami.com"), + "ipInStr", + new OpenSearchExprTextValue("192.168.1.1")))); + + /** + * The test-case aim to assert OpenSearchEvalOperator behaviour when evaluating generic + * expression, which is not OpenSearch Engine specific. (Ex: ABS(age) ) + */ + @Test + public void testEvalOperatorOnGenericOperations() { + + // The input dataset + when(input.next()).thenReturn(DATE_ROW); + + // Expression to be evaluated + List> ipAddress = + List.of( + ImmutablePair.of( + new ReferenceExpression("ageInAbs", OpenSearchTextType.of()), + DSL.abs(DSL.abs(new ReferenceExpression("age", ExprCoreType.LONG))))); + + OpenSearchEvalOperator evalOperator = new OpenSearchEvalOperator(input, ipAddress, nodeClient); + + // Make sure generic Expression function as expected when wrapped with OpenSearchEvalOperator. + assertSame(32, evalOperator.next().keyValue("ageInAbs").integerValue()); + } + + /** + * The test-case aim to assert OpenSearchEvalOperator behaviour when evaluating + * geoipFunctionExpression, which is only available when executing on OpeSearch storage engine. + * (No option) + */ + @SneakyThrows + @Test + public void testEvalOperatorOnGeoIpExpression() { + + // The input dataset + when(input.next()).thenReturn(DATE_ROW); + when(nodeClient.execute( + eq(IpEnrichmentAction.INSTANCE), + argThat( + request -> + request instanceof IpEnrichmentRequest + && "192.168.1.1".equals(((IpEnrichmentRequest) request).getIpString())))) + .thenReturn(actionFuture); + when(actionFuture.get()).thenReturn(new IpEnrichmentResponse(Map.of("country_name", "Canada"))); + + // Expression to be evaluated + List> ipAddress = + List.of( + ImmutablePair.of( + new ReferenceExpression("ipEnrichmentResult", OpenSearchTextType.of()), + new OpenSearchFunctions.OpenSearchExecutableFunction( + BuiltinFunctionName.GEOIP.getName(), + List.of( + DSL.literal("my-datasource"), + new ReferenceExpression("ipInStr", OpenSearchTextType.of())), + BOOLEAN))); + + OpenSearchEvalOperator evalOperator = new OpenSearchEvalOperator(input, ipAddress, nodeClient); + + // Make sure generic Expression function as expected when wrapped with OpenSearchEvalOperator. + Map ipEnrichmentResult = + evalOperator.next().keyValue("ipEnrichmentResult").tupleValue(); + assertSame("Canada", ipEnrichmentResult.get("country_name").stringValue()); + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index c484f34a2a6..f9d9e1b43c7 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -87,6 +87,9 @@ ANOMALY_SCORE_THRESHOLD: 'ANOMALY_SCORE_THRESHOLD'; CASE: 'CASE'; IN: 'IN'; +// Geo IP eval function +GEOIP: 'GEOIP'; + // LOGICAL KEYWORDS NOT: 'NOT'; OR: 'OR'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index acae54b7d9d..c70f8af2cc8 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -419,6 +419,7 @@ evalFunctionName | flowControlFunctionName | systemFunctionName | positionFunctionName + | geoipFunctionName ; functionArgs @@ -520,6 +521,10 @@ mathematicalFunctionName | trigonometricFunctionName ; +geoipFunctionName + : GEOIP + ; + trigonometricFunctionName : ACOS | ASIN @@ -864,6 +869,7 @@ keywordsCanBeId | mathematicalFunctionName | positionFunctionName | conditionFunctionName + | geoipFunctionName // commands | SEARCH | DESCRIBE