
Commit 16a7f3d

Author: tab (committed)
feat(iceberg): enhance delete file handling in Iceberg scan and fetch executors
- Added support for handling delete files in `scan_task_to_chunk_with_deletes` for Iceberg data processing.
- Introduced `handle_delete_files` option in `IcebergScanOpts` to control delete file processing.
- Updated `BatchIcebergFetchExecutor` to enable delete file handling for streaming sources.
- Maintained backward compatibility with a legacy scan function that delegates to the enhanced version.
1 parent 9093772 commit 16a7f3d
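
The commit message above describes masking out rows named by Iceberg position delete files when a data file is scanned. A minimal, self-contained sketch of that bookkeeping, using only std types and a made-up file path rather than the crate's `DataChunk`/`Bitmap` API:

```rust
use std::collections::{HashMap, HashSet};

fn main() {
    // Deleted positions keyed by data file path, mirroring the
    // HashMap<String, HashSet<i64>> the scan builds from position delete files.
    let mut position_deletes: HashMap<String, HashSet<i64>> = HashMap::new();
    position_deletes
        .entry("s3://bucket/data-00001.parquet".to_string()) // hypothetical path
        .or_default()
        .extend([1_i64, 3]);

    // One chunk of 5 rows read from that file; its first row sits at global
    // position `index * chunk_size` within the file (0 for the first chunk).
    let data_file_path = "s3://bucket/data-00001.parquet";
    let chunk_size = 5usize;
    let index = 0usize;
    let index_start = (index * chunk_size) as i64;

    // Rows whose global position appears in the delete set become invisible.
    let mut visibility = vec![true; chunk_size];
    if let Some(deleted) = position_deletes.get(data_file_path) {
        for (row_idx, visible) in visibility.iter_mut().enumerate() {
            if deleted.contains(&(index_start + row_idx as i64)) {
                *visible = false;
            }
        }
    }

    assert_eq!(visibility, vec![true, false, true, false, true]);
    println!("visibility after position deletes: {visibility:?}");
}
```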

4 files changed: +219 -8 lines changed


src/batch/executors/src/executor/iceberg_scan.rs

Lines changed: 1 addition & 0 deletions
@@ -116,6 +116,7 @@ impl IcebergScanExecutor {
                 chunk_size: self.chunk_size,
                 need_seq_num: self.need_seq_num,
                 need_file_path_and_pos: self.need_file_path_and_pos,
+                handle_delete_files: false,
             },
             self.metrics.as_ref().map(|m| m.iceberg_scan_metrics()),
         ) {

src/connector/src/source/iceberg/mod.rs

Lines changed: 215 additions & 3 deletions
@@ -18,7 +18,7 @@ mod metrics;
 use std::collections::{BinaryHeap, HashMap, HashSet};
 use std::sync::Arc;
 
-use anyhow::anyhow;
+use anyhow::{Context, anyhow};
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::{for_await, try_stream};
@@ -33,6 +33,7 @@ use phf::{Set, phf_set};
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
 use risingwave_common::bail;
+use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{
     ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
     Schema,
@@ -630,23 +631,27 @@ pub struct IcebergScanOpts {
     pub chunk_size: usize,
     pub need_seq_num: bool,
     pub need_file_path_and_pos: bool,
+    pub handle_delete_files: bool,
 }
 
+/// Scan a data file and optionally apply delete files (both position delete and equality delete).
+/// This is the enhanced version that supports delete file processing.
 #[try_stream(ok = DataChunk, error = ConnectorError)]
-pub async fn scan_task_to_chunk(
+pub async fn scan_task_to_chunk_with_deletes(
     table: Table,
     data_file_scan_task: FileScanTask,
     IcebergScanOpts {
         chunk_size,
         need_seq_num,
         need_file_path_and_pos,
+        handle_delete_files,
     }: IcebergScanOpts,
     metrics: Option<Arc<IcebergScanMetrics>>,
 ) {
     let table_name = table.identifier().name().to_owned();
 
     let mut read_bytes = scopeguard::guard(0, |read_bytes| {
-        if let Some(metrics) = metrics {
+        if let Some(metrics) = metrics.clone() {
             metrics
                 .iceberg_read_bytes
                 .with_guarded_label_values(&[&table_name])
@@ -657,6 +662,128 @@ pub async fn scan_task_to_chunk(
     let data_file_path = data_file_scan_task.data_file_path.clone();
     let data_sequence_number = data_file_scan_task.sequence_number;
 
+    // Extract delete files before moving data_file_scan_task (only if delete handling is enabled)
+    let position_delete_tasks: Vec<_> = if handle_delete_files {
+        data_file_scan_task
+            .deletes
+            .iter()
+            .filter(|delete| delete.data_file_content == DataContentType::PositionDeletes)
+            .cloned()
+            .collect()
+    } else {
+        vec![]
+    };
+
+    let equality_delete_tasks: Vec<_> = if handle_delete_files {
+        data_file_scan_task
+            .deletes
+            .iter()
+            .filter(|delete| delete.data_file_content == DataContentType::EqualityDeletes)
+            .cloned()
+            .collect()
+    } else {
+        vec![]
+    };
+
+    // Read position delete files to build a set of positions to delete
+    // Position delete format: (file_path: String, pos: i64)
+    let mut position_deletes: HashMap<String, HashSet<i64>> = HashMap::new();
+
+    if !position_delete_tasks.is_empty() {
+        for delete_task in position_delete_tasks {
+            let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
+            let delete_stream = tokio_stream::once(Ok((*delete_task).clone()));
+            let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;
+
+            while let Some(record_batch) = delete_record_stream.next().await {
+                let record_batch = record_batch?;
+
+                // Position delete files have schema: file_path (string), pos (long)
+                // Extract file_path and pos columns
+                if let Some(file_path_col) = record_batch.column_by_name("file_path") {
+                    if let Some(pos_col) = record_batch.column_by_name("pos") {
+                        use risingwave_common::array::arrow::arrow_array_iceberg::Array;
+
+                        let file_paths = file_path_col
+                            .as_any()
+                            .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::StringArray>()
+                            .with_context(|| "file_path column is not StringArray")?;
+                        let positions = pos_col
+                            .as_any()
+                            .downcast_ref::<risingwave_common::array::arrow::arrow_array_iceberg::Int64Array>()
+                            .with_context(|| "pos column is not Int64Array")?;
+
+                        for idx in 0..record_batch.num_rows() {
+                            if !file_paths.is_null(idx) && !positions.is_null(idx) {
+                                let file_path = file_paths.value(idx);
+                                let pos = positions.value(idx);
+                                position_deletes
+                                    .entry(file_path.to_string())
+                                    .or_insert_with(HashSet::new)
+                                    .insert(pos);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Read equality delete files to build a set of rows to delete based on equality columns
+    // Equality delete format: contains the equality columns specified in equality_ids
+    // We need to extract equality_ids and build a hashset of delete keys
+    let mut equality_deletes: Option<(Vec<String>, HashSet<Vec<String>>)> = None;
+
+    if !equality_delete_tasks.is_empty() {
+        // Get equality_ids from the first delete task (they should all be the same)
+        let equality_ids = equality_delete_tasks[0].equality_ids.clone();
+
+        // Get the schema and map field IDs to field names
+        let schema = table.metadata().current_schema();
+        let equality_field_names: Vec<String> = equality_ids
+            .iter()
+            .filter_map(|id| schema.name_by_field_id(*id).map(|s| s.to_owned()))
+            .collect();
+
+        if !equality_field_names.is_empty() {
+            let mut delete_key_set: HashSet<Vec<String>> = HashSet::new();
+
+            for delete_task in equality_delete_tasks {
+                let delete_reader = table.reader_builder().with_batch_size(chunk_size).build();
+                let delete_stream = tokio_stream::once(Ok((*delete_task).clone()));
+                let mut delete_record_stream = delete_reader.read(Box::pin(delete_stream)).await?;
+
+                while let Some(record_batch) = delete_record_stream.next().await {
+                    let record_batch = record_batch?;
+                    let delete_chunk =
+                        IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+
+                    // Extract the equality columns and build keys
+                    for row_idx in 0..delete_chunk.capacity() {
+                        let mut key = Vec::with_capacity(equality_field_names.len());
+                        for field_name in &equality_field_names {
+                            // Find the column index for this field
+                            if let Some(col_idx) = record_batch
+                                .schema()
+                                .column_with_name(field_name)
+                                .map(|(idx, _)| idx)
+                            {
+                                let col = delete_chunk.column_at(col_idx);
+                                let datum = col.value_at(row_idx);
+                                // Convert datum to string for key comparison
+                                key.push(format!("{:?}", datum));
+                            }
+                        }
+                        delete_key_set.insert(key);
+                    }
+                }
+            }
+
+            equality_deletes = Some((equality_field_names, delete_key_set));
+        }
+    }
+
+    // Now read the data file
     let reader = table.reader_builder().with_batch_size(chunk_size).build();
     let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));
 
@@ -667,6 +794,76 @@ pub async fn scan_task_to_chunk(
         let record_batch = record_batch?;
 
         let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
+
+        // Apply position deletes if any
+        if !position_deletes.is_empty() {
+            if let Some(deleted_positions) = position_deletes.get(&data_file_path) {
+                let index_start = (index * chunk_size) as i64;
+                let mut visibility = vec![true; chunk.capacity()];
+
+                for row_idx in 0..chunk.capacity() {
+                    let global_pos = index_start + row_idx as i64;
+                    if deleted_positions.contains(&global_pos) {
+                        visibility[row_idx] = false;
+                    }
+                }
+
+                let (columns, _) = chunk.into_parts();
+                let columns: Vec<_> = columns.into_iter().collect();
+                chunk = DataChunk::from_parts(columns.into(), Bitmap::from_bool_slice(&visibility));
+            }
+        }
+
+        // Apply equality deletes if any
+        // For equality deletes, we need to check if any row in the chunk matches
+        // the delete predicates based on equality columns
+        if let Some((ref equality_field_names, ref delete_key_set)) = equality_deletes {
+            // Get the schema for the current record batch to map field names to column indices
+            let data_schema = record_batch.schema();
+
+            // Build a mapping from equality field names to column indices in the data
+            let equality_col_indices: Vec<usize> = equality_field_names
+                .iter()
+                .filter_map(|field_name| {
+                    data_schema.column_with_name(field_name).map(|(idx, _)| idx)
+                })
+                .collect();
+
+            // Only apply equality deletes if we found all the equality columns
+            if equality_col_indices.len() == equality_field_names.len() {
+                let (columns, visibility) = chunk.into_parts();
+                let mut new_visibility_vec = vec![true; columns[0].len()];
+
+                // For each row in the chunk, build the key and check if it's in the delete set
+                for row_idx in 0..columns[0].len() {
+                    // Only check rows that are currently visible
+                    if visibility.is_set(row_idx) {
+                        let mut row_key = Vec::with_capacity(equality_field_names.len());
+
+                        // Build the key for this row using equality columns
+                        for &col_idx in &equality_col_indices {
+                            let datum = columns[col_idx].value_at(row_idx);
+                            row_key.push(format!("{:?}", datum));
+                        }
+
+                        // If this row's key is in the delete set, mark it as invisible
+                        if delete_key_set.contains(&row_key) {
+                            new_visibility_vec[row_idx] = false;
+                        }
+                    } else {
+                        // If already invisible, keep it invisible
+                        new_visibility_vec[row_idx] = false;
+                    }
+                }
+
+                let columns: Vec<_> = columns.into_iter().collect();
+                chunk = DataChunk::from_parts(
+                    columns.into(),
+                    Bitmap::from_bool_slice(&new_visibility_vec),
+                );
+            }
+        }
+
         if need_seq_num {
             let (mut columns, visibility) = chunk.into_parts();
             columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
@@ -690,6 +887,21 @@ pub async fn scan_task_to_chunk(
     }
 }
 
+/// Legacy scan function that doesn't process delete files.
+/// Kept for backward compatibility. Delegates to scan_task_to_chunk_with_deletes.
+#[try_stream(ok = DataChunk, error = ConnectorError)]
+pub async fn scan_task_to_chunk(
+    table: Table,
+    data_file_scan_task: FileScanTask,
+    opts: IcebergScanOpts,
+    metrics: Option<Arc<IcebergScanMetrics>>,
+) {
+    #[for_await]
+    for chunk in scan_task_to_chunk_with_deletes(table, data_file_scan_task, opts, metrics) {
+        yield chunk?;
+    }
+}
+
 #[derive(Debug)]
 pub struct IcebergFileReader {}
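
The equality-delete branch added above drops a data row when the Debug-formatted values of its equality columns match a key collected from the equality delete files. A simplified, std-only sketch of that key matching; the single `id` column and its values are invented for illustration, while the real code formats `Datum` values pulled from a `DataChunk`:

```rust
use std::collections::HashSet;

fn main() {
    // Suppose the equality delete files declare one equality column ("id")
    // and contain the ids 2 and 4. Keys are Debug-formatted strings, mirroring
    // the `format!("{:?}", datum)` keys built in the scan.
    let delete_key_set: HashSet<Vec<String>> = [2_i64, 4]
        .iter()
        .map(|id| vec![format!("{:?}", id)])
        .collect();

    // Data rows: (id, payload). The same key is built from each data row's
    // equality columns; a match marks the row invisible.
    let data_rows = [(1_i64, "a"), (2, "b"), (3, "c"), (4, "d")];
    let visibility: Vec<bool> = data_rows
        .iter()
        .map(|(id, _payload)| {
            let row_key = vec![format!("{:?}", id)];
            !delete_key_set.contains(&row_key)
        })
        .collect();

    assert_eq!(visibility, vec![true, false, true, false]);
    println!("visibility after equality deletes: {visibility:?}");
}
```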

src/stream/src/executor/source/batch_source/batch_iceberg_fetch.rs

Lines changed: 2 additions & 5 deletions
@@ -37,8 +37,7 @@ use tokio::sync::mpsc::UnboundedReceiver;
 
 use crate::executor::prelude::*;
 use crate::executor::source::{
-    ChunksWithState, PersistedFileScanTask, StreamSourceCore, barrier_to_message_stream,
-    prune_additional_cols,
+    ChunksWithState, PersistedFileScanTask, StreamSourceCore, prune_additional_cols,
 };
 use crate::executor::stream_reader::StreamReaderWithPause;
 use crate::task::LocalBarrierManager;
@@ -122,10 +121,7 @@ impl<S: StateStore> BatchIcebergFetchExecutor<S> {
                         match msg {
                             Err(e) => {
                                 tracing::error!(error = %e.as_report(), "Fetch Error");
-                                splits_on_fetch = 0;
                                 file_queue.clear();
-                                is_refreshing = false;
-                                is_list_finished = false;
                                 *is_load_finished.write() = false;
                                 return Err(e);
                             }
@@ -343,6 +339,7 @@ impl<S: StateStore> BatchIcebergFetchExecutor<S> {
                     chunk_size: streaming_config.developer.chunk_size,
                     need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
                     need_file_path_and_pos: true,
+                    handle_delete_files: true, // Enable delete file handling for streaming source
                 },
                 None,
             ) {

src/stream/src/executor/source/iceberg_fetch_executor.rs

Lines changed: 1 addition & 0 deletions
@@ -364,6 +364,7 @@ impl<S: StateStore> IcebergFetchExecutor<S> {
                     chunk_size: streaming_config.developer.chunk_size,
                     need_seq_num: true, /* Although this column is unnecessary, we still keep it for potential usage in the future */
                     need_file_path_and_pos: true,
+                    handle_delete_files: false,
                 },
                 None,
             ) {
