Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 146 additions & 131 deletions src/main/scala/com/evolution/resourcepool/ResourcePool.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolution.resourcepool

import cats.Functor
import cats.Monad
import cats.effect.{Async, Deferred, MonadCancel, MonadCancelThrow, Poll, Ref, Resource, Sync, Temporal}
import cats.effect.syntax.all._
import cats.syntax.all._
Expand Down Expand Up @@ -150,7 +151,10 @@ object ResourcePool {

final case class Entry(value: A, release: F[Unit], timestamp: FiniteDuration)

sealed trait State
sealed trait State {
def entryReleased(id: Id, entry: Entry): (State, F[Unit])
def cancelTask(task: Task): State
}

object State {

Expand Down Expand Up @@ -181,11 +185,43 @@ object ResourcePool {
entries: Map[Id, Option[Entry]],
stage: Allocated.Stage,
releasing: Set[Id]
) extends State
) extends State {

def incrementId: Allocated =
this.copy(
id = id + 1,
entries = this.entries.updated(id, none))

def entryReleased(id: Id, entry: Entry): (State, F[Unit]) = {

val (stage, task) = this.stage.putId(id)

val state = this.copy(
entries = this.entries.updated(id, entry.some),
stage = stage
)

val executeNextTask = task.traverse_ { task =>
task.complete((id, entry).asRight)
}

(state, executeNextTask)
}

def resourceReleased(id: Id): Allocated =
this.copy(releasing = releasing - id)

def cancelTask(task: Task): State =
this.copy(stage = stage.cancelTask(task))

}

object Allocated {

sealed trait Stage
sealed trait Stage {
def putId(id: Id): (Stage, Option[Task])
def cancelTask(task: Task): Stage
}

object Stage {

Expand All @@ -200,14 +236,43 @@ object ResourcePool {
* could be equal to `Nil` if all resources are busy, but there
* are no tasks waiting in queue.
*/
final case class Free(ids: Ids) extends Stage
final case class Free(ids: Ids) extends Stage {

def putId(id: Id): (Stage, Option[Task]) =
(this.copy(ids = id :: ids), None)

def takeId: Option[(Free, Id)] = ids match {
case id :: ids => Some((this.copy(ids = ids), id))
case Nil => None
}

def cancelTask(task: Task): Stage = this

}

/** No more free resources to use, and tasks are waiting in queue.
*
* @param tasks
* List of tasks waiting for resources to free up.
*/
final case class Busy(tasks: Tasks) extends Stage
final case class Busy(tasks: Tasks) extends Stage {

def putId(id: Id): (Stage, Option[Task]) =
this
.tasks
.dequeueOption
.fold {
(free(List(id)), none[Task])
} { case (task, tasks) =>
(this.copy(tasks = tasks), task.some)
}

def cancelTask(task: Task): Stage =
this.copy(
tasks = this
.tasks
.filter { _ ne task })
}
}
}

Expand All @@ -230,7 +295,59 @@ object ResourcePool {
releasing: Set[Id],
tasks: Tasks,
released: Deferred[F, Either[Throwable, Unit]]
) extends State
) extends State {

def tryRelease: F[Boolean] =
if (releasing.isEmpty && this.allocated.isEmpty) {
// this was the last resource in a pool,
// we can release the pool itself now
this
.released
.complete(().asRight)
.as(true)
} else {
false.pure[F]
}

def failRelease(e: Throwable): F[Unit] =
this
.released
.complete(e.asLeft)
.void

def waitForRelease: F[Either[Throwable, Unit]] =
this
.released
.get

def entryReleased(id: Id, entry: Entry): (State, F[Unit]) =
this
.tasks
.dequeueOption
.fold {
(
this.copy(
allocated = this.allocated - id,
releasing = this.releasing + id),
entry.release
)
} { case (task, tasks) =>
(
this.copy(tasks = tasks),
task.complete((id, entry).asRight).void
)
}

def resourceReleased(id: Id): Released =
this.copy(releasing = releasing - id)

def cancelTask(task: Task): State =
this.copy(tasks =
this
.tasks
.filter { _ ne task })

}
}

for {
Expand All @@ -248,27 +365,19 @@ object ResourcePool {
.flatMap { released =>

def apply(allocated: Set[Id], releasing: Set[Id], tasks: Tasks)(effect: => F[Unit]) = {
val state1 = State.Released(allocated = allocated, releasing = releasing, tasks, released)
set
.apply { State.Released(allocated = allocated, releasing = releasing, tasks, released) }
.apply(state1)
.flatMap {
case true =>
for {
result <- {
if (allocated.isEmpty && releasing.isEmpty) {
// the pool is empty now, we can safely release it
released
.complete(().asRight)
.void
} else {
success <- state1.tryRelease
result <-
Monad[F].whenA(!success) {
// the pool will be released elsewhere when all resources in `allocated` or
// `releasing` get released
effect.productR {
released
.get
.rethrow
}
effect.productR(state1.waitForRelease.rethrow)
}
}
} yield {
result.asRight[Int]
}
Expand Down Expand Up @@ -336,8 +445,7 @@ object ResourcePool {

case (state: State.Released, _) =>
state
.released
.get
.waitForRelease
.rethrow
.map { _.asRight[Int] }
}
Expand Down Expand Up @@ -416,70 +524,15 @@ object ResourcePool {
new ResourcePool[F, A] {
def get = {

def releaseOf(id: Id, entry: Entry): Release = {
def releaseOf(id: Id, entry: Entry): Release =
for {
timestamp <- now
entry <- entry.copy(timestamp = timestamp).pure[F]
result <- ref
.modify {
case state: State.Allocated =>

def stateOf(stage: State.Allocated.Stage) = {
state.copy(
entries = state.entries.updated(id, entry.some),
stage = stage)
}

state
.stage match {
case stage: State.Allocated.Stage.Free =>
(
stateOf(stage.copy(ids = id :: stage.ids)),
().pure[F]
)
case stage: State.Allocated.Stage.Busy =>
stage
.tasks
.dequeueOption
.fold {
(
stateOf(State.Allocated.Stage.free(List(id))),
().pure[F]
)
} { case (task, tasks) =>
(
stateOf(stage.copy(tasks = tasks)),
task
.complete((id, entry).asRight)
.void
)
}
}

case state: State.Released =>
state
.tasks
.dequeueOption
.fold {
(
state.copy(
allocated = state.allocated - id,
releasing = state.releasing + id),
entry.release
)
} { case (task, tasks) =>
(
state.copy(tasks = tasks),
task
.complete((id, entry).asRight)
.void
)
}
}
.modify(_.entryReleased(id, entry))
.flatten
.uncancelable
} yield result
}

0.tailRecM { count =>
ref
Expand Down Expand Up @@ -514,25 +567,7 @@ object ResourcePool {
poll
.apply { task.get }
.onCancel {
ref.update {
case state: State.Allocated =>
state.stage match {
case _: State.Allocated.Stage.Free =>
state
case stage: State.Allocated.Stage.Busy =>
state.copy(
stage = stage.copy(
tasks = stage
.tasks
.filter { _ ne task }))
}

case state: State.Released =>
state.copy(tasks =
state
.tasks
.filter { _ ne task })
}
ref.update(_.cancelTask(task))
}
.rethrow
.map { case (id, entry) =>
Expand All @@ -544,9 +579,9 @@ object ResourcePool {

state.stage match {
case stage: State.Allocated.Stage.Free =>
stage.ids match {
stage.takeId match {
// there are free resources to use
case id :: ids =>
case Some((stage, id)) =>
state
.entries
.get(id)
Expand All @@ -560,7 +595,7 @@ object ResourcePool {
val entry = entry0.copy(timestamp = timestamp)
apply {
state.copy(
stage = stage.copy(ids),
stage = stage,
entries = state.entries.updated(
id,
entry0
Expand All @@ -574,16 +609,12 @@ object ResourcePool {
}

// no free resources found
case Nil =>
case None =>
val entries = state.entries
if (entries.sizeCompare(maxSize) < 0) {
// pool is not full, create a new resource
val id = state.id
apply {
state.copy(
id = id + 1,
entries = state.entries.updated(id, none))
} { _ =>
apply(state.incrementId) { _ =>
resource
.apply(id.toString)
.allocated
Expand All @@ -600,33 +631,17 @@ object ResourcePool {
result <- release.attempt
result <- ref
.modify {
case state: State.Allocated =>
(
state.copy(releasing = state.releasing - id),
().pure[F]
)
case state0: State.Allocated =>
val state1 = state0.resourceReleased(id)
(state1, ().pure[F])

case state: State.Released =>
val releasing = state.releasing - id
case state0: State.Released =>
val state1 = state0.resourceReleased(id)
(
state.copy(releasing = releasing),
state1,
result match {
case Right(a) =>
if (releasing.isEmpty && state.allocated.isEmpty) {
// this was the last resource in a pool,
// we can release the pool itself now
state
.released
.complete(a.asRight)
.void
} else {
().pure[F]
}
case Left(a) =>
state
.released
.complete(a.asLeft)
.void
case Right(_) => state1.tryRelease.void
case Left(a) => state1.failRelease(a)
}
)
}
Expand Down