Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package com.evolutiongaming.kafka.flow

import cats.effect.{Ref, Resource}
import cats.mtl.Stateful
import cats.syntax.all._
import cats.{Applicative, Monad}
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
import com.evolutiongaming.skafka.Offset

/** Key specific metainformation inside of parititon.
Expand All @@ -32,11 +30,11 @@ object KeyContext {

def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit]): F[KeyContext[F]] =
Ref.of[F, Option[Offset]](None) map { storage =>
KeyContext(storage.stateInstance, removeFromCache)
KeyContext(storage, removeFromCache)
}

def apply[F[_]: Monad: Log](
storage: Stateful[F, Option[Offset]],
storage: Ref[F, Option[Offset]],
removeFromCache: F[Unit]
): KeyContext[F] = new KeyContext[F] {
def holding = storage.get
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package com.evolutiongaming.kafka.flow.key

import cats.effect.{Ref, Sync}
import cats.mtl.Stateful
import cats.syntax.all._
import cats.{Applicative, Monad}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
import com.evolutiongaming.skafka.TopicPartition
import com.evolutiongaming.sstream.Stream

Expand All @@ -27,31 +25,31 @@ object KeyDatabase {

/** Creates in-memory database implementation */
def memory[F[_]: Sync, K]: F[KeyDatabase[F, K]] =
Ref.of[F, Set[K]](Set.empty[K]) map { storage =>
memory(storage.stateInstance)
}
Ref.of[F, Set[K]](Set.empty).map(s => memory(s))

/** Creates in-memory database implementation */
def memory[F[_]: Monad, K](storage: Stateful[F, Set[K]]): KeyDatabase[F, K] =
new KeyDatabase[F, K] {
def memory[F[_]: Monad, K](storage: Ref[F, Set[K]]): KeyDatabase[F, K] = new FromMemory(storage)

def persist(key: K) =
storage modify (_ + key)
def empty[F[_]: Applicative, K]: KeyDatabase[F, K] = new Empty

def delete(key: K) =
storage modify (_ - key)
private final class FromMemory[F[_]: Monad, K](storage: Ref[F, Set[K]]) extends KeyDatabase[F, K] {

def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
Stream.lift(storage.get) flatMap { keys =>
Stream.from(keys.toList)
}
def persist(key: K) = storage.update(_ + key)

}
def delete(key: K) = storage.update(_ - key)

def empty[F[_]: Applicative, K]: KeyDatabase[F, K] =
new KeyDatabase[F, K] {
def persist(key: K) = ().pure
def delete(key: K) = ().pure
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = Stream.empty
}
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
Stream.lift(storage.get) flatMap { keys =>
Stream.from(keys.toList)
}
}

private final class Empty[F[_]: Applicative, K] extends KeyDatabase[F, K] {

def persist(key: K) = ().pure

def delete(key: K) = ().pure

def all(applicationId: String, groupId: String, topicPartition: TopicPartition) = Stream.empty
}
}
Original file line number Diff line number Diff line change
@@ -1,77 +1,47 @@
package com.evolutiongaming.kafka.flow.key

import cats.data.State
import cats.mtl.Stateful
import cats.effect._
import cats.effect.kernel.Ref
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.flow.key.KeysSpec._
import munit.FunSuite

class KeysSpec extends FunSuite {

test("Keys add key to a database on flush") {

val f = new ConstFixture

// Given("empty database")
val database = KeyDatabase.memory(f.database)
val keys = Keys("key1", database)
implicit val log: Log[IO] = Log.empty[IO]

// When("Keys is flushed")
val program = keys.flush

val result = program.runS(Set.empty).value
test("Keys add key to a database on flush") {
for {
ref <- Ref.of[IO, Set[String]](Set.empty)
db = KeyDatabase.memory(ref)

// Then("state gets into database")
assert(result == Set("key1"))
keys = Keys("key1", db)
_ <- keys.flush

state <- ref.get
} yield assertEquals(state, Set("key1"))
}

test("Keys delete a key from a database when requested") {
for {
ref <- Ref.of[IO, Set[String]](Set("key1"))
db = KeyDatabase.memory(ref)
snapshots = Keys("key1", db)

val f = new ConstFixture

// Given("database with contents")
val database = KeyDatabase.memory(f.database)
val snapshots = Keys("key1", database)
val context = Set("key1")

// When("delete is requested")
val program = snapshots.delete(true)
val result = program.runS(context).value

// Then("key is deleted")
assert(result.isEmpty)
_ <- snapshots.delete(true)

state <- ref.get
} yield assert(state.isEmpty)
}

test("Keys do not delete a key from a database when not requested") {
for {
ref <- Ref.of[IO, Set[String]](Set("key1"))
db = KeyDatabase.memory(ref)
snapshots = Keys("key1", db)

val f = new ConstFixture

// Given("database with contents")
val database = KeyDatabase.memory(f.database)
val snapshots = Keys("key1", database)
val context = Set("key1")

// When("delete is requested")
val program = snapshots.delete(false)
val result = program.runS(context).value

// Then("key is not deleted")
assert(result.nonEmpty)
_ <- snapshots.delete(false)

state <- ref.get
} yield assertEquals(state, Set("key1"))
}

}

object KeysSpec {

type F[T] = State[Set[String], T]

class ConstFixture {
val database = Stateful[F, Set[String]]
}

implicit val log: Log[F] = Log.empty[F]

}
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,13 @@ class TimerFlowOfSpec extends FunSuite {
_ <- f.contextRef.set(context)
_ <- program
result <- f.contextRef.get
contextHolding <- f.contextHoldingRef.get
} yield {
// Then("flush and remove never happen")
assertEquals(result.flushed, 0)
assertEquals(result.removed, 0)
// And("the offset of the last successful persist will be held")
assertEquals(result.holding, Some(Offset.unsafe(100)))
assertEquals(contextHolding, Some(Offset.unsafe(100)))
}

testIO.unsafeRunSync()
Expand Down Expand Up @@ -567,9 +568,14 @@ object TimerFlowSpec {
val contextRef: Ref[IO, Context] =
Ref.unsafe[IO, Context](Context(timestamps = TimestampState(current = timestamp)))

val contextHoldingRef = Ref.lens(contextRef)(
_.holding,
(context: Context) => (offset: Option[Offset]) => context.copy(holding = offset)
)

implicit val keyContext: KeyContext[IO] =
KeyContext(
storage = contextRef.stateInstance.focus(Context.lens(_.holding)),
storage = contextHoldingRef,
removeFromCache = contextRef.update(ctx => ctx.copy(removed = ctx.removed + 1))
)

Expand Down