Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7632807
feat(iceberg): add BatchIcebergListExecutor and BatchIcebergFetchExec…
Oct 21, 2025
cf1ad3f
feat(iceberg): implement execute and debug traits for BatchIcebergLis…
Oct 21, 2025
03ae5de
feat: add SourceRefreshMode to catalog and stream plan
Oct 22, 2025
985d923
feat(iceberg): integrate metrics into BatchIcebergListExecutor
Oct 22, 2025
6d9f9e1
feat: enhance StreamFsFetch with refresh mode support
Oct 22, 2025
9093772
feat(iceberg): enhance BatchIcebergFetchExecutor with new functionality
Oct 22, 2025
16a7f3d
feat(iceberg): enhance delete file handling in Iceberg scan and fetch…
Oct 23, 2025
7646c6a
stash
Oct 23, 2025
c31e351
fix: improve error handling and logging in Iceberg source executors
Oct 24, 2025
12508fb
Merge branch 'main' into tab/batch-iceberg-3
tabVersion Oct 24, 2025
e8d2a11
feat(iceberg): integrate refresh mode handling in table creation
Oct 24, 2025
775acd1
feat(iceberg): enhance delete file processing in Iceberg scan and fet…
Oct 26, 2025
ce8ae20
refactor(iceberg): change logging level to debug for delete file proc…
Oct 27, 2025
07cd958
Merge branch 'main' into tab/batch-iceberg-3
tabVersion Oct 27, 2025
1cf1894
Merge branch 'main' into tab/batch-iceberg-3
tabVersion Oct 28, 2025
2cad418
refactor(iceberg): replace legacy scan function with new implementati…
Oct 28, 2025
db95e26
test(iceberg): add end-to-end test for refreshing Iceberg tables
Oct 28, 2025
04cade2
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 3, 2025
6da13ad
refactor(iceberg): update delete handling logic and improve type safety
Nov 3, 2025
4f5b6c5
refactor(iceberg): enhance delete handling and improve logging
Nov 3, 2025
7e6037b
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 4, 2025
afa89b0
refactor(iceberg): improve equality delete handling and type safety
Nov 4, 2025
68adb94
refactor(iceberg): simplify delete handling in scan task
Nov 4, 2025
816ec34
feat: implement refresh progress tracking for refresh table (#23671)
tabVersion Nov 7, 2025
cf28501
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 7, 2025
96b7f2e
refactor: simplify command scheduling and ID handling in barrier and …
Nov 7, 2025
08b5418
refactor: rename manual trigger refresh mode to full recompute in sou…
Nov 7, 2025
0855532
feat: add invalid refresh mode option and update refresh mode to full…
Nov 7, 2025
f704c26
feat: enhance dropped table handling by cleaning up refresh progress …
Nov 7, 2025
046e78a
Revert "feat: enhance dropped table handling by cleaning up refresh p…
Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# End-to-end test: refreshing an Iceberg batch table picks up both
# position-delete and equality-delete files written by the upstream
# iceberg-engine table.
#
# Flow: create an iceberg-engine table `t_refresh`, delete rows from it,
# then verify that a separate batch-ingested table `iceberg_batch_table`
# (refresh_mode = manual) reflects the deletes after `refresh table`.

control substitution on

# Secret used for both s3 access/secret keys against the local MinIO.
statement ok
create secret my_secret with (
backend = 'meta'
) as 'hummockadmin';

# Storage-catalog Iceberg connection pointing at the local MinIO endpoint.
statement ok
create connection my_conn
with (
type = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
);

statement ok
set iceberg_engine_connection = 'public.my_conn';

# Small commit interval so Iceberg snapshots (and delete files) land quickly.
statement ok
create table t_refresh (id int primary key, name varchar, foo varchar) with (commit_checkpoint_interval = 10) engine = iceberg;

statement ok
insert into t_refresh values(1, 'xxx', 'a') , (2, 'yyy', 'b'), (3, 'zzz', 'c'), (4, 'www', 'd'), (5, 'zzz', 'c');

statement ok
flush;

# Delete by primary key -> expected to produce a position-delete file.
statement ok
delete from t_refresh where id = 4;

statement ok
flush;

# Wait for the iceberg commit (commit_checkpoint_interval = 10).
sleep 10s

# Exactly one PositionDeletes file should be visible in the files catalog.
query ?
select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'PositionDeletes';
----
t


# Batch-ingested table over the same Iceberg table; only refreshed on demand.
# NOTE(review): later commits in this PR rename MANUAL_TRIGGER ->
# FULL_RECOMPUTE (see "rename manual trigger refresh mode to full
# recompute") — confirm this option value is still accepted on the
# final branch state.
statement ok
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_mode = 'MANUAL_TRIGGER'
);


statement ok
refresh table iceberg_batch_table;

# Give the refresh job time to complete before checking results.
sleep 10s

# Source of truth: id = 4 was deleted upstream.
query I
select id from t_refresh order by id;
----
1
2
3
5

# The refreshed batch table must match (position delete applied).
query I
select id from iceberg_batch_table order by id;
----
1
2
3
5

# ==== end PositionDeletes

# Delete by non-key columns -> expected to produce an equality-delete file
# (removes ids 3 and 5, which both have name = 'zzz', foo = 'c').
statement ok
delete from t_refresh where name = 'zzz' and foo = 'c';

statement ok
flush;

sleep 10s

# Exactly one EqualityDeletes file should be visible in the files catalog.
query ?
select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'EqualityDeletes';
----
t


statement ok
refresh table iceberg_batch_table;

sleep 10s

query I
select id from t_refresh order by id;
----
1
2


# The refreshed batch table must match (equality delete applied).
query I
select id from iceberg_batch_table order by id;
----
1
2


# Cleanup

statement ok
drop table iceberg_batch_table;

statement ok
drop table t_refresh;

statement ok
drop connection my_conn;

statement ok
drop secret my_secret;
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message Source {
// Used for secret connect options.
map<string, secret.SecretRef> secret_refs = 19;

// Source refresh mode
plan_common.SourceRefreshMode refresh_mode = 20;

// Per-source catalog version, used by schema change.
uint64 version = 100;

Expand Down
10 changes: 10 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,13 @@ enum AdditionalColumnType {
ADDITIONAL_COLUMN_TYPE_NORMAL = 7;
ADDITIONAL_COLUMN_TYPE_PAYLOAD = 8;
}

message SourceRefreshMode {
message SourceRefreshModeStreaming {}
message SourceRefreshModeManualTrigger {}

oneof refresh_mode {
SourceRefreshModeStreaming streaming = 1;
SourceRefreshModeManualTrigger manual_trigger = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we support auto refresh in the future, do we still use this SourceRefreshModeManualTrigger?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe full_recompute is more precise than manual_trigger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done with renaming: MANUAL_TRIGGER -> FULL_RECOMPUTE

}
}
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ message StreamSource {
map<string, secret.SecretRef> secret_refs = 10;
// Downstream columns are used by list node to know which columns are needed.
optional Columns downstream_columns = 11;
plan_common.SourceRefreshMode refresh_mode = 12;
}

// copy contents from StreamSource to prevent compatibility issues in the future
Expand All @@ -292,6 +293,7 @@ message StreamFsFetch {
// Source rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
plan_common.SourceRefreshMode refresh_mode = 11;
}

// The executor only for receiving barrier from the meta service. It always resides in the leaves
Expand Down
6 changes: 4 additions & 2 deletions src/batch/executors/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use risingwave_common::catalog::{
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::source::iceberg::{
IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
scan_task_to_chunk_with_deletes,
};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_expr::expr::LiteralExpression;
Expand Down Expand Up @@ -109,13 +110,14 @@ impl IcebergScanExecutor {

for data_file_scan_task in data_file_scan_tasks {
#[for_await]
for chunk in scan_task_to_chunk(
for chunk in scan_task_to_chunk_with_deletes(
table.clone(),
data_file_scan_task,
IcebergScanOpts {
chunk_size: self.chunk_size,
need_seq_num: self.need_seq_num,
need_file_path_and_pos: self.need_file_path_and_pos,
handle_delete_files: false,
},
self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
) {
Expand Down
Loading
Loading