diff --git a/core/src/main/scala/com/evolutiongaming/kafka/flow/KeyContext.scala b/core/src/main/scala/com/evolutiongaming/kafka/flow/KeyContext.scala index f5a89de6..922f8a15 100644 --- a/core/src/main/scala/com/evolutiongaming/kafka/flow/KeyContext.scala +++ b/core/src/main/scala/com/evolutiongaming/kafka/flow/KeyContext.scala @@ -1,11 +1,9 @@ package com.evolutiongaming.kafka.flow import cats.effect.{Ref, Resource} -import cats.mtl.Stateful import cats.syntax.all._ import cats.{Applicative, Monad} import com.evolutiongaming.catshelper.Log -import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._ import com.evolutiongaming.skafka.Offset /** Key specific metainformation inside of parititon. @@ -32,11 +30,11 @@ object KeyContext { def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit]): F[KeyContext[F]] = Ref.of[F, Option[Offset]](None) map { storage => - KeyContext(storage.stateInstance, removeFromCache) + KeyContext(storage, removeFromCache) } def apply[F[_]: Monad: Log]( - storage: Stateful[F, Option[Offset]], + storage: Ref[F, Option[Offset]], removeFromCache: F[Unit] ): KeyContext[F] = new KeyContext[F] { def holding = storage.get diff --git a/core/src/main/scala/com/evolutiongaming/kafka/flow/key/KeyDatabase.scala b/core/src/main/scala/com/evolutiongaming/kafka/flow/key/KeyDatabase.scala index 36f571c1..47347fd2 100644 --- a/core/src/main/scala/com/evolutiongaming/kafka/flow/key/KeyDatabase.scala +++ b/core/src/main/scala/com/evolutiongaming/kafka/flow/key/KeyDatabase.scala @@ -1,11 +1,9 @@ package com.evolutiongaming.kafka.flow.key import cats.effect.{Ref, Sync} -import cats.mtl.Stateful import cats.syntax.all._ import cats.{Applicative, Monad} import com.evolutiongaming.catshelper.LogOf -import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._ import com.evolutiongaming.skafka.TopicPartition import com.evolutiongaming.sstream.Stream @@ -27,31 +25,31 @@ object KeyDatabase { /** Creates in-memory database implementation */ def memory[F[_]: Sync, K]: F[KeyDatabase[F, K]] = - Ref.of[F, Set[K]](Set.empty[K]) map { storage => - memory(storage.stateInstance) - } + Ref.of[F, Set[K]](Set.empty).map(s => memory(s)) /** Creates in-memory database implementation */ - def memory[F[_]: Monad, K](storage: Stateful[F, Set[K]]): KeyDatabase[F, K] = - new KeyDatabase[F, K] { + def memory[F[_]: Monad, K](storage: Ref[F, Set[K]]): KeyDatabase[F, K] = new FromMemory(storage) - def persist(key: K) = - storage modify (_ + key) + def empty[F[_]: Applicative, K]: KeyDatabase[F, K] = new Empty - def delete(key: K) = - storage modify (_ - key) + private final class FromMemory[F[_]: Monad, K](storage: Ref[F, Set[K]]) extends KeyDatabase[F, K] { - def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = - Stream.lift(storage.get) flatMap { keys => - Stream.from(keys.toList) - } + def persist(key: K) = storage.update(_ + key) - } + def delete(key: K) = storage.update(_ - key) - def empty[F[_]: Applicative, K]: KeyDatabase[F, K] = - new KeyDatabase[F, K] { - def persist(key: K) = ().pure - def delete(key: K) = ().pure - def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = Stream.empty - } + def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = + Stream.lift(storage.get) flatMap { keys => + Stream.from(keys.toList) + } + } + + private final class Empty[F[_]: Applicative, K] extends KeyDatabase[F, K] { + + def persist(key: K) = ().pure + + def delete(key: K) = ().pure + + def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = Stream.empty + } } diff --git a/core/src/test/scala/com/evolutiongaming/kafka/flow/key/KeysSpec.scala b/core/src/test/scala/com/evolutiongaming/kafka/flow/key/KeysSpec.scala index bef4613b..b1a2e13a 100644 --- a/core/src/test/scala/com/evolutiongaming/kafka/flow/key/KeysSpec.scala +++ b/core/src/test/scala/com/evolutiongaming/kafka/flow/key/KeysSpec.scala @@ -1,77 +1,47 @@ package com.evolutiongaming.kafka.flow.key -import cats.data.State -import cats.mtl.Stateful +import cats.effect._ +import cats.effect.kernel.Ref import com.evolutiongaming.catshelper.Log -import com.evolutiongaming.kafka.flow.key.KeysSpec._ import munit.FunSuite class KeysSpec extends FunSuite { - test("Keys add key to a database on flush") { - - val f = new ConstFixture - - // Given("empty database") - val database = KeyDatabase.memory(f.database) - val keys = Keys("key1", database) + implicit val log: Log[IO] = Log.empty[IO] - // When("Keys is flushed") - val program = keys.flush - - val result = program.runS(Set.empty).value + test("Keys add key to a database on flush") { + for { + ref <- Ref.of[IO, Set[String]](Set.empty) + db = KeyDatabase.memory(ref) - // Then("state gets into database") - assert(result == Set("key1")) + keys = Keys("key1", db) + _ <- keys.flush + state <- ref.get + } yield assertEquals(state, Set("key1")) } test("Keys delete a key from a database when requested") { + for { + ref <- Ref.of[IO, Set[String]](Set("key1")) + db = KeyDatabase.memory(ref) + snapshots = Keys("key1", db) - val f = new ConstFixture - - // Given("database with contents") - val database = KeyDatabase.memory(f.database) - val snapshots = Keys("key1", database) - val context = Set("key1") - - // When("delete is requested") - val program = snapshots.delete(true) - val result = program.runS(context).value - - // Then("key is deleted") - assert(result.isEmpty) + _ <- snapshots.delete(true) + state <- ref.get + } yield assert(state.isEmpty) } test("Keys do not delete a key from a database when not requested") { + for { + ref <- Ref.of[IO, Set[String]](Set("key1")) + db = KeyDatabase.memory(ref) + snapshots = Keys("key1", db) - val f = new ConstFixture - - // Given("database with contents") - val database = KeyDatabase.memory(f.database) - val snapshots = Keys("key1", database) - val context = Set("key1") - - // When("delete is requested") - val program = snapshots.delete(false) - val result = program.runS(context).value - - // Then("key is not deleted") - assert(result.nonEmpty) + _ <- snapshots.delete(false) + state <- ref.get + } yield assertEquals(state, Set("key1")) } - -} - -object KeysSpec { - - type F[T] = State[Set[String], T] - - class ConstFixture { - val database = Stateful[F, Set[String]] - } - - implicit val log: Log[F] = Log.empty[F] - } diff --git a/core/src/test/scala/com/evolutiongaming/kafka/flow/timer/TimerFlowOfSpec.scala b/core/src/test/scala/com/evolutiongaming/kafka/flow/timer/TimerFlowOfSpec.scala index baad0b20..7b89af7c 100644 --- a/core/src/test/scala/com/evolutiongaming/kafka/flow/timer/TimerFlowOfSpec.scala +++ b/core/src/test/scala/com/evolutiongaming/kafka/flow/timer/TimerFlowOfSpec.scala @@ -527,12 +527,13 @@ class TimerFlowOfSpec extends FunSuite { _ <- f.contextRef.set(context) _ <- program result <- f.contextRef.get + contextHolding <- f.contextHoldingRef.get } yield { // Then("flush and remove never happen") assertEquals(result.flushed, 0) assertEquals(result.removed, 0) // And("the offset of the last successful persist will be held") - assertEquals(result.holding, Some(Offset.unsafe(100))) + assertEquals(contextHolding, Some(Offset.unsafe(100))) } testIO.unsafeRunSync() @@ -567,9 +568,14 @@ object TimerFlowSpec { val contextRef: Ref[IO, Context] = Ref.unsafe[IO, Context](Context(timestamps = TimestampState(current = timestamp))) + val contextHoldingRef = Ref.lens(contextRef)( + _.holding, + (context: Context) => (offset: Option[Offset]) => context.copy(holding = offset) + ) + implicit val keyContext: KeyContext[IO] = KeyContext( - storage = contextRef.stateInstance.focus(Context.lens(_.holding)), + storage = contextHoldingRef, removeFromCache = contextRef.update(ctx => ctx.copy(removed = ctx.removed + 1)) )