Skip to content

Commit b5e2b24

Browse files
committed
Add support for random prefix with deletion vector in Delta Lake
1 parent 8899167 commit b5e2b24

File tree

8 files changed

+81
-6
lines changed

8 files changed

+81
-6
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public class DeltaLakeMergeSink
130130
private final ParquetReaderOptions parquetReaderOptions;
131131
private final boolean deletionVectorEnabled;
132132
private final Map<String, DeletionVectorEntry> deletionVectors;
133+
private final int randomPrefixLength;
133134

134135
@Nullable
135136
private DeltaLakeCdfPageSink cdfPageSink;
@@ -153,7 +154,8 @@ public DeltaLakeMergeSink(
153154
ParquetReaderOptions parquetReaderOptions,
154155
FileFormatDataSourceStats fileFormatDataSourceStats,
155156
boolean deletionVectorEnabled,
156-
Map<String, DeletionVectorEntry> deletionVectors)
157+
Map<String, DeletionVectorEntry> deletionVectors,
158+
int randomPrefixLength)
157159
{
158160
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
159161
this.session = requireNonNull(session, "session is null");
@@ -181,6 +183,7 @@ public DeltaLakeMergeSink(
181183
this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null");
182184
this.deletionVectorEnabled = deletionVectorEnabled;
183185
this.deletionVectors = ImmutableMap.copyOf(requireNonNull(deletionVectors, "deletionVectors is null"));
186+
this.randomPrefixLength = randomPrefixLength;
184187
dataColumnsIndices = new int[tableColumnCount];
185188
dataAndRowIdColumnsIndices = new int[tableColumnCount + 1];
186189
for (int i = 0; i < tableColumnCount; i++) {
@@ -408,7 +411,7 @@ private Slice writeDeletionVector(
408411

409412
DeletionVectorEntry deletionVectorEntry;
410413
try {
411-
deletionVectorEntry = writeDeletionVectors(fileSystem, rootTableLocation, deletedRows);
414+
deletionVectorEntry = writeDeletionVectors(fileSystem, rootTableLocation, deletedRows, randomPrefixLength);
412415
}
413416
catch (IOException e) {
414417
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to write deletion vector file", e);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static io.trino.plugin.deltalake.DeltaLakeParquetSchemas.createParquetSchemaMapping;
5555
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
5656
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
57+
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getRandomPrefixLength;
5758
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isDeletionVectorEnabled;
5859
import static io.trino.spi.type.VarcharType.VARCHAR;
5960
import static java.util.Objects.requireNonNull;
@@ -199,7 +200,8 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
199200
parquetReaderOptions,
200201
fileFormatDataSourceStats,
201202
isDeletionVectorEnabled(tableHandle.metadataEntry(), tableHandle.protocolEntry()),
202-
merge.deletionVectors());
203+
merge.deletionVectors(),
204+
getRandomPrefixLength(tableHandle.metadataEntry()));
203205
}
204206

205207
private DeltaLakeCdfPageSink createCdfPageSink(

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.nio.ByteBuffer;
2929
import java.util.OptionalInt;
3030
import java.util.UUID;
31+
import java.util.concurrent.ThreadLocalRandom;
3132
import java.util.zip.CRC32;
3233
import java.util.zip.Checksum;
3334

@@ -75,12 +76,18 @@ public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem,
7576
public static DeletionVectorEntry writeDeletionVectors(
7677
TrinoFileSystem fileSystem,
7778
Location location,
78-
RoaringBitmapArray deletedRows)
79+
RoaringBitmapArray deletedRows,
80+
int randomPrefixLength)
7981
throws IOException
8082
{
8183
UUID uuid = randomUUID();
82-
String deletionVectorFilename = "deletion_vector_" + uuid + ".bin";
8384
String pathOrInlineDv = encodeUUID(uuid);
85+
String deletionVectorFilename = "deletion_vector_" + uuid + ".bin";
86+
if (randomPrefixLength > 0) {
87+
String randomPrefix = randomPrefix(randomPrefixLength);
88+
pathOrInlineDv = randomPrefix + pathOrInlineDv;
89+
location = location.appendPath(randomPrefix);
90+
}
8491
int sizeInBytes = MAGIC_NUMBER_BYTE_SIZE + BIT_MAP_COUNT_BYTE_SIZE + BIT_MAP_KEY_BYTE_SIZE + deletedRows.serializedSizeInBytes();
8592
long cardinality = deletedRows.cardinality();
8693

@@ -100,6 +107,16 @@ public static DeletionVectorEntry writeDeletionVectors(
100107
return new DeletionVectorEntry(UUID_MARKER, pathOrInlineDv, offset, sizeInBytes, cardinality);
101108
}
102109

110+
private static String randomPrefix(int length)
111+
{
112+
String alphanumeric = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
113+
StringBuilder prefix = new StringBuilder(length);
114+
for (int i = 0; i < length; i++) {
115+
prefix.append(alphanumeric.charAt(ThreadLocalRandom.current().nextInt(alphanumeric.length())));
116+
}
117+
return prefix.toString();
118+
}
119+
103120
private static byte[] serializeAsByteArray(RoaringBitmapArray bitmaps, int sizeInBytes)
104121
{
105122
ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes).order(LITTLE_ENDIAN);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,18 @@ public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry, Proto
204204
return parseBoolean(metadataEntry.getConfiguration().get(DELETION_VECTORS_CONFIGURATION_KEY));
205205
}
206206

207+
public static int getRandomPrefixLength(MetadataEntry metadataEntry)
208+
{
209+
boolean randomizeFilePrefixes = parseBoolean(metadataEntry.getConfiguration().get("delta.randomizeFilePrefixes"));
210+
if (randomizeFilePrefixes) {
211+
// 2 is the default value in Delta Lake
212+
int randomPrefixLength = Integer.parseInt(metadataEntry.getConfiguration().getOrDefault("delta.randomPrefixLength", "2"));
213+
checkArgument(randomPrefixLength >= 0, "randomPrefixLength must be >= 0: %s", randomPrefixLength);
214+
return randomPrefixLength;
215+
}
216+
return 0;
217+
}
218+
207219
public static List<String> enabledUniversalFormats(MetadataEntry metadataEntry)
208220
{
209221
String formats = metadataEntry.getConfiguration().get(UNIVERSAL_FORMAT_CONFIGURATION_KEY);

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,6 +1243,26 @@ public void testDeletionVectorsCheckPoint()
12431243
assertUpdate("DROP TABLE " + tableName);
12441244
}
12451245

1246+
@Test
1247+
public void testDeletionVectorsRandomPrefix()
1248+
throws Exception
1249+
{
1250+
String tableName = "deletion_vectors_random_prefix" + randomNameSuffix();
1251+
1252+
Path tableLocation = catalogDir.resolve(tableName);
1253+
copyDirectoryContents(new File(Resources.getResource("deltalake/deletion_vector_random_prefix").toURI()).toPath(), tableLocation);
1254+
assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
1255+
1256+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20), (3, 30)", 3);
1257+
assertUpdate("DELETE FROM " + tableName + " WHERE a = 1", 1);
1258+
assertQuery("SELECT * FROM " + tableName, "VALUES (2, 20), (3, 30)");
1259+
1260+
assertThat(fileNamesIn(new URI(tableLocation.toString()).getPath(), true))
1261+
.anyMatch(path -> path.matches(tableLocation + "/[a-zA-Z0-9_]{3}/deletion_vector_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.bin"));
1262+
1263+
assertUpdate("DROP TABLE " + tableName);
1264+
}
1265+
12461266
@Test
12471267
public void testUnsupportedVacuumDeletionVectors()
12481268
throws Exception
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
Data generated using OSS Delta Lake 3.2.0:
2+
3+
```sql
4+
CREATE TABLE tpch.test_dv_random_prefix
5+
(a int, b int)
6+
USING delta
7+
TBLPROPERTIES(
8+
'delta.enableDeletionVectors' = true,
9+
'delta.randomizeFilePrefixes' = true,
10+
'delta.randomPrefixLength' = 3
11+
);
12+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"commitInfo":{"timestamp":1730875806173,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"true","properties":"{\"delta.randomPrefixLength\":\"3\",\"delta.randomizeFilePrefixes\":\"true\",\"delta.enableDeletionVectors\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"6723f2fb-7f22-402a-ac00-f2ffd838136e"}}
2+
{"metaData":{"id":"20f880b6-d9f7-4cd2-a21d-1a33b0b445f1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.randomPrefixLength":"3","delta.randomizeFilePrefixes":"true","delta.enableDeletionVectors":"true"},"createdTime":1730875806156}}
3+
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}

testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,9 +307,15 @@ public void testDeletionVectorsWithRandomPrefix()
307307
"LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
308308
"TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.randomizeFilePrefixes' = true)");
309309
try {
310-
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)");
310+
onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22), (3, 33)");
311+
311312
onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2");
313+
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
314+
.containsOnly(row(1, 11), row(3, 33));
315+
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
316+
.containsOnly(row(1, 11), row(3, 33));
312317

318+
onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a = 3");
313319
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
314320
.containsOnly(row(1, 11));
315321
assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))

0 commit comments

Comments
 (0)