Skip to content

Commit 3ea037e

Browse files
Scala 2: Recursive type info is not supported: fail-fast with a comprehensive message instead (#280)
1 parent 321f702 commit 3ea037e

File tree

4 files changed

+53
-8
lines changed

4 files changed

+53
-8
lines changed

modules/flink-common-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import magnolia1.{CaseClass, Magnolia, SealedTrait}
44
import org.apache.flink.api.common.serialization.SerializerConfig
55
import org.apache.flink.api.common.typeinfo.TypeInformation
66
import org.apache.flink.api.java.typeutils.runtime.NullableSerializer
7+
import org.apache.flink.util.FlinkRuntimeException
78
import org.apache.flinkx.api.serializer.{CaseClassSerializer, CoproductSerializer, ScalaCaseObjectSerializer, nullable}
8-
import org.apache.flinkx.api.typeinfo.{CaseClassTypeInfo, CoproductTypeInformation}
9+
import org.apache.flinkx.api.typeinfo.{CaseClassTypeInfo, CoproductTypeInformation, MarkerTypeInfo}
910
import org.apache.flinkx.api.util.ClassUtil.isCaseClassImmutable
1011

1112
import scala.collection.mutable
@@ -26,8 +27,11 @@ private[api] trait LowPrioImplicits {
2627
): TypeInformation[T] = {
2728
val cacheKey = typeName[T]
2829
cache.get(cacheKey) match {
30+
case Some(MarkerTypeInfo) =>
31+
throw new FlinkRuntimeException(s"Unsupported: recursivity detected in '$cacheKey'.")
2932
case Some(cached) => cached.asInstanceOf[TypeInformation[T]]
3033
case None =>
34+
cache.put(cacheKey, MarkerTypeInfo)
3135
val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
3236
val serializer = if (typeOf[T].typeSymbol.isModuleClass) {
3337
new ScalaCaseObjectSerializer[T](clazz)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.apache.flinkx.api.typeinfo
2+
3+
import org.apache.flink.api.common.ExecutionConfig
4+
import org.apache.flink.api.common.serialization.SerializerConfig
5+
import org.apache.flink.api.common.typeinfo.TypeInformation
6+
import org.apache.flink.api.common.typeutils.TypeSerializer
7+
8+
case object MarkerTypeInfo extends TypeInformation[Nothing] {
9+
10+
override def createSerializer(config: SerializerConfig): TypeSerializer[Nothing] = null
11+
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
12+
def createSerializer(config: ExecutionConfig): TypeSerializer[Nothing] = null
13+
14+
override def isBasicType: Boolean = false
15+
override def isTupleType: Boolean = false
16+
override def isKeyType: Boolean = false
17+
override def getTotalFields: Int = 1
18+
override def getTypeClass: Class[Nothing] = classOf[Nothing]
19+
override def getArity: Int = 1
20+
override def toString: String = "MarkerTypeInfo"
21+
override def equals(obj: Any): Boolean = obj.isInstanceOf[MarkerTypeInfo.type]
22+
override def hashCode(): Int = 0
23+
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.apache.flinkx.api
2+
3+
import org.apache.flink.api.common.typeinfo.TypeInformation
4+
import org.apache.flink.util.FlinkRuntimeException
5+
import org.apache.flinkx.api.RecursiveTest.Node
6+
import org.apache.flinkx.api.serializers._
7+
import org.scalatest.Inspectors
8+
import org.scalatest.flatspec.AnyFlatSpec
9+
import org.scalatest.matchers.should.Matchers
10+
11+
class RecursiveTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils {
12+
13+
it should "fail fast with a comprehensive message when the derivation is recursive" in {
14+
val exception = intercept[FlinkRuntimeException](implicitly[TypeInformation[Node]])
15+
exception.getMessage shouldBe "Unsupported: recursivity detected in 'org.apache.flinkx.api.RecursiveTest.Node'."
16+
}
17+
18+
}
19+
20+
object RecursiveTest {
21+
22+
case class Node(left: Option[Node], right: Option[Node])
23+
24+
}

modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,6 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test
8686
roundtrip(ser, Nil)
8787
}
8888

89-
it should "derive recursively" in {
90-
// recursive is broken
91-
// val ti = implicitly[TypeInformation[Node]]
92-
}
93-
9489
it should "derive list" in {
9590
val ser = implicitly[TypeInformation[List[Simple]]].createSerializer(ec)
9691
all(ser, List(Simple(1, "a")))
@@ -286,8 +281,6 @@ object SerializerTest {
286281
case class P1(a: String) extends Param[String]
287282
case class P2(a: Int) extends Param[Int]
288283

289-
case class Node(left: Option[Node], right: Option[Node])
290-
291284
case class SimpleOption(a: Option[String])
292285

293286
case class SimpleEither(a: Either[String, Int])

0 commit comments

Comments
 (0)