Skip to content

Commit f9ba55e

Browse files
feat(#286): add serializers for ZoneId, ZoneOffset, ZonedDateTime and OffsetDateTime
1 parent 0e78de9 commit f9ba55e

20 files changed

+318
-31
lines changed

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/package.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.apache.flinkx
22

33
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
4+
import org.apache.flink.api.common.typeutils.TypeSerializer
5+
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
46

57
package object api {
68

@@ -13,4 +15,18 @@ package object api {
1315
/** Documentation of [[TypeInformation#getTotalFields()]] states the total number of fields must be at least 1. */
1416
private[api] val MinimumTotalFields: Int = 1
1517

18+
/** Documentation of [[TypeSerializer#getLength()]] states data type with variable length must return `-1`. */
19+
private[api] val VariableLengthDataType: Int = -1
20+
21+
/** Mark a null value in the stream of serialized data. It is validly used only when these conditions are met:
22+
* - Used in both serialize and deserialize methods of the serializer.
23+
* - The range of actual data doesn't include [[Int.MinValue]], i.e., the size of a collection can be only >= 0.
24+
* - The actual data written in the stream is an Int: the first data to deserialize must be an Int for both null
25+
* and non-null cases.
26+
*
27+
* If one of these conditions is not met, consider using another marker or wrap your serializer into a
28+
* [[NullableSerializer]].
29+
*/
30+
private[api] val NullMarker: Int = Int.MinValue
31+
1632
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer
22

33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.VariableLengthDataType
56

67
import scala.reflect.ClassTag
78

@@ -38,7 +39,7 @@ class ArraySerializer[T](val child: TypeSerializer[T], clazz: Class[T]) extends
3839
}
3940
}
4041

41-
override def getLength: Int = -1
42+
override def getLength: Int = VariableLengthDataType
4243

4344
override def deserialize(source: DataInputView): Array[T] = {
4445
val length = source.readInt()

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.apache.flinkx.api.serializer
33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
55
import org.apache.flink.util.InstantiationUtil
6+
import org.apache.flinkx.api.VariableLengthDataType
67
import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot
78

89
class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]])
@@ -32,7 +33,14 @@ class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers
3233
// this one may be used for later reuse, but we never reuse coproducts due to their unclear concrete type
3334
subtypeSerializers.head.createInstance().asInstanceOf[T]
3435

35-
override def getLength: Int = -1
36+
override val getLength: Int = {
37+
val length = subtypeSerializers(0).getLength
38+
if (subtypeSerializers.forall(_.getLength == length)) {
39+
length
40+
} else {
41+
VariableLengthDataType
42+
}
43+
}
3644

3745
override def serialize(record: T, target: DataOutputView): Unit = {
3846
var subtypeIndex = 0

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.flinkx.api.serializer
2020
import org.apache.flink.annotation.Internal
2121
import org.apache.flink.api.common.typeutils._
2222
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
23+
import org.apache.flinkx.api.VariableLengthDataType
2324

2425
/** Serializer for [[Either]]. Copied from Flink 1.14.
2526
*/
@@ -50,7 +51,12 @@ class EitherSerializer[A, B](
5051
(rightSerializer == null || rightSerializer.isImmutableType)
5152
}
5253

53-
override def getLength: Int = -1
54+
override val getLength: Int =
55+
if (leftSerializer.getLength == rightSerializer.getLength) {
56+
leftSerializer.getLength
57+
} else {
58+
VariableLengthDataType
59+
}
5460

5561
override def copy(from: Either[A, B]): Either[A, B] = from match {
5662
case Left(a) => if (leftSerializer.isImmutableType) from else Left(leftSerializer.copy(a))

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer
22

33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.VariableLengthDataType
56

67
class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[::[T]] {
78

@@ -26,7 +27,7 @@ class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
2627
}
2728

2829
override def createInstance(): ::[T] = throw new IllegalArgumentException("cannot create instance of non-empty list")
29-
override def getLength: Int = -1
30+
override def getLength: Int = VariableLengthDataType
3031
override def deserialize(source: DataInputView): ::[T] = {
3132
val count = source.readInt()
3233
val result = (0 until count)

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.apache.flinkx.api.serializer
22

33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.VariableLengthDataType
56

67
class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[List[T]] {
78

@@ -25,7 +26,7 @@ class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutab
2526
}
2627

2728
override def createInstance(): List[T] = List.empty[T]
28-
override def getLength: Int = -1
29+
override def getLength: Int = VariableLengthDataType
2930
override def deserialize(source: DataInputView): List[T] = {
3031
var remaining = source.readInt()
3132
val builder = List.newBuilder[T]

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package org.apache.flinkx.api.serializer
33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
55
import org.apache.flink.util.InstantiationUtil
6+
import org.apache.flinkx.api.VariableLengthDataType
67
import org.apache.flinkx.api.serializer.MapSerializer._
78

89
class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends MutableSerializer[Map[K, V]] {
@@ -28,7 +29,7 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
2829
}
2930

3031
override def createInstance(): Map[K, V] = Map.empty[K, V]
31-
override def getLength: Int = -1
32+
override def getLength: Int = VariableLengthDataType
3233
override def deserialize(source: DataInputView): Map[K, V] = {
3334
var remaining = source.readInt()
3435
val builder = Map.newBuilder[K, V]

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.flinkx.api.serializer
1919

2020
import java.util.function.Supplier
21-
2221
import org.apache.flink.annotation.Internal
2322
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
2423
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
24+
import org.apache.flinkx.api.VariableLengthDataType
2525

2626
/** Serializer for cases where no serializer is required but the system still expects one. This happens for
2727
* OptionTypeInfo when None is used, or for Either when one of the type parameters is Nothing.
@@ -35,7 +35,7 @@ class NothingSerializer extends ImmutableSerializer[Any] {
3535
Integer.valueOf(-1)
3636
}
3737

38-
override def getLength: Int = -1
38+
override def getLength: Int = VariableLengthDataType
3939

4040
override def copy(from: Any): Any =
4141
throw new RuntimeException("This must not be used. You encountered a bug.")

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.flinkx.api.serializer
2020
import org.apache.flink.annotation.Internal
2121
import org.apache.flink.api.common.typeutils._
2222
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
23+
import org.apache.flinkx.api.VariableLengthDataType
2324

2425
/** Serializer for [[Option]].
2526
*/
@@ -41,7 +42,7 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) extends Mutable
4142

4243
override val isImmutableType: Boolean = elemSerializer == null || elemSerializer.isImmutableType
4344

44-
override def getLength: Int = -1
45+
override def getLength: Int = VariableLengthDataType
4546

4647
override def copy(from: Option[A]): Option[A] = from match {
4748
case Some(a) => if (isImmutableType) from else Some(elemSerializer.copy(a))

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package org.apache.flinkx.api.serializer
22

33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5+
import org.apache.flinkx.api.VariableLengthDataType
56

6-
import scala.collection.immutable
77
import scala.collection.immutable.ArraySeq
88
import scala.reflect.ClassTag
99

@@ -31,7 +31,7 @@ class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
3131
}
3232

3333
override def createInstance(): Seq[T] = Seq.empty[T]
34-
override def getLength: Int = -1
34+
override def getLength: Int = VariableLengthDataType
3535
override def deserialize(source: DataInputView): Seq[T] = {
3636
val length = source.readInt()
3737
val array = new Array[T](length)

0 commit comments

Comments
 (0)