-
Notifications
You must be signed in to change notification settings - Fork 703
feat(iceberg): support refreshable batch iceberg table #23527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tabVersion
wants to merge
30
commits into
main
Choose a base branch
from
tab/batch-iceberg-3
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,797
−296
Open
Changes from 17 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
7632807
feat(iceberg): add BatchIcebergListExecutor and BatchIcebergFetchExec…
cf1ad3f
feat(iceberg): implement execute and debug traits for BatchIcebergLis…
03ae5de
feat: add SourceRefreshMode to catalog and stream plan
985d923
feat(iceberg): integrate metrics into BatchIcebergListExecutor
6d9f9e1
feat: enhance StreamFsFetch with refresh mode support
9093772
feat(iceberg): enhance BatchIcebergFetchExecutor with new functionality
16a7f3d
feat(iceberg): enhance delete file handling in Iceberg scan and fetch…
7646c6a
stash
c31e351
fix: improve error handling and logging in Iceberg source executors
12508fb
Merge branch 'main' into tab/batch-iceberg-3
tabVersion e8d2a11
feat(iceberg): integrate refresh mode handling in table creation
775acd1
feat(iceberg): enhance delete file processing in Iceberg scan and fet…
ce8ae20
refactor(iceberg): change logging level to debug for delete file proc…
07cd958
Merge branch 'main' into tab/batch-iceberg-3
tabVersion 1cf1894
Merge branch 'main' into tab/batch-iceberg-3
tabVersion 2cad418
refactor(iceberg): replace legacy scan function with new implementati…
db95e26
test(iceberg): add end-to-end test for refreshing Iceberg tables
04cade2
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
6da13ad
refactor(iceberg): update delete handling logic and improve type safety
4f5b6c5
refactor(iceberg): enhance delete handling and improve logging
7e6037b
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
afa89b0
refactor(iceberg): improve equality delete handling and type safety
68adb94
refactor(iceberg): simplify delete handling in scan task
816ec34
feat: implement refresh progress tracking for refresh table (#23671)
tabVersion cf28501
Merge remote-tracking branch 'origin' into tab/batch-iceberg-3
96b7f2e
refactor: simplify command scheduling and ID handling in barrier and …
08b5418
refactor: rename manual trigger refresh mode to full recompute in sou…
0855532
feat: add invalid refresh mode option and update refresh mode to full…
f704c26
feat: enhance dropped table handling by cleaning up refresh progress …
046e78a
Revert "feat: enhance dropped table handling by cleaning up refresh p…
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
129 changes: 129 additions & 0 deletions
129
e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| control substitution on | ||
|
|
||
| statement ok | ||
| create secret my_secret with ( | ||
| backend = 'meta' | ||
| ) as 'hummockadmin'; | ||
|
|
||
| statement ok | ||
| create connection my_conn | ||
| with ( | ||
| type = 'iceberg', | ||
| warehouse.path = 's3://hummock001/iceberg_connection', | ||
| s3.access.key = secret my_secret, | ||
| s3.secret.key = secret my_secret, | ||
| s3.endpoint = 'http://127.0.0.1:9301', | ||
| s3.region = 'us-west-2', | ||
| catalog.type = 'storage', | ||
| ); | ||
|
|
||
| statement ok | ||
| set iceberg_engine_connection = 'public.my_conn'; | ||
|
|
||
| statement ok | ||
| create table t_refresh (id int primary key, name varchar, foo varchar) with (commit_checkpoint_interval = 10) engine = iceberg; | ||
|
|
||
| statement ok | ||
| insert into t_refresh values(1, 'xxx', 'a') , (2, 'yyy', 'b'), (3, 'zzz', 'c'), (4, 'www', 'd'), (5, 'zzz', 'c'); | ||
|
|
||
| statement ok | ||
| flush; | ||
|
|
||
| statement ok | ||
| delete from t_refresh where id = 4; | ||
|
|
||
| statement ok | ||
| flush; | ||
|
|
||
| sleep 10s | ||
|
|
||
| query ? | ||
| select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'PositionDeletes'; | ||
| ---- | ||
| t | ||
|
|
||
|
|
||
| statement ok | ||
| create table iceberg_batch_table ( primary key (id) ) with ( | ||
| connector = 'iceberg', | ||
| warehouse.path = 's3://hummock001/iceberg_connection', | ||
| s3.access.key = secret my_secret, | ||
| s3.secret.key = secret my_secret, | ||
| s3.endpoint = 'http://127.0.0.1:9301', | ||
| s3.region = 'us-west-2', | ||
| catalog.type = 'storage', | ||
| table.name = 't_refresh', | ||
| database.name = 'public', | ||
| refresh_mode = 'MANUAL_TRIGGER' | ||
| ); | ||
|
|
||
|
|
||
| statement ok | ||
| refresh table iceberg_batch_table; | ||
|
|
||
| sleep 10s | ||
|
|
||
| query I | ||
| select id from t_refresh order by id; | ||
| ---- | ||
| 1 | ||
| 2 | ||
| 3 | ||
| 5 | ||
|
|
||
| query I | ||
| select id from iceberg_batch_table order by id; | ||
| ---- | ||
| 1 | ||
| 2 | ||
| 3 | ||
| 5 | ||
|
|
||
| # ==== end PositionDeletes | ||
|
|
||
| statement ok | ||
| delete from t_refresh where name = 'zzz' and foo = 'c'; | ||
|
|
||
| statement ok | ||
| flush; | ||
|
|
||
| sleep 10s | ||
|
|
||
| query ? | ||
| select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'EqualityDeletes'; | ||
| ---- | ||
| t | ||
|
|
||
|
|
||
| statement ok | ||
| refresh table iceberg_batch_table; | ||
|
|
||
| sleep 10s | ||
|
|
||
| query I | ||
| select id from t_refresh order by id; | ||
| ---- | ||
| 1 | ||
| 2 | ||
|
|
||
|
|
||
| query I | ||
| select id from iceberg_batch_table order by id; | ||
| ---- | ||
| 1 | ||
| 2 | ||
|
|
||
|
|
||
| # Cleanup | ||
|
|
||
| statement ok | ||
| drop table iceberg_batch_table; | ||
|
|
||
| statement ok | ||
| drop table t_refresh; | ||
|
|
||
| statement ok | ||
| drop connection my_conn; | ||
|
|
||
| statement ok | ||
| drop secret my_secret; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we support auto refresh in the future, do we still use this
SourceRefreshModeManualTrigger?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
full_recomputeis more precise thanmanual_triggerThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done with renaming: MANUAL_TRIGGER -> FULL_RECOMPUTE