Skip to content

Commit 4ee36ba

Browse files
gtrettenero and gtret authored
[BDW-887] Block Writes for Incompatible Iceberg Clients (#665)
* Block Writes for Incompatible Iceberg Clients * address comments --------- Co-authored-by: gtret <[email protected]>
1 parent f0cd348 commit 4ee36ba

File tree

12 files changed

+1507
-0
lines changed

12 files changed

+1507
-0
lines changed

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,9 @@ public TableInfo fromIcebergTableToTableInfo(final QualifiedName name,
195195
tableParameters.put(DirectSqlTable.PARAM_PARTITION_SPEC, table.spec().toString());
196196
//adding iceberg table properties
197197
tableParameters.putAll(table.properties());
198+
199+
// Populate branch/tag metadata for optimization purposes
200+
tableParameters.putAll(tableWrapper.populateBranchTagMetadata());
198201
tableParameters.putAll(tableWrapper.getExtraProperties());
199202
final StorageInfo.StorageInfoBuilder storageInfoBuilder = StorageInfo.builder();
200203
if (tableInfo.getSerde() != null) {

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/iceberg/IcebergTableWrapper.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,99 @@
1919
import org.apache.iceberg.Table;
2020
import lombok.Data;
2121

22+
import java.util.Collections;
23+
import java.util.HashMap;
2224
import java.util.Map;
25+
import java.util.Set;
2326

2427
/**
2528
* This class represents the iceberg table.
2629
*/
2730
@Data
2831
public class IcebergTableWrapper {
32+
/** Key for indicating if the table has non-main branches. */
33+
public static final String ICEBERG_HAS_NON_MAIN_BRANCHES_KEY = "iceberg.has.non.main.branches";
34+
/** Key for indicating if the table has tags. */
35+
public static final String ICEBERG_HAS_TAGS_KEY = "iceberg.has.tags";
2936
private final Table table;
3037
private final Map<String, String> extraProperties;
38+
39+
/**
40+
* Check if the table has any non-main branches.
41+
* @return true if the table has branches other than main
42+
*/
43+
public boolean hasNonMainBranches() {
44+
final Set<String> branches = extractBranches();
45+
return branches.size() > 1;
46+
}
47+
48+
/**
49+
* Check if the table has any tags.
50+
* @return true if the table has tags
51+
*/
52+
public boolean hasTags() {
53+
final Set<String> tags = extractTags();
54+
return !tags.isEmpty();
55+
}
56+
57+
/**
58+
* Populate and return branch/tag metadata properties.
59+
* This should be called explicitly when metadata injection is needed.
60+
* @return map containing branch/tag metadata properties
61+
*/
62+
public Map<String, String> populateBranchTagMetadata() {
63+
final Map<String, String> branchTagMetadata = new HashMap<>();
64+
branchTagMetadata.put(ICEBERG_HAS_NON_MAIN_BRANCHES_KEY, String.valueOf(hasNonMainBranches()));
65+
branchTagMetadata.put(ICEBERG_HAS_TAGS_KEY, String.valueOf(hasTags()));
66+
return branchTagMetadata;
67+
}
68+
69+
/**
70+
* Get summary information about branches and tags for logging/debugging.
71+
* @return formatted string with branch and tag counts and names
72+
*/
73+
public String getBranchesAndTagsSummary() {
74+
final Set<String> branches = extractBranches();
75+
final Set<String> tags = extractTags();
76+
final StringBuilder summary = new StringBuilder();
77+
summary.append(String.format("branches=%d", branches.size()));
78+
if (!branches.isEmpty()) {
79+
summary.append(String.format(" %s", branches));
80+
}
81+
summary.append(String.format(", tags=%d", tags.size()));
82+
if (!tags.isEmpty()) {
83+
summary.append(String.format(" %s", tags));
84+
}
85+
return summary.toString();
86+
}
87+
88+
/**
89+
* Extract branch names from the table references.
90+
* @return set of branch names
91+
* @throws RuntimeException if unable to read table references
92+
*/
93+
private Set<String> extractBranches() {
94+
final var refs = table.refs();
95+
if (refs == null || refs.isEmpty()) {
96+
return Collections.emptySet();
97+
}
98+
return refs.keySet().stream()
99+
.filter(ref -> refs.get(ref).isBranch())
100+
.collect(java.util.stream.Collectors.toSet());
101+
}
102+
103+
/**
104+
* Extract tag names from the table references.
105+
* @return set of tag names
106+
* @throws RuntimeException if unable to read table references
107+
*/
108+
private Set<String> extractTags() {
109+
final var refs = table.refs();
110+
if (refs == null || refs.isEmpty()) {
111+
return Collections.emptySet();
112+
}
113+
return refs.keySet().stream()
114+
.filter(ref -> refs.get(ref).isTag())
115+
.collect(java.util.stream.Collectors.toSet());
116+
}
31117
}

metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConvertorSpec.groovy

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,10 @@ class HiveConnectorInfoConvertorSpec extends Specification{
516516
2 * field.transform() >> Mock(Identity)
517517
1 * icebergTable.properties() >> ["test":"abd"]
518518
2 * icebergTable.spec() >> partSpec
519+
2 * icebergTable.refs() >> ["main": Mock(org.apache.iceberg.SnapshotRef) {
520+
isBranch() >> true
521+
isTag() >> false
522+
}]
519523
1 * partSpec.fields() >> [ field]
520524
1 * icebergTable.schema() >> schema
521525
1 * schema.columns() >> [nestedField, nestedField2]
@@ -537,4 +541,31 @@ class HiveConnectorInfoConvertorSpec extends Specification{
537541
tableInfo.getFields().get(0).getComment() == 'fieldName doc'
538542

539543
}
544+
545+
def "test fromIcebergTableToTableInfo includes branch/tag metadata"() {
    given:
    def mockedTable = Mock(org.apache.iceberg.Table)
    def mockedWrapper = Mock(IcebergTableWrapper)
    def qualifiedName = QualifiedName.ofTable('c', 'd', 't')
    def baseTableInfo = TableInfo.builder().build()

    when:
    def converted = converter.fromIcebergTableToTableInfo(qualifiedName, mockedWrapper, "/tmp/test", baseTableInfo)

    then:
    // Interactions on the wrapper: the converter reads the table, the branch/tag flags
    // and the extra properties exactly once each.
    1 * mockedWrapper.getTable() >> mockedTable
    1 * mockedWrapper.populateBranchTagMetadata() >> [
        "iceberg.has.non.main.branches": "true",
        "iceberg.has.tags": "false"
    ]
    1 * mockedWrapper.getExtraProperties() >> [:]

    // Interactions on the underlying Iceberg table.
    1 * mockedTable.properties() >> [:]
    1 * mockedTable.schema() >> Mock(Schema) {
        columns() >> []
    }
    2 * mockedTable.spec() >> Mock(PartitionSpec) {
        fields() >> []
        toString() >> "[]"
    }

    // The flags returned by populateBranchTagMetadata() must show up in the table metadata.
    converted.getMetadata().get("iceberg.has.non.main.branches") == "true"
    converted.getMetadata().get("iceberg.has.tags") == "false"
}
540571
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Copyright 2018 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.metacat.connector.hive.iceberg
18+
19+
import com.fasterxml.jackson.databind.JsonNode
20+
import com.fasterxml.jackson.databind.ObjectMapper
21+
import spock.lang.Specification
22+
23+
/**
24+
* Validation tests for metadata files used in functional tests.
25+
* This ensures our test metadata files are correctly formatted.
26+
*/
27+
class IcebergMetadataValidationSpec extends Specification {

    /** Directory holding the metadata fixtures; relative, so it resolves against the test JVM's working directory. */
    private static final String METADATA_DIR =
        "../metacat-functional-tests/metacat-test-cluster/etc-metacat/data/metadata/"

    private static final String BRANCHES_AND_TAGS_FILE = "00003-with-branches-and-tags.metadata.json"
    private static final String MAIN_BRANCH_ONLY_FILE = "00004-main-branch-only.metadata.json"
    private static final String OLD_CLIENT_NO_REFS_FILE = "00005-old-client-no-refs.metadata.json"
    private static final String V1_WITH_REFS_FILE = "00006-v1-with-branches-tags.metadata.json"

    def objectMapper = new ObjectMapper()

    /** Parses the named fixture from METADATA_DIR into a JSON tree. */
    private JsonNode readMetadata(final String fileName) {
        return objectMapper.readTree(new File(METADATA_DIR + fileName))
    }

    /** Counts the entries of a "refs" object whose "type" field equals the given type. */
    private static int countRefsOfType(final JsonNode refs, final String type) {
        int matches = 0
        refs.fields().forEachRemaining { entry ->
            if (entry.value.get("type").asText() == type) {
                matches++
            }
        }
        return matches
    }

    def "test metadata file with branches and tags is valid JSON"() {
        when:
        def json = readMetadata(BRANCHES_AND_TAGS_FILE)

        then:
        noExceptionThrown()
        json.has("format-version")
        json.get("format-version").asInt() == 2
        json.has("refs")

        def refs = json.get("refs")
        refs.has("main")
        refs.has("feature-branch")
        refs.has("experimental")
        refs.has("v1.0.0")
        refs.has("v2.0.0")
        refs.has("release-2024-01")

        // Verify branches
        refs.get("main").get("type").asText() == "branch"
        refs.get("feature-branch").get("type").asText() == "branch"
        refs.get("experimental").get("type").asText() == "branch"

        // Verify tags
        refs.get("v1.0.0").get("type").asText() == "tag"
        refs.get("v2.0.0").get("type").asText() == "tag"
        refs.get("release-2024-01").get("type").asText() == "tag"
    }

    def "test metadata file with main branch only is valid JSON"() {
        when:
        def json = readMetadata(MAIN_BRANCH_ONLY_FILE)

        then:
        noExceptionThrown()
        json.has("format-version")
        json.get("format-version").asInt() == 2
        json.has("refs")

        def refs = json.get("refs")
        refs.has("main")
        refs.size() == 1 // Only main branch

        // Verify only main branch exists
        refs.get("main").get("type").asText() == "branch"
    }

    def "test metadata file created with Iceberg client < 0.14.1 (no refs section) is valid JSON"() {
        when:
        def json = readMetadata(OLD_CLIENT_NO_REFS_FILE)

        then:
        noExceptionThrown()
        json.has("format-version")
        json.get("format-version").asInt() == 1

        // Tables created with Iceberg < 0.14.1 should NOT have refs section
        !json.has("refs")

        json.has("table-uuid")
        json.has("location")
        json.has("schema")
        json.has("current-schema-id")
        json.has("schemas")
        json.has("last-assigned-partition-id")
        json.has("current-snapshot-id")
        json.has("snapshots")

        // ~Dec 2021 (before Iceberg 0.14.1 refs support)
        json.get("last-updated-ms").asLong() == 1640909164815L
    }

    def "test metadata file v2 format has correct field names"() {
        when:
        def json = readMetadata(BRANCHES_AND_TAGS_FILE)

        then:
        noExceptionThrown()
        json.has("format-version")
        json.get("format-version").asInt() == 2

        // Critical: v2 format must have "last-partition-id" not "last-assigned-partition-id"
        json.has("last-partition-id")
        !json.has("last-assigned-partition-id") // Wrong field name for v2

        // v2 format specific requirements
        json.has("last-sequence-number")
        json.has("refs") // v2 files should have refs section
    }

    def "test metadata files structure matches expected branch/tag counts"() {
        given: "the parsed fixtures and the per-type reference counts of each refs section"
        def branchesTagsRefs = readMetadata(BRANCHES_AND_TAGS_FILE).get("refs")
        def mainOnlyRefs = readMetadata(MAIN_BRANCH_ONLY_FILE).get("refs")
        def oldClientJson = readMetadata(OLD_CLIENT_NO_REFS_FILE)
        def v1WithRefsRefs = readMetadata(V1_WITH_REFS_FILE).get("refs")

        def branchCount = countRefsOfType(branchesTagsRefs, "branch")
        def tagCount = countRefsOfType(branchesTagsRefs, "tag")
        def mainOnlyBranchCount = countRefsOfType(mainOnlyRefs, "branch")
        def mainOnlyTagCount = countRefsOfType(mainOnlyRefs, "tag")
        def v1WithRefsBranchCount = countRefsOfType(v1WithRefsRefs, "branch")
        def v1WithRefsTagCount = countRefsOfType(v1WithRefsRefs, "tag")

        expect:
        // JSON metadata vs Iceberg runtime behavior
        // - JSON metadata: Iceberg < 0.14.1 client tables have NO refs section
        // - Iceberg runtime: ALL tables get at least a "main" branch (auto-created by Iceberg)
        !oldClientJson.has("refs") // JSON has no refs section for pre-0.14.1 clients
        oldClientJson.get("format-version").asInt() == 1

        // Verify expected counts for our integration tests
        branchCount == 3 // main, feature-branch, experimental
        tagCount == 3 // v1.0.0, v2.0.0, release-2024-01
        mainOnlyBranchCount == 1 // only main
        mainOnlyTagCount == 0 // no tags
        v1WithRefsBranchCount == 2 // main, dev-branch
        v1WithRefsTagCount == 1 // v3.0.0

        // This confirms our integration test expectations (via Iceberg runtime, not JSON):
        // - branchesTagsFile should trigger hasNonMainBranches() == true (has non-main branches)
        // - branchesTagsFile should trigger hasTags() == true (3 > 0)
        // - mainOnlyFile should trigger hasNonMainBranches() == false (main only)
        // - mainOnlyFile should trigger hasTags() == false (0 == 0)
        // - oldClientFile (Iceberg < 0.14.1): JSON has no refs, but Iceberg runtime auto-creates main branch
        //   so hasNonMainBranches() == false (auto-created main only), hasTags() == false (0 tags)
        // - v1WithRefsFile should trigger hasNonMainBranches() == true (main plus dev-branch)
        // - v1WithRefsFile should trigger hasTags() == true (1 > 0)
    }
}

0 commit comments

Comments
 (0)