e2e_test/batch/describe_fragments.slt (3 additions, 21 deletions)

@@ -133,35 +133,17 @@
 skipif madsim
 system ok
 psql_validate.py --db $__DATABASE__ --sql "DESCRIBE FRAGMENTS describe_plan_test.idx" \
 --expected 'Fragment % (Actor %)
-StreamMaterialize { columns: [name, age, created_at, tbl.id(hidden)], stream_key: [tbl.id], pk_columns: [name, age, tbl.id], pk_conflict: NoCheck }
+StreamMaterialize { columns: [name, age, created_at, tbl.id(hidden)], stream_key: [name, tbl.id], pk_columns: [name, age, tbl.id], pk_conflict: NoCheck }
 ├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
+├── stream key: [ tbl.name, tbl.id ]
 └── MergeExecutor { output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ], stream key: [ tbl.id ] }
 (empty)
 Fragment % (Actor %)
-StreamTableScan { table: tbl, columns: [name, age, created_at, id] }
-├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
+StreamTableScan { table: tbl, columns: [name, age, created_at, id] } { output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ], stream key: [ tbl.id ] }
 ├── Upstream { output: [ name, age, created_at, id ], stream key: [] }
 └── BatchPlanNode { output: [ name, age, created_at, id ], stream key: [] }'


-skipif madsim
-system ok
-psql_validate.py --db $__DATABASE__ --sql "DESCRIBE FRAGMENTS describe_plan_test.idx" \
---expected 'Fragment % (Actor %)
-StreamMaterialize { columns: [name, age, created_at, tbl.id(hidden)], stream_key: [tbl.id], pk_columns: [name, age, tbl.id], pk_conflict: NoCheck }
-├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
-└── MergeExecutor { output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ], stream key: [ tbl.id ] }
-(empty)
-Fragment % (Actor %)
-StreamTableScan { table: tbl, columns: [name, age, created_at, id] }
-├── output: [ tbl.name, tbl.age, tbl.created_at, tbl.id ]
-├── stream key: [ tbl.id ]
-├── Upstream { output: [ name, age, created_at, id ], stream key: [] }
-└── BatchPlanNode { output: [ name, age, created_at, id ], stream key: [] }'


 skipif madsim
 system ok
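Why the expected stream key widens here: an index row is identified by its index key together with the base table's primary key, so the index's StreamMaterialize now keys on [name, tbl.id] rather than [tbl.id] alone. With only tbl.id, an update that moves a row to a different name would emit a retraction and an insertion for two distinct index entries under the same stream key, routed to different shards of the index. A minimal sketch of such an update, assuming tbl already holds a hypothetical row with id = 1:

statement ok
update tbl set name = 'bob' where id = 1;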
e2e_test/streaming/mv_on_index.slt (37 additions, 0 deletions, new file)

@@ -0,0 +1,37 @@
+statement ok
+set implicit_flush = true;
+
+statement ok
+create table t (id int primary key, a int, b int);
+
+statement ok
+create index idx_a on t(a);
+
+statement ok
+create materialized view mv as select * from idx_a;
+
+statement ok
+insert into t values (1, 1, 1);
+
+statement ok
+update t set a = a + 1;
+
+statement ok
+update t set a = a + 1;
+
+statement ok
+update t set a = a + 1;
+
+statement ok
+update t set a = a + 1;
+
+statement ok
+update t set a = a + 1;
+
+query III
+select * from mv;
+----
+6 1 1
+
+statement ok
+drop table t cascade;
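Each `update t set a = a + 1` moves the single row to a new index key, so the stream feeding the MV on idx_a must retract the old index entry and materialize the new one; the final query checks that exactly one row survives with a = 6. The same state can be read off the index itself before the final drop; a minimal sketch, assuming the index exposes all table columns ordered with the index key first:

query III
select * from idx_a;
----
6 1 1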
(file name not captured)

@@ -221,5 +221,12 @@
     create table t(a int, b int, c int, x int[]);
     create index idx_a on t(a);
     select sum(unnest) from (select a, b, c, unnest(x) as unnest from t) group by a + 1;
   expected_outputs:
   - stream_plan
+- name: Ban index selection for indexes with distribution key is not the subset of the group by keys
+  sql: |
+    create table t(a int, b int, c int, x int[]);
+    create index idx_a on t(a, b, c) distributed by (a, b, c);
+    select count(*) from t group by a, b;
+  expected_outputs:
+  - stream_plan
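The new case pins down the rule: idx_a's distribution key (a, b, c) is not a subset of the group-by keys (a, b), so rows of one group are scattered across the index's shards and the planner must not select the index. A hedged companion sketch, hypothetical and not part of this diff, where the distribution key is covered and index selection stays legal:

- name: hypothetical companion, distribution key (a, b) is a subset of the group by keys
  sql: |
    create table t(a int, b int, c int, x int[]);
    create index idx_a on t(a, b, c) distributed by (a, b);
    select count(*) from t group by a, b;
  expected_outputs:
  - stream_plan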
src/frontend/planner_test/tests/testdata/output/agg.yaml (6 additions, 4 deletions)

@@ -1041,7 +1041,7 @@
   StreamMaterialize { columns: [cnt, i.x(hidden)], stream_key: [i.x], pk_columns: [i.x], pk_conflict: NoCheck }
   └─StreamProject { exprs: [count, i.x] }
     └─StreamHashAgg { group_key: [i.x], aggs: [count] }
-      └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [i.t._row_id], pk: [x, t._row_id], dist: UpstreamHashShard(i.x) }
+      └─StreamTableScan { table: i, columns: [i.x, i.t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [i.x, i.t._row_id], pk: [x, t._row_id], dist: UpstreamHashShard(i.x) }
 - name: distinct aggregates only have one distinct argument doesn't need expand
   sql: |
     create table t(x int, y int);
@@ -1502,10 +1502,12 @@
       └─BatchScan { table: idx, columns: [idx.id], distribution: SomeShard }
   stream_plan: |-
     StreamMaterialize { columns: [count, idx.id(hidden)], stream_key: [idx.id], pk_columns: [idx.id], pk_conflict: NoCheck }
-    └─StreamProject { exprs: [count, idx.id] }
-      └─StreamHashAgg { group_key: [idx.id], aggs: [count] }
+    └─StreamProject { exprs: [sum0(count), idx.id] }
+      └─StreamHashAgg { group_key: [idx.id], aggs: [sum0(count), count] }
         └─StreamExchange { dist: HashShard(idx.id) }
-          └─StreamTableScan { table: idx, columns: [idx.id], stream_scan_type: ArrangementBackfill, stream_key: [idx.id], pk: [col, id], dist: SomeShard }
+          └─StreamHashAgg { group_key: [idx.id, _vnode], aggs: [count] }
+            └─StreamProject { exprs: [idx.id, idx.col, Vnode(idx.col) as _vnode] }
+              └─StreamTableScan { table: idx, columns: [idx.id, idx.col], stream_scan_type: ArrangementBackfill, stream_key: [idx.col, idx.id], pk: [col, id], dist: UpstreamHashShard(idx.col) }
 - name: two phase agg with stream SomeShard (via index) but pk does not satisfy output dist should use two phase agg
   sql: |
     SET QUERY_MODE TO DISTRIBUTED;
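The reworked plan above is a two-phase aggregation: the lower StreamHashAgg counts per (idx.id, _vnode), where _vnode is Vnode(idx.col), the scan's own shard. The exchange then re-hashes only the partial counts by idx.id, and the upper agg combines them with sum0, a sum whose initial value is 0 rather than NULL. A rough SQL rendering of the same decomposition, treating _vnode as a hypothetical ordinary column:

select id, sum(cnt) as count
from (
    select id, _vnode, count(*) as cnt
    from idx
    group by id, _vnode
) partial
group by id;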
(file name not captured)

@@ -216,9 +216,8 @@
     create index idx on t(col);
     select id from idx;
   stream_plan: |-
-    StreamMaterialize { columns: [id], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
-    └─StreamExchange { dist: HashShard(idx.id) }
-      └─StreamTableScan { table: idx, columns: [idx.id], stream_scan_type: ArrangementBackfill, stream_key: [idx.id], pk: [col, id], dist: SomeShard }
+    StreamMaterialize { columns: [id, idx.col(hidden)], stream_key: [idx.col, id], pk_columns: [idx.col, id], pk_conflict: NoCheck }
+    └─StreamTableScan { table: idx, columns: [idx.id, idx.col], stream_scan_type: ArrangementBackfill, stream_key: [idx.col, idx.id], pk: [col, id], dist: UpstreamHashShard(idx.col) }
 - sql: |
     select * from generate_series(1, 10000000, 1) where Now() is null;
   batch_plan: 'BatchValues { rows: [] }'
(file name not captured)

@@ -8,11 +8,11 @@
     /* should generate delta join plan, and stream index scan */
     select * from a join b on a.a1 = b.b1 ;
   stream_plan: |-
-    StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], stream_key: [i_a1.a._row_id, i_b1.b._row_id, a1], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1], pk_conflict: NoCheck }
+    StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], stream_key: [a1, i_a1.a._row_id, i_b1.b._row_id], pk_columns: [a1, i_a1.a._row_id, i_b1.b._row_id], pk_conflict: NoCheck }
     └─StreamExchange { dist: HashShard(i_a1.a1, i_a1.a._row_id, i_b1.b._row_id) }
       └─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] }
-        ├─StreamTableScan { table: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], stream_scan_type: Backfill, stream_key: [i_a1.a._row_id], pk: [a1, a._row_id], dist: UpstreamHashShard(i_a1.a1) }
-        └─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], stream_scan_type: UpstreamOnly, stream_key: [i_b1.b._row_id], pk: [b1, b._row_id], dist: UpstreamHashShard(i_b1.b1) }
+        ├─StreamTableScan { table: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], stream_scan_type: Backfill, stream_key: [i_a1.a1, i_a1.a._row_id], pk: [a1, a._row_id], dist: UpstreamHashShard(i_a1.a1) }
+        └─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], stream_scan_type: UpstreamOnly, stream_key: [i_b1.b1, i_b1.b._row_id], pk: [b1, b._row_id], dist: UpstreamHashShard(i_b1.b1) }
 - sql: |
     set rw_streaming_enable_delta_join = true;
     create table a (a1 int primary key, a2 int);
@@ -25,7 +25,7 @@
     └─StreamExchange { dist: HashShard(a.a1, i_b1.b._row_id) }
       └─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all }
         ├─StreamTableScan { table: a, columns: [a.a1, a.a2], stream_scan_type: Backfill, stream_key: [a.a1], pk: [a1], dist: UpstreamHashShard(a.a1) }
-        └─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], stream_scan_type: UpstreamOnly, stream_key: [i_b1.b._row_id], pk: [b1, b._row_id], dist: UpstreamHashShard(i_b1.b1) }
+        └─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], stream_scan_type: UpstreamOnly, stream_key: [i_b1.b1, i_b1.b._row_id], pk: [b1, b._row_id], dist: UpstreamHashShard(i_b1.b1) }
 - sql: |
     set rw_streaming_enable_delta_join = true;
     create table a (a1 int primary key, a2 int);
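Both hunks in this file apply the same widening to delta-join inputs: an index scan's stream key is now the index key plus the base table's row id ([i_b1.b1, i_b1.b._row_id] instead of [i_b1.b._row_id]), while a scan of a plain table such as a keeps its primary key [a.a1]. A small sketch of why the index key is needed, with hypothetical rows for b, assuming b(b1 int, b2 int) has no primary key and i_b1 indexes b(b1):

# an update that changes b1 keeps the same b._row_id but moves the row
# to a different shard of i_b1, so only (b1, b._row_id) identifies an
# index entry unambiguously
statement ok
insert into b values (1, 10), (1, 20);

statement ok
update b set b1 = 2 where b2 = 10;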