Skip to content

Commit 096d326

Browse files
[Perf] Implement copy() and use while loop in serializers (#279)
* [perf] implement specific direct-memory copy when more optimized than generic de/ser * [perf] Replace for comprehension with while loop * Improve checks on serializer: instance and binary copy Add checks on type information Add Scalatest matchers "haveTypeInfo" and beSerializable * review: replace custom Scalatest matchers with simple test functions * review: Fix checks on arity and totalFields * Rename checkBinaryCopy to checkDataViewCopy * Extract type information constants
1 parent 9ffffb4 commit 096d326

18 files changed

+435
-153
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.apache.flinkx
2+
3+
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
4+
5+
package object api {
6+
7+
/** Basic type has an arity of 1. See [[BasicTypeInfo#getArity()]] */
8+
private[api] val BasicTypeArity: Int = 1
9+
10+
/** Basic type has 1 field. See [[BasicTypeInfo#getTotalFields()]] */
11+
private[api] val BasicTypeTotalFields: Int = 1
12+
13+
/** Documentation of [[TypeInformation#getTotalFields()]] states the total number of fields must be at least 1. */
14+
private[api] val MinimumTotalFields: Int = 1
15+
16+
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ class ArraySerializer[T](val child: TypeSerializer[T], clazz: Class[T]) extends
6262
}
6363
}
6464

65+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
66+
var remaining = source.readInt()
67+
target.writeInt(remaining)
68+
while (remaining > 0) {
69+
child.copy(source, target)
70+
remaining -= 1
71+
}
72+
}
73+
6574
override def snapshotConfiguration(): TypeSerializerSnapshot[Array[T]] =
6675
new CollectionSerializerSnapshot[Array, T, ArraySerializer[T]](child, classOf[ArraySerializer[T]], clazz)
6776

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers
5858
subtype.asInstanceOf[TypeSerializer[T]].deserialize(source)
5959
}
6060

61+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
62+
val index = source.readByte()
63+
val subtype = subtypeSerializers(index.toInt)
64+
target.writeByte(index)
65+
subtype.asInstanceOf[TypeSerializer[T]].copy(source, target)
66+
}
67+
6168
override def snapshotConfiguration(): TypeSerializerSnapshot[T] =
6269
new CoproductSerializerSnapshot(subtypeClasses, subtypeSerializers)
6370
}

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
3939
record.foreach(element => child.serialize(element, target))
4040
}
4141

42+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
43+
var remaining = source.readInt()
44+
target.writeInt(remaining)
45+
while (remaining > 0) {
46+
child.copy(source, target)
47+
remaining -= 1
48+
}
49+
}
50+
4251
override def snapshotConfiguration(): TypeSerializerSnapshot[::[T]] =
4352
new CollectionSerializerSnapshot[::, T, ListCCSerializer[T]](child, classOf[ListCCSerializer[T]], clazz)
4453

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,30 @@ class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutab
2727
override def createInstance(): List[T] = List.empty[T]
2828
override def getLength: Int = -1
2929
override def deserialize(source: DataInputView): List[T] = {
30-
val count = source.readInt()
31-
val result = for {
32-
_ <- 0 until count
33-
} yield {
34-
child.deserialize(source)
30+
var remaining = source.readInt()
31+
val builder = List.newBuilder[T]
32+
builder.sizeHint(remaining)
33+
while (remaining > 0) {
34+
builder.addOne(child.deserialize(source))
35+
remaining -= 1
3536
}
36-
result.toList
37+
builder.result()
3738
}
39+
3840
override def serialize(record: List[T], target: DataOutputView): Unit = {
3941
target.writeInt(record.size)
4042
record.foreach(element => child.serialize(element, target))
4143
}
4244

45+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
46+
var remaining = source.readInt()
47+
target.writeInt(remaining)
48+
while (remaining > 0) {
49+
child.copy(source, target)
50+
remaining -= 1
51+
}
52+
}
53+
4354
override def snapshotConfiguration(): TypeSerializerSnapshot[List[T]] =
4455
new CollectionSerializerSnapshot[List, T, ListSerializer[T]](child, classOf[ListSerializer[T]], clazz)
4556

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,16 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
3030
override def createInstance(): Map[K, V] = Map.empty[K, V]
3131
override def getLength: Int = -1
3232
override def deserialize(source: DataInputView): Map[K, V] = {
33-
val count = source.readInt()
34-
val result = for {
35-
_ <- 0 until count
36-
} yield {
33+
var remaining = source.readInt()
34+
val builder = Map.newBuilder[K, V]
35+
builder.sizeHint(remaining)
36+
while (remaining > 0) {
3737
val key = ks.deserialize(source)
3838
val value = vs.deserialize(source)
39-
key -> value
39+
builder.addOne(key -> value)
40+
remaining -= 1
4041
}
41-
result.toMap
42+
builder.result()
4243
}
4344
override def serialize(record: Map[K, V], target: DataOutputView): Unit = {
4445
target.writeInt(record.size)
@@ -48,6 +49,16 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
4849
})
4950
}
5051

52+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
53+
var remaining = source.readInt()
54+
target.writeInt(remaining)
55+
while (remaining > 0) {
56+
ks.copy(source, target)
57+
vs.copy(source, target)
58+
remaining -= 1
59+
}
60+
}
61+
5162
override def snapshotConfiguration(): TypeSerializerSnapshot[Map[K, V]] = new MapSerializerSnapshot(ks, vs)
5263
}
5364

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[
4949

5050
override def deserialize(source: DataInputView): A = mapper.contramap(ser.deserialize(source))
5151

52+
override def copy(source: DataInputView, target: DataOutputView): Unit = ser.copy(source, target)
53+
5254
override def snapshotConfiguration(): TypeSerializerSnapshot[A] = new MappedSerializerSnapshot[A, B](mapper, ser)
5355

5456
override def createInstance(): A = mapper.contramap(ser.createInstance())

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ package org.apache.flinkx.api.serializer
33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
55

6+
import scala.collection.immutable
7+
import scala.collection.immutable.ArraySeq
8+
import scala.reflect.ClassTag
9+
610
class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[Seq[T]] {
711

12+
private implicit val classTag: ClassTag[T] = ClassTag(clazz)
13+
814
override val isImmutableType: Boolean = child.isImmutableType
915

1016
override def copy(from: Seq[T]): Seq[T] = {
@@ -27,19 +33,29 @@ class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
2733
override def createInstance(): Seq[T] = Seq.empty[T]
2834
override def getLength: Int = -1
2935
override def deserialize(source: DataInputView): Seq[T] = {
30-
val count = source.readInt()
31-
val result = for {
32-
_ <- 0 until count
33-
} yield {
34-
child.deserialize(source)
36+
val length = source.readInt()
37+
val array = new Array[T](length)
38+
var i = 0
39+
while (i < length) {
40+
array(i) = child.deserialize(source)
41+
i += 1
3542
}
36-
result
43+
ArraySeq.unsafeWrapArray(array)
3744
}
3845
override def serialize(record: Seq[T], target: DataOutputView): Unit = {
3946
target.writeInt(record.size)
4047
record.foreach(element => child.serialize(element, target))
4148
}
4249

50+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
51+
var remaining = source.readInt()
52+
target.writeInt(remaining)
53+
while (remaining > 0) {
54+
child.copy(source, target)
55+
remaining -= 1
56+
}
57+
}
58+
4359
override def snapshotConfiguration(): TypeSerializerSnapshot[Seq[T]] =
4460
new CollectionSerializerSnapshot[Seq, T, SeqSerializer[T]](child, classOf[SeqSerializer[T]], clazz)
4561

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,29 @@ class SetSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
2727
override def createInstance(): Set[T] = Set.empty[T]
2828
override def getLength: Int = -1
2929
override def deserialize(source: DataInputView): Set[T] = {
30-
val count = source.readInt()
31-
val result = for {
32-
_ <- 0 until count
33-
} yield {
34-
child.deserialize(source)
30+
var remaining = source.readInt()
31+
val builder = Set.newBuilder[T]
32+
builder.sizeHint(remaining)
33+
while (remaining > 0) {
34+
builder.addOne(child.deserialize(source))
35+
remaining -= 1
3536
}
36-
result.toSet
37+
builder.result()
3738
}
3839
override def serialize(record: Set[T], target: DataOutputView): Unit = {
3940
target.writeInt(record.size)
4041
record.foreach(element => child.serialize(element, target))
4142
}
4243

44+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
45+
var remaining = source.readInt()
46+
target.writeInt(remaining)
47+
while (remaining > 0) {
48+
child.copy(source, target)
49+
remaining -= 1
50+
}
51+
}
52+
4353
override def snapshotConfiguration(): TypeSerializerSnapshot[Set[T]] =
4454
new CollectionSerializerSnapshot[Set, T, SetSerializer[T]](child, classOf[SetSerializer[T]], clazz)
4555

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,29 @@ class VectorSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
2727
override def createInstance(): Vector[T] = Vector.empty[T]
2828
override def getLength: Int = -1
2929
override def deserialize(source: DataInputView): Vector[T] = {
30-
val count = source.readInt()
31-
val result = for {
32-
_ <- 0 until count
33-
} yield {
34-
child.deserialize(source)
30+
var remaining = source.readInt()
31+
val builder = Vector.newBuilder[T]
32+
builder.sizeHint(remaining)
33+
while (remaining > 0) {
34+
builder.addOne(child.deserialize(source))
35+
remaining -= 1
3536
}
36-
result.toVector
37+
builder.result()
3738
}
3839
override def serialize(record: Vector[T], target: DataOutputView): Unit = {
3940
target.writeInt(record.size)
4041
record.foreach(element => child.serialize(element, target))
4142
}
4243

44+
override def copy(source: DataInputView, target: DataOutputView): Unit = {
45+
var remaining = source.readInt()
46+
target.writeInt(remaining)
47+
while (remaining > 0) {
48+
child.copy(source, target)
49+
remaining -= 1
50+
}
51+
}
52+
4353
override def snapshotConfiguration(): TypeSerializerSnapshot[Vector[T]] =
4454
new CollectionSerializerSnapshot[Vector, T, VectorSerializer[T]](child, classOf[VectorSerializer[T]], clazz)
4555

0 commit comments

Comments
 (0)