Skip to content

Commit dc3bc13

Browse files
gabeiglio and Gabriel Igliozzi authored
ignore void transform partitions from current partition field list (#606)
ignore void transform partitions from current partition field list (#606)

* ignore void transform partitions from current partition field list
* use correct method calls to get partition schema
* test tableDTO instead of tableDto
* Add safe-guard to omit void partitions
* revert fast property changes
* Delete omitVoidPartitionFields from Config interface and impl
* revert last changes
* use equalsIgnoreCase and remove extra lines

---------

Co-authored-by: Gabriel Igliozzi <[email protected]>
1 parent 3dd4eb4 commit dc3bc13

File tree

6 files changed

+215
-6
lines changed

6 files changed

+215
-6
lines changed

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public TableInfo fromIcebergTableToTableInfo(final QualifiedName name,
188188
final TableInfo tableInfo) {
189189
final org.apache.iceberg.Table table = tableWrapper.getTable();
190190
final List<FieldInfo> allFields =
191-
this.hiveTypeConverter.icebergeSchemaTofieldDtos(table.schema(), table.spec().fields());
191+
this.hiveTypeConverter.icebergSchemaTofieldDtos(table.schema(), table.spec().fields());
192192
final Map<String, String> tableParameters = new HashMap<>();
193193
tableParameters.put(DirectSqlTable.PARAM_TABLE_TYPE, DirectSqlTable.ICEBERG_TABLE_TYPE);
194194
tableParameters.put(DirectSqlTable.PARAM_METADATA_LOCATION, tableLoc);

metacat-connector-hive/src/main/java/com/netflix/metacat/connector/hive/converters/HiveTypeConverter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,13 @@ public Type toMetacatType(final String type) {
108108
* @param partitionFields partitioned fields
109109
* @return list of field Info
110110
*/
111-
public List<FieldInfo> icebergeSchemaTofieldDtos(final Schema schema,
111+
public List<FieldInfo> icebergSchemaTofieldDtos(final Schema schema,
112112
final List<PartitionField> partitionFields) {
113113
final List<FieldInfo> fields = Lists.newArrayList();
114-
final List<String> partitionNames =
115-
partitionFields.stream()
116-
.map(f -> schema.findField(f.sourceId()).name()).collect(Collectors.toList());
114+
final List<String> partitionNames = partitionFields.stream()
115+
.filter(f -> f.transform() != null && !f.transform().toString().equalsIgnoreCase("void"))
116+
.map(f -> schema.findField(f.sourceId()).name())
117+
.collect(Collectors.toList());
117118

118119
for (Types.NestedField field : schema.columns()) {
119120
final FieldInfo fieldInfo = new FieldInfo();

metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveConnectorInfoConvertorSpec.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.netflix.metacat.connector.hive.converters
1919
import org.apache.iceberg.PartitionField
2020
import org.apache.iceberg.PartitionSpec
2121
import org.apache.iceberg.Schema
22+
import org.apache.iceberg.transforms.Identity
2223
import org.apache.iceberg.types.Type
2324
import org.apache.iceberg.types.Types
2425
import com.netflix.metacat.common.QualifiedName
@@ -58,7 +59,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
5859
def setup() {
5960
// Stub this to always return true
6061
config.isEpochInSeconds() >> true
61-
converter = new HiveConnectorInfoConverter( new HiveTypeConverter())
62+
converter = new HiveConnectorInfoConverter(new HiveTypeConverter())
6263
}
6364

6465
def 'test date to epoch seconds'() {
@@ -512,6 +513,7 @@ class HiveConnectorInfoConvertorSpec extends Specification{
512513
def tableInfo = converter.fromIcebergTableToTableInfo(QualifiedName.ofTable('c', 'd', 't'),
513514
icebergTableWrapper, "/tmp/test", TableInfo.builder().build() )
514515
then:
516+
2 * field.transform() >> Mock(Identity)
515517
1 * icebergTable.properties() >> ["test":"abd"]
516518
2 * icebergTable.spec() >> partSpec
517519
1 * partSpec.fields() >> [ field]

metacat-connector-hive/src/test/groovy/com/netflix/metacat/connector/hive/converters/HiveTypeConverterSpec.groovy

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313

1414
package com.netflix.metacat.connector.hive.converters
1515

16+
import org.apache.iceberg.PartitionField
17+
import org.apache.iceberg.transforms.Identity
18+
import org.apache.iceberg.transforms.VoidTransform
19+
import org.apache.iceberg.Schema
20+
import org.apache.iceberg.types.Types
1621
import spock.lang.Shared
1722
import spock.lang.Specification
1823
import spock.lang.Unroll
@@ -255,4 +260,43 @@ class HiveTypeConverterSpec extends Specification {
255260
"array<struct<date:string,countryCodes:array<string>,source:string>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}"""
256261
"array<struct<Date:string,nestedArray:array<struct<date:string,countryCodes:array<string>,source:string>>>>" | """{"type":"array","elementType":{"type":"row","fields":[{"name":"Date","type":"string"},{"name":"nestedArray","type":{"type":"array","elementType":{"type":"row","fields":[{"name":"date","type":"string"},{"name":"countryCodes","type":{"type":"array","elementType":"string"}},{"name":"source","type":"string"}]}}}]}}"""
257262
}
263+
264+
def "Test treat void transforms partitions as non-partition field"() {
265+
given:
266+
// Initial schema with three fields
267+
def initialSchema = new Schema(
268+
Types.NestedField.optional(1, "field1", Types.BooleanType.get(), "added 1st - partition key"),
269+
Types.NestedField.optional(2, "field2", Types.StringType.get(), "added 2nd"),
270+
Types.NestedField.optional(3, "field3", Types.IntegerType.get(), "added 3rd")
271+
)
272+
273+
// Initial partition fields
274+
def initialPartitionFields = [
275+
new PartitionField(1, 1, "field1", new Identity()),
276+
new PartitionField(2, 2, "field2", new VoidTransform<String>()),
277+
]
278+
279+
when:
280+
def fieldDtos = this.converter.icebergSchemaTofieldDtos(initialSchema, initialPartitionFields)
281+
282+
then:
283+
fieldDtos.size() == 3
284+
285+
// Validate the first field
286+
def field1 = fieldDtos.find { it.name == "field1" }
287+
field1 != null
288+
field1.partitionKey == true
289+
290+
// Validate the second field
291+
def field2 = fieldDtos.find { it.name == "field2" }
292+
field2 != null
293+
field2.partitionKey == false
294+
295+
// Validate the third field
296+
def field3 = fieldDtos.find { it.name == "field3" }
297+
field3 != null
298+
field3.partitionKey == false
299+
300+
noExceptionThrown()
301+
}
258302
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
{
2+
"format-version" : 1,
3+
"table-uuid" : "77ea2333-acd1-4d8b-9870-b3dcece00c87",
4+
"location" : "file:/tmp/dat",
5+
"last-updated-ms" : 1725487921787,
6+
"last-column-id" : 4,
7+
"schema" : {
8+
"type" : "struct",
9+
"schema-id" : 0,
10+
"fields" : [ {
11+
"id" : 1,
12+
"name" : "field1",
13+
"required" : false,
14+
"type" : "long"
15+
}, {
16+
"id" : 2,
17+
"name" : "field2",
18+
"required" : false,
19+
"type" : "string"
20+
}, {
21+
"id" : 3,
22+
"name" : "field3",
23+
"required" : false,
24+
"type" : "long"
25+
}, {
26+
"id" : 4,
27+
"name" : "field4",
28+
"required" : false,
29+
"type" : "int"
30+
} ]
31+
},
32+
"current-schema-id" : 0,
33+
"schemas" : [ {
34+
"type" : "struct",
35+
"schema-id" : 0,
36+
"fields" : [ {
37+
"id" : 1,
38+
"name" : "field1",
39+
"required" : false,
40+
"type" : "long"
41+
}, {
42+
"id" : 2,
43+
"name" : "field2",
44+
"required" : false,
45+
"type" : "string"
46+
}, {
47+
"id" : 3,
48+
"name" : "field3",
49+
"required" : false,
50+
"type" : "long"
51+
}, {
52+
"id" : 4,
53+
"name" : "field4",
54+
"required" : false,
55+
"type" : "int"
56+
} ]
57+
} ],
58+
"partition-spec" : [ {
59+
"name" : "field1",
60+
"transform" : "identity",
61+
"source-id" : 1,
62+
"field-id" : 1000
63+
}, {
64+
"name" : "field2",
65+
"transform" : "void",
66+
"source-id" : 2,
67+
"field-id" : 1001
68+
} ],
69+
"default-spec-id" : 1,
70+
"partition-specs" : [ {
71+
"spec-id" : 0,
72+
"fields" : [ {
73+
"name" : "field1",
74+
"transform" : "identity",
75+
"source-id" : 1,
76+
"field-id" : 1000
77+
}, {
78+
"name" : "field2",
79+
"transform" : "identity",
80+
"source-id" : 2,
81+
"field-id" : 1001
82+
} ]
83+
}, {
84+
"spec-id" : 1,
85+
"fields" : [ {
86+
"name" : "field1",
87+
"transform" : "identity",
88+
"source-id" : 1,
89+
"field-id" : 1000
90+
}, {
91+
"name" : "field2",
92+
"transform" : "void",
93+
"source-id" : 2,
94+
"field-id" : 1001
95+
} ]
96+
} ],
97+
"last-partition-id" : 1001,
98+
"default-sort-order-id" : 0,
99+
"sort-orders" : [ {
100+
"order-id" : 0,
101+
"fields" : [ ]
102+
} ],
103+
"properties" : {
104+
"owner" : "owner",
105+
"acls" : "[{\"format_version\":1,\"principals\":[{\"name\":\"123\",\"principal_type\":\"USER\"}],\"resources\":[{\"resource_type\":\"TABLE\",\"uuid\":\"77ea2333-acd1-4d8b-9870-b3dcece00c87\",\"parent\":{\"resource_type\":\"SCHEMA\",\"name\":\"owner\",\"parent\":{\"resource_type\":\"CATALOG\",\"name\":\"prodhive\",\"parent\":null}}}],\"privileges\":[\"ALL\"],\"grantee\":{\"name\":\"123\",\"principal_type\":\"USER\"},\"with_grant\":true},{\"format_version\":1,\"principals\":[{\"name\":\"123\",\"principal_type\":\"USER\"}],\"resources\":[{\"resource_type\":\"TABLE\",\"uuid\":\"77ea2333-acd1-4d8b-9870-b3dcece00c87\",\"parent\":{\"resource_type\":\"SCHEMA\",\"name\":\"owner\",\"parent\":{\"resource_type\":\"CATALOG\",\"name\":\"prodhive\",\"parent\":null}}}],\"privileges\":[\"ALL\"],\"grantee\":{\"name\":\"123\",\"principal_type\":\"USER\"},\"with_grant\":false}]",
106+
"field.metadata.json" : "{\"1\":{},\"2\":{},\"3\":{},\"4\":{}}"
107+
},
108+
"current-snapshot-id" : -1,
109+
"refs" : { },
110+
"snapshots" : [ ],
111+
"statistics" : [ ],
112+
"snapshot-log" : [ ],
113+
"metadata-log" : [ {
114+
"timestamp-ms" : 1725487921770,
115+
"metadata-file" : "file:/tmp/data/metadata/00001-abf48887-aa4f-4bcc-9219-1e1721314ee1.metadata.json"
116+
} ]
117+
}

metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatSmokeSpec.groovy

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import feign.RetryableException
4141
import feign.Retryer
4242
import groovy.sql.Sql
4343
import org.apache.commons.io.FileUtils
44+
import org.apache.iceberg.PartitionField
4445
import org.joda.time.Instant
4546
import org.skyscreamer.jsonassert.JSONAssert
4647
import spock.lang.Ignore
@@ -831,6 +832,50 @@ class MetacatSmokeSpec extends Specification {
831832
api.deleteTable(catalogName, databaseName, tableName)
832833
}
833834

835+
@Unroll
836+
def "Test ignore void transform as partition fields"() {
837+
given:
838+
def catalogName = 'embedded-fast-hive-metastore'
839+
def databaseName = 'iceberg_db'
840+
def tableName = 'iceberg_table_6'
841+
def uri = isLocalEnv ? String.format('file:/tmp/data/') : null
842+
def tableDto = new TableDto(
843+
name: QualifiedName.ofTable(catalogName, databaseName, tableName),
844+
serde: new StorageDto(
845+
owner: 'metacat-test',
846+
inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
847+
outputFormat: 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
848+
serializationLib: 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
849+
parameters: [
850+
'serialization.format': '1'
851+
],
852+
uri: uri
853+
),
854+
definitionMetadata: null,
855+
dataMetadata: null,
856+
fields: null
857+
)
858+
859+
def metadataLocation = String.format('/tmp/data/metadata/00001-ba4b775c-7b1a-4a6f-aec0-03d3c851088c.metadata.json')
860+
def metadata = [table_type: 'ICEBERG', metadata_location: metadataLocation]
861+
tableDto.setMetadata(metadata)
862+
863+
when:
864+
try {api.createDatabase(catalogName, databaseName, new DatabaseCreateRequestDto())
865+
} catch (Exception ignored) {
866+
}
867+
api.createTable(catalogName, databaseName, tableName, tableDto)
868+
def tableDTO = api.getTable(catalogName, databaseName, tableName, true, true, true)
869+
870+
then:
871+
tableDTO.getFields().size() == 4
872+
tableDTO.getPartition_keys().size() == 1
873+
tableDTO.getPartition_keys()[0] == "field1"
874+
875+
cleanup:
876+
api.deleteTable(catalogName, databaseName, tableName)
877+
}
878+
834879
@Unroll
835880
def "Test get partitions from iceberg table using #filter"() {
836881
def catalogName = 'embedded-fast-hive-metastore'

0 commit comments

Comments (0)