-
Notifications
You must be signed in to change notification settings - Fork 178
Description
Overview
Our team has implemented a SQL plugin named OLAP based on our internal versions of OS. This plugin has been widely used in analytical scenarios and has achieved excellent results in our internal business for two years.
To support modern SQL query functionalities, OLAP adopts a groundbreaking design, rewriting almost the entire query process except for data reading from Lucene. This design enables OLAP to execute data queries in MPP mode. In simple terms, OLAP can be divided into three parts: plan optimization, task scheduling, and execution.
In the query plan optimization phase, OLAP utilizes Apache Calcite for SQL query parsing and plan optimization. It employs Calcite's HepPlanner and VolcanoPlanner for rule-based optimization (RBO) and top-down cost-based optimization (CBO). The layout of the optimizer draws inspiration from Flink's chained optimization, which facilitates the insertion of new optimization rules at any stage. Users can also control the query optimization path based on query parameters. This optimization approach lays the foundation for a pluggable native engine, allowing for different optimization rules to be applied based on different native engines, and the native engine can be selected during the query process.
In the scheduling phase, the query plan is converted into a Directed Acyclic Graph before scheduling begins. The scheduler further transforms this DAG into a series of tasks that can be executed on the cluster and links them based on upstream and downstream dependencies. The scheduling process supports task retries and monitors the health of cluster nodes to avoid dispatching tasks to unavailable nodes.
The execution layer manages the entire lifecycle of query tasks, ensuring they run according to plan, and carries out fault recovery. It is also responsible for data transmission and exchange. The execution layer effectively manages computing resources and reports error information. Additionally, it collects and reports performance metrics of task execution to provide data support for performance tuning. In the computation phase, OLAP has used three different native engines for acceleration, namely Calcite, the community edition of Velox, and an in-house native engine, achieving significant performance gains.
Plan Optimization
OLAP can map Index metadata to SQL metadata and treat regular expressions of time-series indexes as table partitions.
POST /_olap/database/test_database
POST /_olap/table
{
"database": "test_database",
"table": "table",
"type": "external",
"connector": {
"es": {
"index_pattern": "index\\-(\\d{4}.\\d{2}.\\d{2})",
"index_pattern_match": ["partition_0"],
"mapping_merge_policy": "created_as_order",
"partitions":[
{
"field": "_pdate_",
"type": "varchar"
}
]
}
}
}
POST /_olap/sql
{
"sql":"select * from `test_database`.`table` where _pdate_ = "2025.11.14"
}
POST /_olap/sql
{
"sql":"select * from `index-2025.11.14`
}The query plan can be optimized for both scatter-gather and MPP models. During the planning phase, partition pruning and predicate pushdown are performed based on query conditions. It also supports optimization operations such as RuntimeFilter and TopN.
As shown in the figure, the process first enters the LogicalOptimizer for some conventional RBO optimizations. These rules are common optimization methods in many databases and are purely logical, independent of data distribution. Subsequently, some CBO optimizations are performed, mainly JoinReorder, which relies on collected metadata information.
The physical optimization phase mainly incorporates data distribution factors into the query plan, such as when to insert an Exchange in the data flow. This phase involves more complex optimizations like two-stage sorting and two-stage aggregation. At the end of the physical optimization phase, OLAP will perform RuntimeFilter-related optimizations.
Pluggable Native Engine
The choice of the native engine is actually determined during the query planning phase. The main job of the EngineOptimizer is to extract the parts of the query plan related to the native engine and convert them into a corresponding native engine plan. This step is a prerequisite for a pluggable native engine. The remaining parts of the plan are responsible for data reading and data transmission, which are common parts independent of the native engine.
The subsequent ExecOptimizer part generates the corresponding native engine computation objects from the plan generated in the previous stage. During execution, these generated computation objects are called directly.
The native engine-related logic here is all interface-based. If you want to support any other native engine, such as Velox or DataFusion, you just need to implement the corresponding interfaces.
Engine Interface Diagram
Explain
OLAP also supports displaying the query plan.
--------------------------------------------------
SET runtime_filter_enabled=true;
SELECT a.long_field
FROM (select id,long_field/2 as long_field from `index-v1` where id > 13 and long_field/2 > 767) a
JOIN `index-v2` b on a.id = b.id and a.long_field = b.long_fieldEXPLAIN RESULT
RecordType(DOUBLE long_field)
==================== APPLY [SQL TO PLAN CONVERTER]====================
0:LogicalProject(long_field=[$1])
1:LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
4:LogicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
5:LogicalFilter(condition=[AND(>($0, 13), >(divide_as_fraction($1, 2), 767))])
6:LogicalProject(id=[$10], long_field=[$14])
7:LogicalTableScan(table=[appendix$1])
2:LogicalProject(id=[$10], long_field0=[CAST($14):DOUBLE])
3:LogicalTableScan(table=[appendix$0])
==================== APPLY [LOGICAL OPTIMIZER]====================
0:LogicalProject(long_field=[$1])
1:LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
4:LogicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
5:LogicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
6:LogicalTableScan(table=[appendix$3])
2:LogicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
3:LogicalTableScan(table=[appendix$2])
==================== APPLY [CBO OPTIMIZER]====================
0:LogicalProject(long_field=[$1])
1:LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
4:LogicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
5:LogicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
6:LogicalTableScan(table=[appendix$3])
2:LogicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
3:LogicalTableScan(table=[appendix$2])
==================== APPLY [SCATTER GATHER PHYSICAL OPTIMIZER]====================
0:PhysicalProject(long_field=[$1])
1:PhysicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner], hints=[[]])
5:PhysicalExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
6:PhysicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
7:PhysicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
8:PhysicalTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}])
2:PhysicalExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
3:PhysicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
4:PhysicalTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
==================== APPLY [PHYSICAL RUNTIME FILTER OPTIMIZER]====================
0:PhysicalProject(long_field=[$1])
1:PhysicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner], hints=[[]])
5:PhysicalExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
6:PhysicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
7:PhysicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
8:PhysicalRfTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}], runtimeFilterToField=[appendix$4])
9:PhysicalExchange(exchangeId=[6], distribution=[{"tableId":0,"type":"SOURCE_BROADCAST"}], exchangeInfo=[appendix$5])
10:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[FINAL])
11:PhysicalExchange(exchangeId=[5], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
12:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[PARTIAL])
ReuseRel(reuseId=3, relType=PhysicalProject)
2:PhysicalExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
3:PhysicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
4:PhysicalTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
==================== APPLY [ENGINE OPTIMIZER]====================
0:PhysicalEngine(engineFragment=[appendix$6])
4:PhysicalExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
5:PhysicalEngine(engineFragment=[appendix$8])
6:PhysicalRfTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}], runtimeFilterToField=[appendix$4])
7:PhysicalExchange(exchangeId=[6], distribution=[{"tableId":0,"type":"SOURCE_BROADCAST"}], exchangeInfo=[appendix$5])
8:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[FINAL])
9:PhysicalExchange(exchangeId=[5], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
10:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[PARTIAL])
ReuseRel(reuseId=2, relType=PhysicalEngine)
1:PhysicalExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
2:PhysicalEngine(engineFragment=[appendix$7])
3:PhysicalTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
==================== APPLY [EXEC OPTIMIZER]====================
0:ExecExchange(exchangeId=[7], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
1:ExecEngine(engineBridge=[appendix$9])
5:ExecExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
6:ExecEngine(engineBridge=[appendix$11])
7:ExecRfTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}], runtimeFilterToField=[appendix$4])
8:ExecExchange(exchangeId=[6], distribution=[{"tableId":0,"type":"SOURCE_BROADCAST"}], exchangeInfo=[appendix$5])
9:ExecRuntimeFilter(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[FINAL])
10:ExecExchange(exchangeId=[5], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
11:ExecRuntimeFilter(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[PARTIAL])
ReuseRel(reuseId=3, relType=ExecEngine)
2:ExecExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
3:ExecEngine(engineBridge=[appendix$10])
4:ExecTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
APPENDICES:
appendix$0: [ table: [default, index-v2], tableId: 1, indexExpression: index-v2]
appendix$1: [ table: [default, index-v1], tableId: 0, indexExpression: index-v1]
appendix$2: [ table: [default, index-v2], tableId: 1, projects: [id, long_field], indexExpression: index-v2]
appendix$3: [ table: [default, index-v1], tableId: 0, projects: [id, long_field], indexExpression: index-v1, predicates: {"bool":{"filter":[{"range":{"id":{"from":13,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},{"match_all":{"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}}]
appendix$4: {0={"runtimeFilterType":"TERMS","field":"id","forceDependency":false}}
appendix$5: {"partitionActionInfos":[{"partitionActionType":"BROADCAST"}]}
appendix$6: EngineBridgeProject(long_field=[$1])
EngineBridgeHashJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
EngineBridgeProject(id=[$0], long_field=[$1])
EngineBridgeTableScan(table=[[engine_data_source_table_0]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field","type":"DOUBLE"}]])
EngineBridgeProject(expr_0=[$0], long_field0=[$1])
EngineBridgeTableScan(table=[[engine_data_source_table_1]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field0","type":"DOUBLE"}]])
appendix$7: EngineBridgeProject(id=[$0], long_field0=[CAST($1):DOUBLE])
EngineBridgeTableScan(table=[[engine_data_source_table_0]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field","type":"BIGINT"}]])
appendix$8: EngineBridgeProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
EngineBridgeFilter(condition=[>(divide_as_fraction($1, 2), 767)])
EngineBridgeTableScan(table=[[engine_data_source_table_0]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field","type":"BIGINT"}]])
appendix$9: plan: {"nodes":[{"id":0,"type":"project","projections":[""long_field" AS "long_field""]},{"id":1,"type":"hashJoin","joinType":"inner","leftKeys":[0,1],"rightKeys":[0,1],"filter":"CAST(true as BOOLEAN)"},{"id":2,"type":"project","projections":[""id" AS "id"",""long_field" AS "long_field""]},{"id":3,"type":"tableScan"},{"id":4,"type":"project","projections":[""id" AS "expr_0"",""long_field0" AS "long_field0""]},{"id":5,"type":"tableScan"}],"edges":[{"src":0,"dest":1},{"src":1,"dest":2},{"src":2,"dest":3},{"src":1,"dest":4},{"src":4,"dest":5}]}, tableScan: {"3": {"names": ["id","long_field"], "types": ["INTEGER","DOUBLE"]}, "5": {"names": ["id","long_field0"], "types": ["INTEGER","DOUBLE"]}}
appendix$10: plan: {"nodes":[{"id":0,"type":"project","projections":[""id" AS "id"","CAST("long_field" as DOUBLE) AS "long_field0""]},{"id":1,"type":"tableScan"}],"edges":[{"src":0,"dest":1}]}, tableScan: {"1": {"names": ["id","long_field"], "types": ["INTEGER","BIGINT"]}}
appendix$11: plan: {"nodes":[{"id":0,"type":"project","projections":[""id" AS "id"","(CAST("long_field" AS DOUBLE) / CAST(CAST(2 as INTEGER) AS DOUBLE)) AS "long_field""]},{"id":1,"type":"filter","filter":"((CAST("long_field" AS DOUBLE) / CAST(CAST(2 as INTEGER) AS DOUBLE)) > CAST(CAST(767 as INTEGER) as DOUBLE))"},{"id":2,"type":"tableScan"}],"edges":[{"src":0,"dest":1},{"src":1,"dest":2}]}, tableScan: {"2": {"names": ["id","long_field"], "types": ["INTEGER","BIGINT"]}}
Task Scheduling
OLAP draws on the design concepts of Flink, Presto, and StarRocks, introducing Stages and Operators to represent the plan topology.
Operators are converted one-to-one from RelNodes and represent the execution of a function. For example, a TaleScanOperator is used to read necessary data, while an EngineOperator represents logic that needs to be executed by the native engine. Operators are uniformly abstracted; you only need to define the input, output, and the logic the operator itself needs to complete. This is also why it can support different native engines in a pluggable manner. For an EngineOperator, it only needs to pass the data to the corresponding native engine processing logic and return the processed result; the specific native engine execution logic has already been determined in the planning phase.
The Operators, after the RelNode transformation is complete, form DAG of a complete query. An Operator can have multiple upstream and downstream operators. For physical execution, this DAG needs to be partitioned to deploy tasks in a distributed cluster. A Stage, as an abstract unit of distributed scheduling, splits and merges the DAG based on data Exchange to avoid frequent network data transmission. A Stage can contain multiple Operators, and a Stage can have multiple upstream or downstream stages.
A QueryTask is the basic unit of distributed scheduling and execution. The scheduling layer will construct one or more QueryTasks from a Stage. It can be understood that a QueryTask is a distributed expansion of a Stage. During the expansion process, upstream and downstream information and physical scheduling information are added. QueryTasks are aggregated by node and then sent for execution. The execution request sent to each node will contain the task information of one or more stages.
Diagram of the relationship between Stage, Operator, and QueryTask
MPP
Stages and Operators separate the specific data computation and data transmission, ensuring scheduling flexibility and making MPP possible. As long as an MPP plan can be generated in the query plan, the scheduling layer can schedule query tasks across the entire OS cluster, regardless of whether the scheduled node acts as a client processing the request or contains the shard corresponding to the query data.
Task Execution
The execution layer adopts a push model, in which upstream pushes data to downstream. When a QueryTask is initialized on a deployment node, the Operators are initialized starting from the downstream. Each Operator is instantiated as a TableConsumer and holds a reference to the downstream Consumer. Multiple TableConsumers form a Task pipeline. After receiving and processing the data, the TableConsumer pushes the data to the downstream. It should be noted that when a node has multiple Tasks, their deployment and execution are asynchronous.
In terms of execution, OLAP has made many acceleration optimizations. OLAP extensively uses Apache Arrow as a data carrier to reduce the overhead caused by frequent data copying. To achieve faster and more efficient data reading, OLAP supports concurrent data reading at the LeafReaderContext granularity when reading shard data.
In addition, it supports custom data transmission strategies, such as hash shuffle.
Execution Flow Diagram
Performance
Currently, our OLAP clusters have been deployed on thousands of nodes internally. While preserving the low latency and high QPS capabilities of Elasticsearch, we have increased the supported computation scale by over an order of magnitude. The system can handle over 10,000 QPS for JOIN queries, with typical single-query latency in hundreds of milliseconds.
In terms of scale, a single machine can support aggregations or JOINs on up to 100 million rows of data. In a distributed environment, when optimizations like hash distribution are properly applied, the query scale is theoretically unlimited.
To further enhance performance and stability, the system utilizes off-heap memory end-to-end, which alleviates GC pressure on the heap and overcomes its memory limitations. For instance, on a machine with 1 TB of physical memory and a 32 GB JVM configuration, by fully leveraging system memory, the available memory for computation can be increased from approximately 20 GB to around 500 GB.
-
In a scenario involving a JOIN between a 200-million-row dataset (after filtering) and a 20,000-row dataset (after filtering), the combination of Runtime Filter and JOIN Reorder optimizations can drastically reduce query time from about 2 minutes to approximately 100 milliseconds.
-
Process 2 million detailed rows:
- Aggregate into 100,000 rows (fewer dimensions), sort and fetch the last page; end-to-end latency is about 0.5 seconds.
- Aggregate into 1 million rows (multiple dimensions), sort and fetch the last page; end-to-end latency is about 1.5 seconds.
-
Process 1 billion detailed rows, aggregate into 100 million buckets, perform a Shuffle JOIN, and return the top 100 results; end-to-end latency is about 3 seconds.
Partial Features and Diagrams
Cross Cluster Query
Join
Proposal
Although there are some differences between the OLAP plugin and opensearch-sql, we believe that the ultimate goal is the same. We hope to contribute this part of the code to the community so that more people can participate.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status






