Skip to content

Commit da58b0a

Browse files
committed
refactor: Avoid overly complex itertools methods in log listing code
1 parent 95b41cf commit da58b0a

File tree

1 file changed

+75
-66
lines changed

1 file changed

+75
-66
lines changed

kernel/src/listed_log_files.rs

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,11 @@ fn list_log_files(
8787
// if the log_tail covers the entire requested range (i.e. starts at or before start_version),
8888
// we skip listing entirely. note that if we don't include this check, we will end up listing
8989
// and then just filtering out all the files we listed.
90-
let listed_files = log_tail_start
91-
// log_tail covers the entire requested range, so no listing is required
92-
.is_none_or(|tail_start| start_version < tail_start.version)
93-
.then(|| -> DeltaResult<_> {
94-
// NOTE: since engine APIs don't limit listing, we list from start_version and filter
95-
Ok(storage
90+
let listed_files = if log_tail_start.is_some_and(|tail| tail.version <= start_version) {
91+
None
92+
} else {
93+
Some(
94+
storage
9695
.list_from(&start_from)?
9796
.map(|meta| ParsedLogPath::try_from(meta?))
9897
// NOTE: this filters out .crc files etc which start with "." - some engines
@@ -106,18 +105,17 @@ fn list_log_files(
106105
// discard any path with too-large version; keep errors
107106
Ok(path) => path.version <= list_end_version,
108107
Err(_) => true,
109-
}))
110-
})
111-
.transpose()?
112-
.into_iter()
113-
.flatten();
108+
}),
109+
)
110+
};
114111

115112
// return chained [listed_files..log_tail], filtering log_tail by the requested range
116113
let filtered_log_tail = log_tail
117114
.into_iter()
118115
.filter(move |entry| entry.version >= start_version && entry.version <= end_version)
119116
.map(Ok);
120117

118+
let listed_files = listed_files.into_iter().flatten();
121119
Ok(listed_files.chain(filtered_log_tail))
122120
}
123121

@@ -254,47 +252,29 @@ impl ListedLogFiles {
254252
start_version: Option<Version>,
255253
end_version: Option<Version>,
256254
) -> DeltaResult<Self> {
257-
let log_files = list_log_files(storage, log_root, log_tail, start_version, end_version)?;
258-
259-
log_files.process_results(|iter| {
260-
let mut ascending_commit_files = Vec::new();
261-
let mut ascending_compaction_files = Vec::new();
262-
let mut checkpoint_parts = vec![];
263-
let mut latest_crc_file: Option<ParsedLogPath> = None;
264-
let mut latest_commit_file: Option<ParsedLogPath> = None;
265-
266-
// Group log files by version
267-
let log_files_per_version = iter.chunk_by(|x| x.version);
268-
269-
for (version, files) in &log_files_per_version {
270-
let mut new_checkpoint_parts = vec![];
271-
for file in files {
272-
use LogPathFileType::*;
273-
match file.file_type {
274-
Commit | StagedCommit => ascending_commit_files.push(file),
275-
CompactedCommit { hi } if end_version.is_none_or(|end| hi <= end) => {
276-
ascending_compaction_files.push(file);
277-
}
278-
CompactedCommit { .. } => (), // Failed the bounds check above
279-
SinglePartCheckpoint | UuidCheckpoint | MultiPartCheckpoint { .. } => {
280-
new_checkpoint_parts.push(file)
281-
}
282-
Crc => {
283-
let latest_crc_ref = latest_crc_file.as_ref();
284-
if latest_crc_ref.is_none_or(|latest| latest.version < file.version) {
285-
latest_crc_file = Some(file);
286-
}
287-
}
288-
Unknown => {
289-
warn!(
290-
"Found file {} with unknown file type {:?} at version {}",
291-
file.filename, file.file_type, version
292-
);
293-
}
294-
}
295-
}
255+
let mut log_files =
256+
list_log_files(storage, log_root, log_tail, start_version, end_version)?;
257+
258+
let mut ascending_commit_files = Vec::new();
259+
let mut ascending_compaction_files = Vec::new();
260+
let mut checkpoint_parts = vec![];
261+
let mut latest_crc_file: Option<ParsedLogPath> = None;
262+
let mut latest_commit_file: Option<ParsedLogPath> = None;
263+
let mut new_checkpoint_parts = vec![];
264+
let mut current_version = None;
265+
loop {
266+
let file = log_files.next().transpose()?;
267+
268+
// Flush any in-progress group unless the new file exists and belongs to it.
269+
if let Some(group_version) =
270+
current_version.filter(|v| file.as_ref().is_none_or(|f| f.version != *v))
271+
{
296272
// Group and find the first complete checkpoint for this version.
297273
// All checkpoints for the same version are equivalent, so we only take one.
274+
//
275+
// If this version has a complete checkpoint, we can drop the existing commit and
276+
// compaction files we collected so far -- except we must keep the latest commit.
277+
let new_checkpoint_parts = std::mem::take(&mut new_checkpoint_parts);
298278
if let Some((_, complete_checkpoint)) = group_checkpoint_parts(new_checkpoint_parts)
299279
.into_iter()
300280
// `num_parts` is guaranteed to be non-negative and within `usize` range
@@ -308,30 +288,59 @@ impl ListedLogFiles {
308288
// from before the checkpoint
309289
latest_commit_file = ascending_commit_files
310290
.pop()
311-
.filter(|commit| commit.version == version);
291+
.filter(|commit: &ParsedLogPath| commit.version == group_version);
312292
// Log replay only uses commits/compactions after a complete checkpoint
313293
ascending_commit_files.clear();
314294
ascending_compaction_files.clear();
315295
}
316296
}
317297

318-
// Since ascending_commit_files is cleared at each checkpoint, if it's non-empty here
319-
// it contains only commits after the most recent checkpoint. The last element is the
320-
// highest version commit overall, so we update latest_commit_file to it. If it's empty,
321-
// we keep the value set at the checkpoint (if a commit existed at the checkpoint version),
322-
// or remains None.
323-
if let Some(commit_file) = ascending_commit_files.last() {
324-
latest_commit_file = Some(commit_file.clone());
298+
// NOTE: We break for EOF only _after_ flushing the last group.
299+
let Some(file) = file else {
300+
break;
301+
};
302+
303+
// Capture the new version before we consume the file
304+
current_version = Some(file.version);
305+
306+
use LogPathFileType::*;
307+
match file.file_type {
308+
Commit | StagedCommit => ascending_commit_files.push(file),
309+
CompactedCommit { hi } if end_version.is_none_or(|end| hi <= end) => {
310+
ascending_compaction_files.push(file);
311+
}
312+
CompactedCommit { .. } => (), // Failed the bounds check above
313+
SinglePartCheckpoint | UuidCheckpoint | MultiPartCheckpoint { .. } => {
314+
new_checkpoint_parts.push(file)
315+
}
316+
Crc => {
317+
latest_crc_file.replace(file);
318+
}
319+
Unknown => {
320+
warn!(
321+
"Found file {} with unknown file type {:?} at version {}",
322+
file.filename, file.file_type, file.version
323+
);
324+
}
325325
}
326+
}
326327

327-
ListedLogFiles::try_new(
328-
ascending_commit_files,
329-
ascending_compaction_files,
330-
checkpoint_parts,
331-
latest_crc_file,
332-
latest_commit_file,
333-
)
334-
})?
328+
// Since ascending_commit_files is cleared at each checkpoint, if it's non-empty here
329+
// it contains only commits after the most recent checkpoint. The last element is the
330+
// highest version commit overall, so we update latest_commit_file to it. If it's empty,
331+
// we keep the value set at the checkpoint (if a commit existed at the checkpoint version);
332+
// otherwise it remains None.
333+
if let Some(commit_file) = ascending_commit_files.last() {
334+
latest_commit_file = Some(commit_file.clone());
335+
}
336+
337+
ListedLogFiles::try_new(
338+
ascending_commit_files,
339+
ascending_compaction_files,
340+
checkpoint_parts,
341+
latest_crc_file,
342+
latest_commit_file,
343+
)
335344
}
336345

337346
/// List all commit and checkpoint files after the provided checkpoint. It is guaranteed that all

0 commit comments

Comments
 (0)