-
Notifications
You must be signed in to change notification settings - Fork 508
feat: Iceberg v3 deletion vectors #2561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Shekharrajak
wants to merge
5
commits into
apache:main
Choose a base branch
from
Shekharrajak:feature/iceberg-v3-deletion-vectors
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
4a8ff81
[iceberg] Add FormatVersionManager for V3 feature gating
Shekharrajak 34f0091
[iceberg] Add V3DeltaTaskWriter for deletion vector support
Shekharrajak c44b9ca
Implement V3DVWriter with Puffin deletion vectors
Shekharrajak d8da739
Add comprehensive DV tests with position validation
Shekharrajak 566aaee
Clarify DV test comments to show when deletions happen
Shekharrajak File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
214 changes: 214 additions & 0 deletions
214
...iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/V3DeltaTaskWriter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,214 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.lake.iceberg.tiering.writer; | ||
|
|
||
| import org.apache.fluss.lake.iceberg.version.FormatVersionManager; | ||
|
|
||
| import org.apache.iceberg.DeleteFile; | ||
| import org.apache.iceberg.FileFormat; | ||
| import org.apache.iceberg.PartitionKey; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.StructLike; | ||
| import org.apache.iceberg.Table; | ||
| import org.apache.iceberg.data.Record; | ||
| import org.apache.iceberg.deletes.BaseDVFileWriter; | ||
| import org.apache.iceberg.deletes.DVFileWriter; | ||
| import org.apache.iceberg.deletes.PositionDeleteIndex; | ||
| import org.apache.iceberg.io.BaseTaskWriter; | ||
| import org.apache.iceberg.io.DeleteWriteResult; | ||
| import org.apache.iceberg.io.FileAppenderFactory; | ||
| import org.apache.iceberg.io.FileIO; | ||
| import org.apache.iceberg.io.OutputFileFactory; | ||
| import org.apache.iceberg.io.WriteResult; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.util.StructLikeMap; | ||
| import org.apache.iceberg.util.StructLikeUtil; | ||
| import org.apache.iceberg.util.StructProjection; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.function.Function; | ||
|
|
||
| import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toPartition; | ||
|
|
||
| /** V3-aware delta writer using deletion vectors for position deletes when supported. */ | ||
| public class V3DeltaTaskWriter extends BaseTaskWriter<Record> { | ||
|
|
||
| private final BaseEqualityDeltaWriter deltaWriter; | ||
| private final boolean useDeletionVectors; | ||
|
|
||
| public V3DeltaTaskWriter( | ||
| Table icebergTable, | ||
| Schema deleteSchema, | ||
| FileFormat format, | ||
| FileAppenderFactory<Record> appenderFactory, | ||
| OutputFileFactory fileFactory, | ||
| FileIO io, | ||
| long targetFileSize, | ||
| @Nullable String partition, | ||
| int bucket) { | ||
| super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize); | ||
|
|
||
| this.useDeletionVectors = FormatVersionManager.supportsDeletionVectors(icebergTable); | ||
| PartitionKey partitionKey = toPartition(icebergTable, partition, bucket); | ||
|
|
||
| if (useDeletionVectors) { | ||
| this.deltaWriter = | ||
| new V3DVWriter( | ||
| partitionKey, | ||
| icebergTable.schema(), | ||
| deleteSchema, | ||
| icebergTable.spec(), | ||
| fileFactory); | ||
| } else { | ||
| this.deltaWriter = | ||
| new V2EqualityDeltaWriter(partitionKey, icebergTable.schema(), deleteSchema); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void write(Record row) throws IOException { | ||
| deltaWriter.write(row); | ||
| } | ||
|
|
||
| public void delete(Record row) throws IOException { | ||
| deltaWriter.delete(row); | ||
| } | ||
|
|
||
| @Override | ||
| public WriteResult complete() throws IOException { | ||
| WriteResult baseResult = super.complete(); | ||
| List<DeleteFile> allDeleteFiles = Lists.newArrayList(baseResult.deleteFiles()); | ||
|
|
||
| if (useDeletionVectors && deltaWriter instanceof V3DVWriter) { | ||
| allDeleteFiles.addAll(((V3DVWriter) deltaWriter).getDvDeleteFiles()); | ||
| } | ||
|
|
||
| return WriteResult.builder() | ||
| .addDataFiles(baseResult.dataFiles()) | ||
| .addDeleteFiles(allDeleteFiles) | ||
| .addReferencedDataFiles(baseResult.referencedDataFiles()) | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| deltaWriter.close(); | ||
| } | ||
|
|
||
| /** V2 equality delta writer - uses base class position delete handling. */ | ||
| private class V2EqualityDeltaWriter extends BaseEqualityDeltaWriter { | ||
| V2EqualityDeltaWriter(PartitionKey partition, Schema schema, Schema eqDeleteSchema) { | ||
| super(partition, schema, eqDeleteSchema); | ||
| } | ||
|
|
||
| @Override | ||
| protected StructLike asStructLike(Record record) { | ||
| return record; | ||
| } | ||
|
|
||
| @Override | ||
| protected StructLike asStructLikeKey(Record record) { | ||
| return record; | ||
| } | ||
| } | ||
|
|
||
| /** V3 writer using DVFileWriter for position deletes instead of base class handling. */ | ||
| private class V3DVWriter extends BaseEqualityDeltaWriter { | ||
| private final DVFileWriter dvWriter; | ||
| private final StructProjection keyProjection; | ||
| private final PartitionSpec spec; | ||
| private final StructLike partition; | ||
| private final RollingFileWriter dataWriter; | ||
| private Map<StructLike, PathOffset> positionMap; | ||
| private DeleteWriteResult dvResult; | ||
|
|
||
| V3DVWriter( | ||
| PartitionKey partitionKey, | ||
| Schema schema, | ||
| Schema eqDeleteSchema, | ||
| PartitionSpec spec, | ||
| OutputFileFactory fileFactory) { | ||
| super(partitionKey, schema, eqDeleteSchema); | ||
| this.keyProjection = StructProjection.create(schema, eqDeleteSchema); | ||
| this.spec = spec; | ||
| this.partition = partitionKey; | ||
| this.dataWriter = new RollingFileWriter(partitionKey); | ||
| this.positionMap = StructLikeMap.create(eqDeleteSchema.asStruct()); | ||
|
|
||
| Function<String, PositionDeleteIndex> loadPreviousDeletes = path -> null; | ||
| this.dvWriter = new BaseDVFileWriter(fileFactory, loadPreviousDeletes); | ||
| } | ||
|
|
||
| @Override | ||
| public void write(Record row) throws IOException { | ||
| CharSequence path = dataWriter.currentPath(); | ||
| long pos = dataWriter.currentRows(); | ||
| StructLike key = StructLikeUtil.copy(keyProjection.wrap(row)); | ||
|
|
||
| PathOffset previous = positionMap.put(key, new PathOffset(path, pos)); | ||
| if (previous != null) { | ||
| dvWriter.delete(previous.path.toString(), previous.rowOffset, spec, partition); | ||
| } | ||
| dataWriter.write(row); | ||
| } | ||
|
|
||
| @Override | ||
| protected StructLike asStructLike(Record record) { | ||
| return record; | ||
| } | ||
|
|
||
| @Override | ||
| protected StructLike asStructLikeKey(Record record) { | ||
| return record; | ||
| } | ||
|
|
||
| List<DeleteFile> getDvDeleteFiles() { | ||
| return dvResult != null ? dvResult.deleteFiles() : Lists.newArrayList(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| if (dataWriter != null) { | ||
| dataWriter.close(); | ||
| } | ||
| if (dvWriter != null) { | ||
| dvWriter.close(); | ||
| dvResult = dvWriter.result(); | ||
| } | ||
| if (positionMap != null) { | ||
| positionMap.clear(); | ||
| positionMap = null; | ||
| } | ||
| } | ||
|
|
||
| private class PathOffset { | ||
| final CharSequence path; | ||
| final long rowOffset; | ||
|
|
||
| PathOffset(CharSequence path, long rowOffset) { | ||
| this.path = path; | ||
| this.rowOffset = rowOffset; | ||
| } | ||
| } | ||
| } | ||
| } |
49 changes: 49 additions & 0 deletions
49
...ake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/version/FormatVersionManager.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.lake.iceberg.version; | ||
|
|
||
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
|
|
||
| /** Manages Iceberg format version detection and V3 feature gating. */ | ||
| public final class FormatVersionManager { | ||
|
|
||
| public static final int DEFAULT_FORMAT_VERSION = 2; | ||
| public static final int V3_FORMAT_VERSION = 3; | ||
| public static final int MIN_VERSION_FOR_DELETION_VECTORS = 3; | ||
|
|
||
| private FormatVersionManager() {} | ||
|
|
||
| /** Detects format version from table metadata (not properties, as it's reserved). */ | ||
| public static int detectFormatVersion(Table table) { | ||
| if (table instanceof HasTableOperations) { | ||
| return ((HasTableOperations) table).operations().current().formatVersion(); | ||
| } | ||
| return DEFAULT_FORMAT_VERSION; | ||
| } | ||
|
|
||
| /** Checks if the table supports deletion vectors (V3+). */ | ||
| public static boolean supportsDeletionVectors(Table table) { | ||
| return detectFormatVersion(table) >= MIN_VERSION_FOR_DELETION_VECTORS; | ||
| } | ||
|
|
||
| /** Checks if the given format version is V3 or higher. */ | ||
| public static boolean isV3OrHigher(int formatVersion) { | ||
| return formatVersion >= V3_FORMAT_VERSION; | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't reliably work across all cases, unfortunately. However, we have
`TableUtil.formatVersion` in Iceberg, which you should be able to use: https://github.com/apache/iceberg/blob/829ae7a11dc1eb62246c801ce1c7e501356c5463/core/src/main/java/org/apache/iceberg/TableUtil.java#L27
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @nastra, this PR is drafted/closed, so I would suggest not reviewing it yet: we will create a FIP first, get consensus on the approach, and then move forward with the development.
Thank you!