diff --git a/e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt b/e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt
new file mode 100644
index 0000000000000..50e41f06f6b18
--- /dev/null
+++ b/e2e_test/iceberg/test_case/pure_slt/refresh_iceberg_table.slt
@@ -0,0 +1,143 @@
+control substitution on
+
+statement ok
+create secret my_secret with (
+    backend = 'meta'
+) as 'hummockadmin';
+
+statement ok
+create connection my_conn
+with (
+    type = 'iceberg',
+    warehouse.path = 's3://hummock001/iceberg_connection',
+    s3.access.key = secret my_secret,
+    s3.secret.key = secret my_secret,
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-west-2',
+    catalog.type = 'storage',
+);
+
+statement ok
+set iceberg_engine_connection = 'public.my_conn';
+
+statement ok
+create table t_refresh (id int primary key, name varchar, foo varchar) with (commit_checkpoint_interval = 10) engine = iceberg;
+
+statement ok
+insert into t_refresh values(1, 'xxx', 'a'), (2, 'yyy', 'b'), (3, 'zzz', 'c'), (4, 'www', 'd'), (5, 'zzz', 'c');
+
+statement ok
+flush;
+
+statement ok
+delete from t_refresh where id = 4;
+
+statement ok
+flush;
+
+sleep 10s
+
+query ?
+select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'PositionDeletes';
+----
+t
+
+
+statement error Invalid key
+create table iceberg_batch_table ( primary key (id) ) with (
+    connector = 'iceberg',
+    warehouse.path = 's3://hummock001/iceberg_connection',
+    s3.access.key = secret my_secret,
+    s3.secret.key = secret my_secret,
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-west-2',
+    catalog.type = 'storage',
+    table.name = 't_refresh',
+    database.name = 'public',
+    refresh_mode = 'invalid_option'
+);
+
+statement ok
+create table iceberg_batch_table ( primary key (id) ) with (
+    connector = 'iceberg',
+    warehouse.path = 's3://hummock001/iceberg_connection',
+    s3.access.key = secret my_secret,
+    s3.secret.key = secret my_secret,
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-west-2',
+    catalog.type = 'storage',
+    table.name = 't_refresh',
+    database.name = 'public',
+    refresh_mode = 'FULL_RECOMPUTE'
+);
+
+
+statement ok
+refresh table iceberg_batch_table;
+
+sleep 10s
+
+query I
+select id from t_refresh order by id;
+----
+1
+2
+3
+5
+
+query I
+select id from iceberg_batch_table order by id;
+----
+1
+2
+3
+5
+
+# ==== end PositionDeletes
+
+statement ok
+delete from t_refresh where name = 'zzz' and foo = 'c';
+
+statement ok
+flush;
+
+sleep 10s
+
+query ?
+select count(*) = 1 from rw_iceberg_files where source_name = '__iceberg_source_t_refresh' and content = 'EqualityDeletes';
+----
+t
+
+
+statement ok
+refresh table iceberg_batch_table;
+
+sleep 10s
+
+query I
+select id from t_refresh order by id;
+----
+1
+2
+
+
+query I
+select id from iceberg_batch_table order by id;
+----
+1
+2
+
+
+# Cleanup
+
+statement ok
+drop table iceberg_batch_table;
+
+statement ok
+drop table t_refresh;
+
+statement ok
+drop connection my_conn;
+
+statement ok
+drop secret my_secret;
diff --git a/proto/catalog.proto b/proto/catalog.proto
index 21d2358e491f1..398a3bc9c990e 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -149,6 +149,9 @@ message Source {
   // Used for secret connect options.
   map<string, secret.SecretRef> secret_refs = 19;
 
+  // Source refresh mode
+  plan_common.SourceRefreshMode refresh_mode = 20;
+
   // Per-source catalog version, used by schema change.
   uint64 version = 100;
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 7bf942ea0551c..69677217f642e 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -304,3 +304,13 @@ enum AdditionalColumnType {
   ADDITIONAL_COLUMN_TYPE_PAYLOAD = 8;
   ADDITIONAL_COLUMN_TYPE_PULSAR_MESSAGE_ID_DATA = 9; // for pulsar source, used for ack message
 }
+
+message SourceRefreshMode {
+  message SourceRefreshModeStreaming {}
+  message SourceRefreshModeFullRecompute {}
+
+  oneof refresh_mode {
+    SourceRefreshModeStreaming streaming = 1;
+    SourceRefreshModeFullRecompute full_recompute = 2;
+  }
+}
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 7ec4d81727f3d..6d5ef4bcbcd45 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -276,6 +276,8 @@ message StreamSource {
   map<string, secret.SecretRef> secret_refs = 10;
   // Downstream columns are used by list node to know which columns are needed.
   optional Columns downstream_columns = 11;
+  plan_common.SourceRefreshMode refresh_mode = 12;
+  optional uint32 associated_table_id = 13;
 }
 
 // copy contents from StreamSource to prevent compatibility issues in the future
@@ -292,6 +294,8 @@ message StreamFsFetch {
   // Source rate limit
   optional uint32 rate_limit = 9;
   map<string, secret.SecretRef> secret_refs = 10;
+  plan_common.SourceRefreshMode refresh_mode = 11;
+  optional uint32 associated_table_id = 12;
 }
 
 // The executor only for receiving barrier from the meta service. It always resides in the leaves
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 52874ff805da9..ae8e3d4391017 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -84,7 +84,15 @@ message BarrierCompleteResponse {
   // Used for refreshable batch source.
   // SourceExecutor reports the source load is finished, and then
   // meta will issue a LoadFinish barrier to notify MaterializeExecutor to start diff calculation.
-  repeated uint32 load_finished_source_ids = 11;
+  message LoadFinishedSource {
+    // The actor that reported the completion event.
+    uint32 reporter_actor_id = 1;
+    // The table ID for the refreshable batch source.
+    uint32 table_id = 2;
+    // The source identifier associated with the finished load.
+    uint32 associated_source_id = 3;
+  }
+  repeated LoadFinishedSource load_finished_sources = 11;
   map vector_index_adds = 12;
   repeated CdcTableBackfillProgress cdc_table_backfill_progress = 13;
   // Used for truncating tables in storage layer.
@@ -97,7 +105,15 @@ message BarrierCompleteResponse {
   repeated uint32 refresh_finished_tables = 15;
   // SourceExecutor reports the source list is finished, and then
   // meta will issue a ListFinish barrier to notify the source to start loading.
-  repeated uint32 list_finished_source_ids = 16;
+  message ListFinishedSource {
+    // The actor that reported the completion event.
+    uint32 reporter_actor_id = 1;
+    // The table ID for the refreshable batch source.
+    uint32 table_id = 2;
+    // The source identifier associated with the completed listing.
+    uint32 associated_source_id = 3;
+  }
+  repeated ListFinishedSource list_finished_sources = 16;
 }
 
 message StreamingControlStreamRequest {
diff --git a/src/batch/executors/src/executor/iceberg_scan.rs b/src/batch/executors/src/executor/iceberg_scan.rs
index 2d03a6e204ef1..4de36fa4de160 100644
--- a/src/batch/executors/src/executor/iceberg_scan.rs
+++ b/src/batch/executors/src/executor/iceberg_scan.rs
@@ -22,7 +22,8 @@ use risingwave_common::catalog::{
 use risingwave_common::types::{DataType, ScalarImpl};
 use risingwave_connector::WithOptionsSecResolved;
 use risingwave_connector::source::iceberg::{
-    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit, scan_task_to_chunk,
+    IcebergFileScanTask, IcebergProperties, IcebergScanOpts, IcebergSplit,
+    scan_task_to_chunk_with_deletes,
 };
 use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
 use risingwave_expr::expr::LiteralExpression;
@@ -109,13 +110,14 @@ impl IcebergScanExecutor {
 
         for data_file_scan_task in data_file_scan_tasks {
             #[for_await]
-            for chunk in scan_task_to_chunk(
+            for chunk in scan_task_to_chunk_with_deletes(
                 table.clone(),
                 data_file_scan_task,
                 IcebergScanOpts {
                     chunk_size: self.chunk_size,
                     need_seq_num: self.need_seq_num,
                     need_file_path_and_pos: self.need_file_path_and_pos,
+                    handle_delete_files: false,
                 },
                 self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
             ) {
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index cbe58537926e2..a1fd17b32cd03 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -15,10 +15,10 @@ pub mod parquet_file_handler;
 
 mod metrics;
 
-use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
 use std::sync::Arc;
 
-use anyhow::anyhow;
+use anyhow::{Context, anyhow};
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::{for_await, try_stream};
@@ -31,13 +31,15 @@ use itertools::Itertools;
 pub use parquet_file_handler::*;
 use phf::{Set, phf_set};
 use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::arrow::arrow_array_iceberg::Array;
 use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
 use risingwave_common::bail;
+use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{
     ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
     Schema,
 };
-use risingwave_common::types::JsonbVal;
+use risingwave_common::types::{Datum, DatumRef, JsonbVal, ToDatumRef, ToOwnedDatum};
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common_estimate_size::EstimateSize;
 use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
@@ -407,9 +409,11 @@ impl IcebergSplitEnumerator {
         #[for_await]
         for task in file_scan_stream {
-            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
-            for delete_file in task.deletes.drain(..) {
-                let mut delete_file = delete_file.as_ref().clone();
+            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
+
+            // Collect delete files for separate scan types, but keep task.deletes intact
+            for delete_file in &task.deletes {
+                let delete_file = delete_file.as_ref().clone();
                 match delete_file.data_file_content {
                     iceberg::spec::DataContentType::Data => {
                         bail!("Data file should not in task deletes");
                     }
@@ -420,6 +424,7 @@ impl IcebergSplitEnumerator {
                         }
                     }
                     iceberg::spec::DataContentType::PositionDeletes => {
+                        let mut delete_file = delete_file;
                         if position_delete_files_set.insert(delete_file.data_file_path.clone()) {
                             delete_file.project_field_ids = Vec::default();
                             position_delete_files.push(delete_file);
@@ -427,8 +432,10 @@ impl IcebergSplitEnumerator {
                     }
                 }
             }
+
             match task.data_file_content {
                 iceberg::spec::DataContentType::Data => {
+                    // Keep the original task with its deletes field intact
                     data_files.push(task);
                 }
                 iceberg::spec::DataContentType::EqualityDeletes => {
@@ -635,23 +642,29 @@ pub struct IcebergScanOpts {
     pub chunk_size: usize,
     pub need_seq_num: bool,
     pub need_file_path_and_pos: bool,
+    pub handle_delete_files: bool,
 }
 
+type EqualityDeleteEntries = Vec<(Vec<String>, HashSet<Vec<Datum>>)>;
+
+/// Scan a data file and optionally apply delete files (both position delete and equality delete).
+/// This implementation follows the iceberg-rust `process_file_scan_task` logic for proper delete handling.
 #[try_stream(ok = DataChunk, error = ConnectorError)]
-pub async fn scan_task_to_chunk(
+pub async fn scan_task_to_chunk_with_deletes(
     table: Table,
     data_file_scan_task: FileScanTask,
     IcebergScanOpts {
         chunk_size,
         need_seq_num,
         need_file_path_and_pos,
+        handle_delete_files,
     }: IcebergScanOpts,
     metrics: Option<Arc<IcebergScanMetrics>>,
 ) {
     let table_name = table.identifier().name().to_owned();
     let mut read_bytes = scopeguard::guard(0, |read_bytes| {
-        if let Some(metrics) = metrics {
+        if let Some(metrics) = metrics.clone() {
             metrics
                 .iceberg_read_bytes
                 .with_guarded_label_values(&[&table_name])
@@ -662,34 +675,319 @@ pub async fn scan_task_to_chunk(
     let data_file_path = data_file_scan_task.data_file_path.clone();
     let data_sequence_number = data_file_scan_task.sequence_number;
 
+    tracing::debug!(
+        "scan_task_to_chunk_with_deletes: data_file={}, handle_delete_files={}, total_delete_files={}",
+        data_file_path,
+        handle_delete_files,
+        data_file_scan_task.deletes.len()
+    );
+
+    // Step 1: Load and process delete files
+    // We build both position deletes (sorted set of row positions) and equality deletes (hash set of row keys)
+
+    // Build position delete vector - using BTreeSet for sorted storage (similar to DeleteVector in iceberg-rust)
+    let position_delete_set: BTreeSet<i64> = if handle_delete_files {
+        let mut deletes = BTreeSet::new();
+
+        // Filter position delete tasks for this specific data file
+        let position_delete_tasks: Vec<_> = data_file_scan_task
+            .deletes
+            .iter()
+            .filter(|delete| delete.data_file_content == DataContentType::PositionDeletes)
+            .cloned()
+            .collect();
+
+        tracing::debug!(
+            "Processing position deletes for data file: {}, found {} position delete tasks",
+            data_file_path,
+            position_delete_tasks.len()
+        );
+
+        for delete_task in position_delete_tasks {
+            let delete_task_file_path = delete_task.data_file_path.clone();
+
+            let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
+            // Clone the FileScanTask (not Arc) to create a proper stream
+            let task_clone: FileScanTask = (*delete_task).clone();
+            let delete_stream = tokio_stream::once(Ok(task_clone));
+            let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;
+
+            while let Some(record_batch) = delete_record_stream.next().await {
+                let record_batch = record_batch?;
+
+                // Position delete files have schema: file_path (string), pos (long)
+                if let Some(file_path_col) = record_batch.column_by_name("file_path")
+                    && let Some(pos_col) = record_batch.column_by_name("pos")
+                {
+                    let file_paths = file_path_col
+                        .as_any()
+                        .downcast_ref::<StringArray>()
+                        .with_context(|| "file_path column is not StringArray")?;
+                    let positions = pos_col
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .with_context(|| "pos column is not Int64Array")?;
+
+                    // Only include positions that match the current data file
+                    for idx in 0..record_batch.num_rows() {
+                        if !file_paths.is_null(idx) && !positions.is_null(idx) {
+                            let file_path = file_paths.value(idx);
+                            let pos = positions.value(idx);
+
+                            if file_path == data_file_path {
+                                deletes.insert(pos);
+                            }
+                        } else {
+                            tracing::warn!(
+                                "Position delete file {} at line {}: file_path or pos is null",
+                                delete_task_file_path,
+                                idx
+                            );
+                        }
+                    }
+                }
+            }
+        }
+
+        tracing::debug!(
+            "Built position delete set for data file {}: {:?}",
+            data_file_path,
+            deletes
+        );
+
+        deletes
+    } else {
+        BTreeSet::new()
+    };
+
+    // Build equality delete predicates
+    let equality_deletes: Option<EqualityDeleteEntries> = if handle_delete_files {
+        let equality_delete_tasks: Vec<_> = data_file_scan_task
+            .deletes
+            .iter()
+            .filter(|delete| delete.data_file_content == DataContentType::EqualityDeletes)
+            .cloned()
+            .collect();
+
+        if !equality_delete_tasks.is_empty() {
+            let mut delete_key_map: HashMap<Vec<String>, HashSet<Vec<Datum>>> = HashMap::new();
+
+            for delete_task in equality_delete_tasks {
+                let equality_ids = delete_task.equality_ids.clone();
+
+                if equality_ids.is_empty() {
+                    continue;
+                }
+
+                let delete_schema = delete_task.schema();
+                let delete_name_vec = equality_ids
+                    .iter()
+                    .filter_map(|id| delete_schema.name_by_field_id(*id))
+                    .map(|s| s.to_owned())
+                    .collect_vec();
+
+                if delete_name_vec.len() != equality_ids.len() {
+                    tracing::warn!(
+                        "Skip equality delete task due to missing column mappings: expected {} names, got {}",
+                        equality_ids.len(),
+                        delete_name_vec.len()
+                    );
+                    continue;
+                }
+
+                let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
+                // Clone the FileScanTask (not Arc) to create a proper stream
+                let task_clone: FileScanTask = delete_task.as_ref().clone();
+                let delete_stream = tokio_stream::once(Ok(task_clone));
+                let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;
+
+                let mut task_delete_key_set: HashSet<Vec<Datum>> = HashSet::new();
+
+                while let Some(record_batch) = delete_record_stream.next().await {
+                    let record_batch = record_batch?;
+
+                    let delete_chunk =
+                        IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+
+                    let field_indices = delete_name_vec
+                        .iter()
+                        .map(|field_name| {
+                            record_batch
+                                .schema()
+                                .column_with_name(field_name)
+                                .map(|(idx, _)| idx)
+                                .unwrap()
+                        })
+                        .collect_vec();
+
+                    // Build delete keys from equality columns
+                    for row_idx in 0..delete_chunk.capacity() {
+                        let mut key = Vec::with_capacity(field_indices.len());
+                        for &col_idx in &field_indices {
+                            let col = delete_chunk.column_at(col_idx);
+                            key.push(col.value_at(row_idx).to_owned_datum());
+                        }
+                        task_delete_key_set.insert(key);
+                    }
+                }
+
+                if !task_delete_key_set.is_empty() {
+                    delete_key_map
+                        .entry(delete_name_vec.clone())
+                        .or_default()
+                        .extend(task_delete_key_set);
+                }
+            }
+
+            if delete_key_map.is_empty() {
+                None
+            } else {
+                Some(delete_key_map.into_iter().collect())
+            }
+        } else {
+            None
+        }
+    } else {
+        None
+    };
+
+    // Step 2: Read the data file
     let reader = table.reader_builder().with_batch_size(chunk_size).build();
     let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
 
-    // FIXME: what if the start position is not 0? The logic for index seems not correct.
     let mut record_batch_stream = reader.read(Box::pin(file_scan_stream)).await?.enumerate();
 
-    while let Some((index, record_batch)) = record_batch_stream.next().await {
+    // Step 3: Process each record batch and apply deletes
+    while let Some((batch_index, record_batch)) = record_batch_stream.next().await {
         let record_batch = record_batch?;
+        let batch_start_pos = (batch_index * chunk_size) as i64;
+        let batch_num_rows = record_batch.num_rows();
 
         let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+
+        // Apply position deletes using efficient range-based filtering
+        // Build visibility bitmap based on position deletes
+        let mut visibility = vec![true; batch_num_rows];
+
+        if !position_delete_set.is_empty() {
+            let batch_end_pos = batch_start_pos + batch_num_rows as i64;
+
+            tracing::debug!(
+                "Applying position deletes to batch {}: range [{}, {}), total delete set size: {}",
+                batch_index,
+                batch_start_pos,
+                batch_end_pos,
+                position_delete_set.len()
+            );
+
+            let mut position_deleted_count = 0;
+
+            // Use BTreeSet's range query for efficient filtering
+            // Only check positions that fall within this batch's range
+            for &deleted_pos in position_delete_set.range(batch_start_pos..batch_end_pos) {
+                let local_idx = (deleted_pos - batch_start_pos) as usize;
+                if local_idx < batch_num_rows {
+                    visibility[local_idx] = false;
+                    position_deleted_count += 1;
+                }
+            }
+
+            tracing::debug!(
+                "Position delete results for batch {}: deleted {} rows",
+                batch_index,
+                position_deleted_count
+            );
+        }
+
+        // Apply equality deletes
+        if let Some(ref equality_delete_entries) = equality_deletes {
+            let (columns, _) = chunk.into_parts();
+
+            let delete_entries_info: Vec<(Vec<usize>, HashSet<Vec<DatumRef<'_>>>)> =
+                equality_delete_entries
+                    .iter()
+                    .map(|(delete_name_vec, delete_key_set)| {
+                        let indices = delete_name_vec
+                            .iter()
+                            .map(|field_name| {
+                                record_batch
+                                    .schema()
+                                    .column_with_name(field_name)
+                                    .map(|(idx, _)| idx)
+                                    .unwrap()
+                            })
+                            .collect_vec();
+                        let delete_key_ref_set: HashSet<Vec<DatumRef<'_>>> = delete_key_set
+                            .iter()
+                            .map(|datum_vec| {
+                                datum_vec.iter().map(|datum| datum.to_datum_ref()).collect()
+                            })
+                            .collect();
+                        (indices, delete_key_ref_set)
+                    })
+                    .collect::<Vec<_>>();
+
+            let mut deleted_count = 0;
+
+            // Check each row against the delete sets built per equality delete task
+            for (row_idx, item) in visibility.iter_mut().enumerate().take(batch_num_rows) {
+                if !*item {
+                    continue;
+                }
+
+                for (field_indices, delete_key_set) in &delete_entries_info {
+                    let mut row_key = Vec::with_capacity(field_indices.len());
+                    for &col_idx in field_indices {
+                        let datum = columns[col_idx].value_at(row_idx);
+                        row_key.push(datum);
+                    }
+
+                    if delete_key_set.contains(&row_key) {
+                        *item = false;
+                        deleted_count += 1;
+                        break;
+                    }
+                }
+            }
+
+            tracing::debug!(
+                "Equality delete results for batch {}: deleted {} rows",
+                batch_index,
+                deleted_count
+            );
+
+            let columns: Vec<_> = columns.into_iter().collect();
+            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
+        } else {
+            // Only position deletes to apply
+            let (columns, _) = chunk.into_parts();
+            let columns: Vec<_> = columns.into_iter().collect();
+            chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
+        }
+
+        // Step 4: Add metadata columns if requested
         if need_seq_num {
             let (mut columns, visibility) = chunk.into_parts();
             columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
                 vec![data_sequence_number; visibility.len()],
             ))));
-            chunk = DataChunk::from_parts(columns.into(), visibility)
-        };
+            chunk = DataChunk::from_parts(columns.into(), visibility);
+        }
+
         if need_file_path_and_pos {
             let (mut columns, visibility) = chunk.into_parts();
             columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
                 vec![data_file_path.as_str(); visibility.len()],
             ))));
-            let index_start = (index * chunk_size) as i64;
-            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
-                (index_start..(index_start + visibility.len() as i64)).collect::<Vec<i64>>(),
-            ))));
-            chunk = DataChunk::from_parts(columns.into(), visibility)
+
+            // Generate position values for each row in the batch
+            let positions: Vec<i64> =
+                (batch_start_pos..(batch_start_pos + visibility.len() as i64)).collect();
+            columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(positions))));
+
+            chunk = DataChunk::from_parts(columns.into(), visibility);
         }
+
         *read_bytes += chunk.estimated_heap_size() as u64;
         yield chunk;
     }
diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs
index 3a9b37025ef02..c233a4114553a 100644
--- a/src/frontend/src/catalog/source_catalog.rs
+++ b/src/frontend/src/catalog/source_catalog.rs
@@ -17,6 +17,7 @@ use risingwave_common::util::epoch::Epoch;
 use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
+use risingwave_pb::plan_common::SourceRefreshMode;
 use risingwave_sqlparser::ast;
 use risingwave_sqlparser::parser::Parser;
 use thiserror_ext::AsReport as _;
@@ -53,6 +54,7 @@ pub struct SourceCatalog {
     pub created_at_cluster_version: Option<String>,
     pub initialized_at_cluster_version: Option<String>,
     pub rate_limit: Option<u32>,
+    pub refresh_mode: Option<SourceRefreshMode>,
 }
 
 impl SourceCatalog {
@@ -94,6 +96,7 @@ impl SourceCatalog {
             initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
             secret_refs,
             rate_limit: self.rate_limit,
+            refresh_mode: self.refresh_mode,
         }
     }
 
@@ -211,6 +214,7 @@ impl From<&PbSource> for SourceCatalog {
             created_at_cluster_version: prost.created_at_cluster_version.clone(),
             initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
             rate_limit,
+            refresh_mode: prost.refresh_mode,
         }
     }
 }
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 49d14f928c739..44051a521607f 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -70,7 +70,8 @@ use risingwave_connector::{AUTO_SCHEMA_CHANGE_KEY, WithPropertiesExt};
 use risingwave_pb::catalog::connection_params::PbConnectionType;
 use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo, WatermarkDesc};
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
-use risingwave_pb::plan_common::{EncodeType, FormatType};
+use risingwave_pb::plan_common::source_refresh_mode::{RefreshMode, SourceRefreshModeStreaming};
+use risingwave_pb::plan_common::{EncodeType, FormatType, SourceRefreshMode};
 use risingwave_pb::stream_plan::PbStreamFragmentGraph;
 use risingwave_pb::telemetry::TelemetryDatabaseObject;
 use risingwave_sqlparser::ast::{
@@ -101,7 +102,7 @@ use crate::session::SessionImpl;
 use crate::session::current::notice_to_user;
 use crate::utils::{
     OverwriteOptions, resolve_connection_ref_and_secret_ref, resolve_privatelink_in_with_option,
-    resolve_secret_ref_in_with_options,
+    resolve_secret_ref_in_with_options, resolve_source_refresh_mode_in_with_option,
 };
 use crate::{OptimizerContext, WithOptions, WithOptionsSecResolved, bind_data_type, build_graph};
 
@@ -726,9 +727,22 @@ pub fn bind_connector_props(
     handler_args: &HandlerArgs,
     format_encode: &FormatEncodeOptions,
     is_create_source: bool,
-) -> Result<WithOptions> {
+) -> Result<(WithOptions, SourceRefreshMode)> {
     let mut with_properties = handler_args.with_options.clone().into_connector_props();
     validate_compatibility(format_encode, &mut with_properties)?;
+    let refresh_mode = {
+        let refresh_mode = resolve_source_refresh_mode_in_with_option(&mut with_properties)?;
+        if is_create_source && refresh_mode.is_some() {
+            return Err(RwError::from(ProtocolError(
+                "`refresh_mode` only supported for CREATE TABLE".to_owned(),
+            )));
+        }
+
+        refresh_mode.unwrap_or(SourceRefreshMode {
+            refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
+        })
+    };
+
     let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
 
     if !is_create_source && with_properties.is_shareable_only_cdc_connector() {
@@ -777,7 +791,7 @@ pub fn bind_connector_props(
             .entry("server.id".to_owned())
             .or_insert(rand::rng().random_range(1..u32::MAX).to_string());
     }
-    Ok(with_properties)
+    Ok((with_properties, refresh_mode))
 }
 
 /// When the schema can be inferred from external system (like schema registry),
@@ -823,6 +837,7 @@ pub async fn bind_create_source_or_table_with_connector(
     create_source_type: CreateSourceType,
     source_rate_limit: Option<u32>,
     sql_column_strategy: SqlColumnStrategy,
+    refresh_mode: SourceRefreshMode,
 ) -> Result<SourceCatalog> {
     let session = &handler_args.session;
     let db_name: &str = &session.database();
@@ -831,13 +846,6 @@ pub async fn bind_create_source_or_table_with_connector(
         session.get_database_and_schema_id_for_create(schema_name.clone())?;
     let is_create_source = create_source_type != CreateSourceType::Table;
 
-    if !is_create_source && with_properties.is_iceberg_connector() {
-        return Err(ErrorCode::BindError(
-            "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
-                .to_owned(),
-        )
-        .into());
-    }
 
     if is_create_source {
         // reject refreshable batch source