Skip to content

Commit 0053b41

Browse files
authored
[FLINK-37643] Support partial deletes when converting to external data structures (apache#26436)
1 parent 98000fb commit 0053b41

File tree

4 files changed

+134
-7
lines changed

4 files changed

+134
-7
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/ArrayData.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,6 @@ static ElementGetter createElementGetter(LogicalType elementType) {
215215
default:
216216
throw new IllegalArgumentException();
217217
}
218-
if (!elementType.isNullable()) {
219-
return elementGetter;
220-
}
221218
return (array, pos) -> {
222219
if (array.isNullAt(pos)) {
223220
return null;
@@ -233,6 +230,12 @@ static ElementGetter createElementGetter(LogicalType elementType) {
233230
*/
234231
@PublicEvolving
235232
interface ElementGetter extends Serializable {
233+
234+
/**
235+
* Converters and serializers always support nullability. The NOT NULL constraint is only
236+
* considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e.
237+
* key-only upserts) set all non-key fields to null, regardless of logical type.
238+
*/
236239
@Nullable
237240
Object getElementOrNull(ArrayData array, int pos);
238241
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,6 @@ static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
287287
default:
288288
throw new IllegalArgumentException();
289289
}
290-
if (!fieldType.isNullable()) {
291-
return fieldGetter;
292-
}
293290
return row -> {
294291
if (row.isNullAt(fieldPos)) {
295292
return null;
@@ -305,6 +302,11 @@ static FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
305302
*/
306303
@PublicEvolving
307304
interface FieldGetter extends Serializable {
305+
/**
306+
* Converters and serializers always support nullability. The NOT NULL constraint is only
307+
* considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e.
308+
* key-only upserts) set all non-key fields to null, regardless of logical type.
309+
*/
308310
@Nullable
309311
Object getFieldOrNull(RowData row);
310312
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,69 @@ static List<TestSpec> testData() {
355355
TestSpec.forDataType(
356356
DataTypes.STRUCTURED(GenericPojo.class, FIELD("value", DATE())))
357357
.convertedTo(
358-
GenericPojo.class, new GenericPojo<>(LocalDate.ofEpochDay(123))));
358+
GenericPojo.class, new GenericPojo<>(LocalDate.ofEpochDay(123))),
359+
360+
// partial delete messages
361+
TestSpec.forDataType(
362+
DataTypes.ROW(
363+
DataTypes.FIELD("f0", DataTypes.TINYINT().notNull()),
364+
DataTypes.FIELD("f1", DataTypes.SMALLINT().notNull()),
365+
DataTypes.FIELD("f2", DataTypes.INT().notNull()),
366+
DataTypes.FIELD("f3", DataTypes.BIGINT().notNull()),
367+
DataTypes.FIELD("f4", DataTypes.DOUBLE().notNull()),
368+
DataTypes.FIELD("f5", DataTypes.FLOAT().notNull()),
369+
DataTypes.FIELD("f6", DataTypes.DATE().notNull()),
370+
DataTypes.FIELD("f7", DataTypes.BINARY(12).notNull()),
371+
DataTypes.FIELD("f8", DataTypes.VARBINARY(12).notNull()),
372+
DataTypes.FIELD("f9", DataTypes.CHAR(12).notNull()),
373+
DataTypes.FIELD("f10", DataTypes.VARCHAR(12).notNull()),
374+
DataTypes.FIELD("f11", DataTypes.BOOLEAN().notNull()),
375+
DataTypes.FIELD("f12", DataTypes.TIME().notNull()),
376+
DataTypes.FIELD("f13", DataTypes.TIMESTAMP().notNull()),
377+
DataTypes.FIELD("f14", DataTypes.TIMESTAMP_LTZ().notNull()),
378+
DataTypes.FIELD("f15", DataTypes.DECIMAL(10, 2).notNull()),
379+
DataTypes.FIELD(
380+
"f16",
381+
DataTypes.MAP(
382+
DataTypes.INT().notNull(),
383+
DataTypes.STRING().notNull())
384+
.notNull()),
385+
DataTypes.FIELD(
386+
"f17",
387+
DataTypes.ARRAY(DataTypes.INT().notNull())
388+
.notNull()),
389+
DataTypes.FIELD(
390+
"f18",
391+
DataTypes.ARRAY(DataTypes.INT().notNull())
392+
.notNull()),
393+
DataTypes.FIELD(
394+
"f19",
395+
DataTypes.MULTISET(DataTypes.INT().notNull())
396+
.notNull())))
397+
.convertedTo(
398+
Row.class,
399+
Row.ofKind(
400+
RowKind.DELETE,
401+
null,
402+
null,
403+
null,
404+
null,
405+
null,
406+
null,
407+
null,
408+
null,
409+
null,
410+
null,
411+
null,
412+
null,
413+
null,
414+
null,
415+
null,
416+
null,
417+
null,
418+
null,
419+
new Integer[] {null, null},
420+
null)));
359421
}
360422

361423
@ParameterizedTest(name = "{index}: {0}")

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,21 @@
3333
import org.apache.flink.table.data.writer.BinaryArrayWriter;
3434
import org.apache.flink.table.types.DataType;
3535
import org.apache.flink.table.types.logical.ArrayType;
36+
import org.apache.flink.table.types.logical.BigIntType;
37+
import org.apache.flink.table.types.logical.BinaryType;
38+
import org.apache.flink.table.types.logical.CharType;
39+
import org.apache.flink.table.types.logical.DateType;
40+
import org.apache.flink.table.types.logical.DayTimeIntervalType;
41+
import org.apache.flink.table.types.logical.DecimalType;
3642
import org.apache.flink.table.types.logical.DoubleType;
43+
import org.apache.flink.table.types.logical.FloatType;
3744
import org.apache.flink.table.types.logical.IntType;
45+
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
3846
import org.apache.flink.table.types.logical.LogicalType;
3947
import org.apache.flink.table.types.logical.MapType;
4048
import org.apache.flink.table.types.logical.RawType;
49+
import org.apache.flink.table.types.logical.SmallIntType;
50+
import org.apache.flink.table.types.logical.VarBinaryType;
4151
import org.apache.flink.table.types.logical.VarCharType;
4252
import org.apache.flink.testutils.DeeplyEqualsChecker;
4353

@@ -398,4 +408,54 @@ private static RowDataSerializer getRowSerializer() {
398408
InternalSerializers.<RowData>create(NESTED_DATA_TYPE.getLogicalType());
399409
}
400410
}
411+
412+
/**
413+
* Converters and serializers always support nullability. The NOT NULL constraint is only
414+
* considered on SQL semantic level but not data transfer. E.g. partial deletes (i.e. key-only
415+
* upserts) set all non-key fields to null, regardless of logical type.
416+
*/
417+
static final class RowDataSerializerWithNullForNotNullTypeTest extends RowDataSerializerTest {
418+
public RowDataSerializerWithNullForNotNullTypeTest() {
419+
super(getRowSerializer(), getData());
420+
}
421+
422+
private static RowData[] getData() {
423+
GenericRowData row = new GenericRowData(13);
424+
row.setField(0, 2);
425+
row.setField(1, null);
426+
row.setField(3, null);
427+
row.setField(4, null);
428+
row.setField(5, null);
429+
row.setField(6, null);
430+
row.setField(7, null);
431+
row.setField(8, null);
432+
row.setField(9, null);
433+
row.setField(10, null);
434+
row.setField(11, null);
435+
row.setField(12, null);
436+
437+
return new RowData[] {row};
438+
}
439+
440+
private static RowDataSerializer getRowSerializer() {
441+
InternalTypeInfo<RowData> typeInfo =
442+
InternalTypeInfo.ofFields(
443+
new IntType(false),
444+
new SmallIntType(false),
445+
new BigIntType(false),
446+
new VarCharType(false, VarCharType.MAX_LENGTH),
447+
new CharType(false, CharType.MAX_LENGTH),
448+
new BinaryType(false, BinaryType.MAX_LENGTH),
449+
new VarBinaryType(false, VarBinaryType.MAX_LENGTH),
450+
new DateType(false),
451+
new DayTimeIntervalType(
452+
false, DayTimeIntervalType.DayTimeResolution.DAY, 1, 6),
453+
new DecimalType(false, 10, 2),
454+
new FloatType(false),
455+
new DoubleType(false),
456+
new LocalZonedTimestampType(false, 3));
457+
458+
return typeInfo.toRowSerializer();
459+
}
460+
}
401461
}

0 commit comments

Comments
 (0)