Merged
Changes from all commits
32 commits
7632807
feat(iceberg): add BatchIcebergListExecutor and BatchIcebergFetchExec…
Oct 21, 2025
cf1ad3f
feat(iceberg): implement execute and debug traits for BatchIcebergLis…
Oct 21, 2025
03ae5de
feat: add SourceRefreshMode to catalog and stream plan
Oct 22, 2025
985d923
feat(iceberg): integrate metrics into BatchIcebergListExecutor
Oct 22, 2025
6d9f9e1
feat: enhance StreamFsFetch with refresh mode support
Oct 22, 2025
9093772
feat(iceberg): enhance BatchIcebergFetchExecutor with new functionality
Oct 22, 2025
16a7f3d
feat(iceberg): enhance delete file handling in Iceberg scan and fetch…
Oct 23, 2025
7646c6a
stash
Oct 23, 2025
c31e351
fix: improve error handling and logging in Iceberg source executors
Oct 24, 2025
12508fb
Merge branch 'main' into tab/batch-iceberg-3
tabVersion Oct 24, 2025
e8d2a11
feat(iceberg): integrate refresh mode handling in table creation
Oct 24, 2025
775acd1
feat(iceberg): enhance delete file processing in Iceberg scan and fet…
Oct 26, 2025
ce8ae20
refactor(iceberg): change logging level to debug for delete file proc…
Oct 27, 2025
07cd958
Merge branch 'main' into tab/batch-iceberg-3
tabVersion Oct 27, 2025
1cf1894
Merge branch 'main' into tab/batch-iceberg-3
tabVersion Oct 28, 2025
2cad418
refactor(iceberg): replace legacy scan function with new implementati…
Oct 28, 2025
db95e26
test(iceberg): add end-to-end test for refreshing Iceberg tables
Oct 28, 2025
04cade2
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 3, 2025
6da13ad
refactor(iceberg): update delete handling logic and improve type safety
Nov 3, 2025
4f5b6c5
refactor(iceberg): enhance delete handling and improve logging
Nov 3, 2025
7e6037b
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 4, 2025
afa89b0
refactor(iceberg): improve equality delete handling and type safety
Nov 4, 2025
68adb94
refactor(iceberg): simplify delete handling in scan task
Nov 4, 2025
816ec34
feat: implement refresh progress tracking for refresh table (#23671)
tabVersion Nov 7, 2025
cf28501
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 7, 2025
96b7f2e
refactor: simplify command scheduling and ID handling in barrier and …
Nov 7, 2025
08b5418
refactor: rename manual trigger refresh mode to full recompute in sou…
Nov 7, 2025
0855532
feat: add invalid refresh mode option and update refresh mode to full…
Nov 7, 2025
f704c26
feat: enhance dropped table handling by cleaning up refresh progress …
Nov 7, 2025
046e78a
Revert "feat: enhance dropped table handling by cleaning up refresh p…
Nov 7, 2025
a296c81
fix syntax
Nov 8, 2025
0c28c6d
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
Nov 8, 2025
143 changes: 143 additions & 0 deletions e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt
@@ -0,0 +1,143 @@
control substitution on

statement ok
create secret my_secret with (
backend = 'meta'
) as 'hummockadmin';

statement ok
create connection my_conn
with (
type = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
);

statement ok
set iceberg_engine_connection = 'public.my_conn';

statement ok
create table t_refresh (id int primary key, name varchar, foo varchar) with (commit_checkpoint_interval = 10) engine = iceberg;

statement ok
insert into t_refresh values(1, 'xxx', 'a') , (2, 'yyy', 'b'), (3, 'zzz', 'c'), (4, 'www', 'd'), (5, 'zzz', 'c');

statement ok
flush;

statement ok
delete from t_refresh where id = 4;

statement ok
flush;

sleep 10s

query ?
select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'PositionDeletes';
----
t


statement error Invalid key
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_mode = 'invalid_option'
);

statement ok
create table iceberg_batch_table ( primary key (id) ) with (
connector = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
catalog.type = 'storage',
table.name = 't_refresh',
database.name = 'public',
refresh_mode = 'FULL_RECOMPUTE'
);


statement ok
refresh table iceberg_batch_table;

sleep 10s

query I
select id from t_refresh order by id;
----
1
2
3
5

query I
select id from iceberg_batch_table order by id;
----
1
2
3
5

# ==== end PositionDeletes

statement ok
delete from t_refresh where name = 'zzz' and foo = 'c';

statement ok
flush;

sleep 10s

query ?
select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'EqualityDeletes';
----
t


statement ok
refresh table iceberg_batch_table;

sleep 10s

query I
select id from t_refresh order by id;
----
1
2


query I
select id from iceberg_batch_table order by id;
----
1
2


# Cleanup

statement ok
drop table iceberg_batch_table;

statement ok
drop table t_refresh;

statement ok
drop connection my_conn;

statement ok
drop secret my_secret;
3 changes: 3 additions & 0 deletions proto/catalog.proto
@@ -149,6 +149,9 @@ message Source {
// Used for secret connect options.
map<string, secret.SecretRef> secret_refs = 19;

// Source refresh mode
plan_common.SourceRefreshMode refresh_mode = 20;

// Per-source catalog version, used by schema change.
uint64 version = 100;

10 changes: 10 additions & 0 deletions proto/plan_common.proto
@@ -304,3 +304,13 @@ enum AdditionalColumnType {
ADDITIONAL_COLUMN_TYPE_PAYLOAD = 8;
ADDITIONAL_COLUMN_TYPE_PULSAR_MESSAGE_ID_DATA = 9; // for pulsar source, used for ack message
}

message SourceRefreshMode {
message SourceRefreshModeStreaming {}
message SourceRefreshModeFullRecompute {}

oneof refresh_mode {
SourceRefreshModeStreaming streaming = 1;
SourceRefreshModeFullRecompute full_recompute = 2;
}
}
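
A minimal sketch of how this oneof might be constructed from Rust, assuming prost's usual codegen conventions for the `risingwave_pb::plan_common` module (the paths and helper functions below are assumptions, not code from this PR):

```rust
// Hypothetical sketch: building the new SourceRefreshMode oneof.
// Module and type names follow prost's standard codegen rules for nested
// messages and oneofs; they are assumptions, not code from this PR.
use risingwave_pb::plan_common::SourceRefreshMode;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullRecompute, SourceRefreshModeStreaming,
};

fn streaming_mode() -> SourceRefreshMode {
    SourceRefreshMode {
        refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
    }
}

fn full_recompute_mode() -> SourceRefreshMode {
    SourceRefreshMode {
        refresh_mode: Some(RefreshMode::FullRecompute(SourceRefreshModeFullRecompute {})),
    }
}
```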
4 changes: 4 additions & 0 deletions proto/stream_plan.proto
@@ -276,6 +276,8 @@ message StreamSource {
map<string, secret.SecretRef> secret_refs = 10;
// Downstream columns are used by list node to know which columns are needed.
optional Columns downstream_columns = 11;
plan_common.SourceRefreshMode refresh_mode = 12;
optional uint32 associated_table_id = 13;
}

// copy contents from StreamSource to prevent compatibility issues in the future
@@ -292,6 +294,8 @@ message StreamFsFetch {
// Source rate limit
optional uint32 rate_limit = 9;
map<string, secret.SecretRef> secret_refs = 10;
plan_common.SourceRefreshMode refresh_mode = 11;
optional uint32 associated_table_id = 12;
}

// The executor only for receiving barrier from the meta service. It always resides in the leaves
20 changes: 18 additions & 2 deletions proto/stream_service.proto
@@ -84,7 +84,15 @@
// Used for refreshable batch source.
// SourceExecutor reports the source load is finished, and then
// meta will issue a LoadFinish barrier to notify MaterializeExecutor to start diff calculation.
repeated uint32 load_finished_source_ids = 11;
message LoadFinishedSource {
// The actor that reported the completion event.
uint32 reporter_actor_id = 1;
// The table ID for the refreshable batch source.
uint32 table_id = 2;
// The source identifier associated with the finished load.
uint32 associated_source_id = 3;
}
repeated LoadFinishedSource load_finished_sources = 11;

Check failure on line 95 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
- Field "11" on message "BarrierCompleteResponse" changed name from "load_finished_source_ids" to "load_finished_sources".
- Field "11" with name "load_finished_sources" on message "BarrierCompleteResponse" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
- Field "11" with name "load_finished_sources" on message "BarrierCompleteResponse" changed option "json_name" from "loadFinishedSourceIds" to "loadFinishedSources".
map<uint32, hummock.VectorIndexDelta.VectorIndexAdds> vector_index_adds = 12;
repeated CdcTableBackfillProgress cdc_table_backfill_progress = 13;
// Used for truncating tables in storage layer.
@@ -97,7 +105,15 @@
repeated uint32 refresh_finished_tables = 15;
// SourceExecutor reports the source list is finished, and then
// meta will issue a ListFinish barrier to notify the source to start loading.
repeated uint32 list_finished_source_ids = 16;
message ListFinishedSource {
// The actor that reported the completion event.
uint32 reporter_actor_id = 1;
// The table ID for the refreshable batch source.
uint32 table_id = 2;
// The source identifier associated with the completed listing.
uint32 associated_source_id = 3;
}
repeated ListFinishedSource list_finished_sources = 16;

Check failure on line 116 in proto/stream_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
- Field "16" on message "BarrierCompleteResponse" changed name from "list_finished_source_ids" to "list_finished_sources".
- Field "16" with name "list_finished_sources" on message "BarrierCompleteResponse" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
- Field "16" with name "list_finished_sources" on message "BarrierCompleteResponse" changed option "json_name" from "listFinishedSourceIds" to "listFinishedSources".
}

message StreamingControlStreamRequest {
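
For orientation, a hypothetical sketch of how an executor could populate the new repeated message field in place of the old bare source id, again assuming prost codegen names under `risingwave_pb::stream_service` (the wrapper function and its caller are assumptions):

```rust
// Hypothetical sketch: reporting a finished load with the richer message
// rather than the old `repeated uint32 load_finished_source_ids`.
// Names assume prost codegen; this is not code from this PR.
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::barrier_complete_response::LoadFinishedSource;

fn report_load_finished(
    response: &mut BarrierCompleteResponse,
    reporter_actor_id: u32,
    table_id: u32,
    associated_source_id: u32,
) {
    response.load_finished_sources.push(LoadFinishedSource {
        reporter_actor_id,
        table_id,
        associated_source_id,
    });
}
```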
6 changes: 4 additions & 2 deletions src/batch/executors/src/executor/iceberg_scan.rs
Expand Up @@ -22,7 +22,8 @@ use risingwave_common::catalog::{
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::source::iceberg::{
IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
scan_task_to_chunk_with_deletes,
};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_expr::expr::LiteralExpression;
@@ -109,13 +110,14 @@ impl IcebergScanExecutor {

for data_file_scan_task in data_file_scan_tasks {
#[for_await]
for chunk in scan_task_to_chunk(
for chunk in scan_task_to_chunk_with_deletes(
table.clone(),
data_file_scan_task,
IcebergScanOpts {
chunk_size: self.chunk_size,
need_seq_num: self.need_seq_num,
need_file_path_and_pos: self.need_file_path_and_pos,
handle_delete_files: false,
},
self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
) {
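
The batch scan path above passes `handle_delete_files: false`; the fetch path presumably enables it. A minimal sketch of the same call with delete handling turned on, where the signature is inferred solely from the call site in this diff (the wrapper function, parameter types, and error type are assumptions):

```rust
// Hypothetical sketch mirroring the call site above with delete handling on.
// Everything here is inferred from this diff, not confirmed elsewhere.
use futures_async_stream::for_await;
use iceberg::scan::FileScanTask;
use iceberg::table::Table;
use risingwave_connector::source::iceberg::{IcebergScanOpts, scan_task_to_chunk_with_deletes};

async fn scan_applying_deletes(table: Table, task: FileScanTask) -> anyhow::Result<()> {
    #[for_await]
    for chunk in scan_task_to_chunk_with_deletes(
        table.clone(),
        task,
        IcebergScanOpts {
            chunk_size: 1024,
            need_seq_num: false,
            need_file_path_and_pos: false,
            // The only flag that differs from the batch-scan call above:
            // position/equality delete files are applied while scanning.
            handle_delete_files: true,
        },
        None, // no scan metrics in this sketch
    ) {
        let _chunk = chunk?;
        // ... process or yield the chunk ...
    }
    Ok(())
}
```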