Skip to content

Commit 5624a6a

Browse files
authored
fix: Don't return errors from ParsedLogPath::try_from (#1433)
## What changes are proposed in this pull request? This change effectively makes log path parsing infallible. We no longer return errors from `ParsedLogPath::try_from`. This fixes #1432. This _also_ turns any valid looking files that aren't in an expected subdir to be of "Unknown" type. (either `_delta_log` for most everything except `_staged_commits` for staged files) As a follow-up we need to _actually_ just return an `Option<ParsedLogPath<Location>>` from that method. Doing so requires numerous tricky changes in our log listing code and I wanted to get a more targeted fix in for the issue and then we can tackle the refactor at a more leisurely pace. At that time I think we could also clean up the `try_from` method logic to make it a bit more clear, but again I didn't want to make large changes close to release. ## How was this change tested? Modified existing unit tests, added a new test in `read.rs` that we can read a table with a `0.zip` file in the log dir.
1 parent d2ba5ea commit 5624a6a

File tree

4 files changed

+268
-74
lines changed

4 files changed

+268
-74
lines changed

kernel/src/checkpoint/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,11 @@ fn write_commit_to_store(
181181
.collect();
182182
let content = json_lines.join("\n");
183183

184-
let commit_path = format!("_delta_log/{}", delta_path_for_version(version, "json"));
184+
let commit_path = delta_path_for_version(version, "json");
185185

186186
tokio::runtime::Runtime::new()
187187
.expect("create tokio runtime")
188-
.block_on(async { store.put(&Path::from(commit_path), content.into()).await })?;
188+
.block_on(async { store.put(&commit_path, content.into()).await })?;
189189

190190
Ok(())
191191
}

kernel/src/log_segment/tests.rs

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,12 +1760,18 @@ fn test_debug_assert_listed_log_file_in_order_compaction_files() {
17601760
let _ = ListedLogFiles::try_new(
17611761
vec![],
17621762
vec![
1763-
create_log_path("file:///00000000000000000000.00000000000000000004.compacted.json"),
1764-
create_log_path("file:///00000000000000000001.00000000000000000002.compacted.json"),
1763+
create_log_path(
1764+
"file:///_delta_log/00000000000000000000.00000000000000000004.compacted.json",
1765+
),
1766+
create_log_path(
1767+
"file:///_delta_log/00000000000000000001.00000000000000000002.compacted.json",
1768+
),
17651769
],
17661770
vec![],
17671771
None,
1768-
Some(create_log_path("file:///00000000000000000001.json")),
1772+
Some(create_log_path(
1773+
"file:///_delta_log/00000000000000000001.json",
1774+
)),
17691775
);
17701776
}
17711777

@@ -1776,12 +1782,18 @@ fn test_debug_assert_listed_log_file_out_of_order_compaction_files() {
17761782
let _ = ListedLogFiles::try_new(
17771783
vec![],
17781784
vec![
1779-
create_log_path("file:///00000000000000000000.00000000000000000004.compacted.json"),
1780-
create_log_path("file:///00000000000000000000.00000000000000000003.compacted.json"),
1785+
create_log_path(
1786+
"file:///_delta_log/00000000000000000000.00000000000000000004.compacted.json",
1787+
),
1788+
create_log_path(
1789+
"file:///_delta_log/00000000000000000000.00000000000000000003.compacted.json",
1790+
),
17811791
],
17821792
vec![],
17831793
None,
1784-
Some(create_log_path("file:///00000000000000000001.json")),
1794+
Some(create_log_path(
1795+
"file:///_delta_log/00000000000000000001.json",
1796+
)),
17851797
);
17861798
}
17871799

@@ -1793,11 +1805,17 @@ fn test_debug_assert_listed_log_file_different_multipart_checkpoint_versions() {
17931805
vec![],
17941806
vec![],
17951807
vec![
1796-
create_log_path("00000000000000000010.checkpoint.0000000001.0000000002.parquet"),
1797-
create_log_path("00000000000000000011.checkpoint.0000000002.0000000002.parquet"),
1808+
create_log_path(
1809+
"file:///_delta_log/00000000000000000010.checkpoint.0000000001.0000000002.parquet",
1810+
),
1811+
create_log_path(
1812+
"file:///_delta_log/00000000000000000011.checkpoint.0000000002.0000000002.parquet",
1813+
),
17981814
],
17991815
None,
1800-
Some(create_log_path("file:///00000000000000000001.json")),
1816+
Some(create_log_path(
1817+
"file:///_delta_log/00000000000000000001.json",
1818+
)),
18011819
);
18021820
}
18031821

@@ -1809,11 +1827,17 @@ fn test_debug_assert_listed_log_file_invalid_multipart_checkpoint() {
18091827
vec![],
18101828
vec![],
18111829
vec![
1812-
create_log_path("00000000000000000010.checkpoint.0000000001.0000000003.parquet"),
1813-
create_log_path("00000000000000000011.checkpoint.0000000002.0000000003.parquet"),
1830+
create_log_path(
1831+
"file:///_delta_log/00000000000000000010.checkpoint.0000000001.0000000003.parquet",
1832+
),
1833+
create_log_path(
1834+
"file:///_delta_log/00000000000000000011.checkpoint.0000000002.0000000003.parquet",
1835+
),
18141836
],
18151837
None,
1816-
Some(create_log_path("file:///00000000000000000001.json")),
1838+
Some(create_log_path(
1839+
"file:///_delta_log/00000000000000000001.json",
1840+
)),
18171841
);
18181842
}
18191843

@@ -2187,27 +2211,31 @@ fn test_latest_commit_file_edge_case_commit_before_checkpoint() {
21872211
fn test_log_segment_contiguous_commit_files() {
21882212
let res = ListedLogFiles::try_new(
21892213
vec![
2190-
create_log_path("file:///00000000000000000001.json"),
2191-
create_log_path("file:///00000000000000000002.json"),
2192-
create_log_path("file:///00000000000000000003.json"),
2214+
create_log_path("file:///_delta_log/00000000000000000001.json"),
2215+
create_log_path("file:///_delta_log/00000000000000000002.json"),
2216+
create_log_path("file:///_delta_log/00000000000000000003.json"),
21932217
],
21942218
vec![],
21952219
vec![],
21962220
None,
2197-
Some(create_log_path("file:///00000000000000000001.json")),
2221+
Some(create_log_path(
2222+
"file:///_delta_log/00000000000000000001.json",
2223+
)),
21982224
);
21992225
assert!(res.is_ok());
22002226

22012227
// allow gaps in ListedLogFiles
22022228
let listed = ListedLogFiles::try_new(
22032229
vec![
2204-
create_log_path("file:///00000000000000000001.json"),
2205-
create_log_path("file:///00000000000000000003.json"),
2230+
create_log_path("file:///_delta_log/00000000000000000001.json"),
2231+
create_log_path("file:///_delta_log/00000000000000000003.json"),
22062232
],
22072233
vec![],
22082234
vec![],
22092235
None,
2210-
Some(create_log_path("file:///00000000000000000001.json")),
2236+
Some(create_log_path(
2237+
"file:///_delta_log/00000000000000000001.json",
2238+
)),
22112239
);
22122240

22132241
// disallow gaps in LogSegment
@@ -2217,11 +2245,11 @@ fn test_log_segment_contiguous_commit_files() {
22172245
"Generic delta kernel error: Expected ordered \
22182246
contiguous commit files [ParsedLogPath { location: FileMeta { location: Url { scheme: \
22192247
\"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: \
2220-
None, path: \"/00000000000000000001.json\", query: None, fragment: None }, last_modified: \
2248+
None, path: \"/_delta_log/00000000000000000001.json\", query: None, fragment: None }, last_modified: \
22212249
0, size: 0 }, filename: \"00000000000000000001.json\", extension: \"json\", version: 1, \
22222250
file_type: Commit }, ParsedLogPath { location: FileMeta { location: Url { scheme: \
22232251
\"file\", cannot_be_a_base: false, username: \"\", password: None, host: None, port: \
2224-
None, path: \"/00000000000000000003.json\", query: None, fragment: None }, last_modified: \
2252+
None, path: \"/_delta_log/00000000000000000003.json\", query: None, fragment: None }, last_modified: \
22252253
0, size: 0 }, filename: \"00000000000000000003.json\", extension: \"json\", version: 3, \
22262254
file_type: Commit }]",
22272255
);

0 commit comments

Comments
 (0)