Skip to content

Commit

Permalink
Merge pull request #538 from jordiolivares/bugfix/fix-deadlock
Browse files Browse the repository at this point in the history
Fix problem with fibers being cancelled due to timeout and causing fetch deadlock
  • Loading branch information
juanpedromoreno authored Sep 24, 2021
2 parents 7b9f6d0 + 42ee937 commit d56b9ba
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions fetch/src/main/scala/datasource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fetch
import cats.data.NonEmptyList
import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Queue
import cats.effect.std.{Queue, Supervisor}
import cats.kernel.{Hash => H}
import cats.syntax.all._

Expand Down Expand Up @@ -75,8 +75,10 @@ object DataSource {
implicit F: Temporal[F]
): F[List[T]] = {
Ref[F].of(List.empty[T]).flatMap { ref =>
val takeAndBuffer = queue.take.flatMap { x =>
ref.updateAndGet(list => x :: list)
val takeAndBuffer = F.uncancelable { poll =>
poll(queue.take).flatMap { x =>
ref.updateAndGet(list => x :: list)
}
}
val bufferUntilNumElements = takeAndBuffer.iterateUntil { buffer =>
buffer.size == maxElements
Expand Down Expand Up @@ -112,14 +114,16 @@ object DataSource {
): Resource[F, DataSource[F, I, A]] = {
type Callback = Either[Throwable, Option[A]] => Unit
for {
queue <- Resource.eval(Queue.unbounded[F, (I, Callback)])
queue <- Resource.eval(Queue.unbounded[F, (I, Callback)])
supervisor <- Supervisor[F]
workerFiber = upToWithin(
queue,
dataSource.maxBatchSize.getOrElse(Int.MaxValue),
delayPerBatch
).flatMap {
case Nil => F.start(F.unit)
case x =>
).flatMap { x =>
if (x.isEmpty) {
supervisor.supervise(F.unit)
} else {
val asMap = x.groupBy(_._1).mapValues(callbacks => callbacks.map(_._2))
val batchResults = dataSource.batch(NonEmptyList.fromListUnsafe(asMap.keys.toList))
val resultsHaveBeenSent = batchResults.map { results =>
Expand All @@ -132,7 +136,8 @@ object DataSource {
callbacks.foreach(cb => cb(Left(ex)))
}
}
F.start(fiberWork)
supervisor.supervise(fiberWork)
}
}.foreverM[Unit]
_ <- F.background(workerFiber)
} yield {
Expand Down

0 comments on commit d56b9ba

Please sign in to comment.