Merge pull request #504 from jordiolivares/feature/add-batch-across-fetch

Add support for batching requests across unrelated fetches
juanpedromoreno authored Sep 14, 2021
2 parents 0b659b5 + 776d06d commit c96f74a
Showing 2 changed files with 167 additions and 1 deletion.
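
In short, this change adds a new combinator, DataSource.batchAcrossFetches, to datasource.scala (diff below). The following is a minimal usage sketch of the new API; the ToyData data source and the IOApp wiring are illustrative assumptions, not part of this commit:

import cats.data.NonEmptyList
import cats.effect.{Concurrent, IO, IOApp}
import cats.syntax.all._
import scala.concurrent.duration._
import fetch._

// Toy Data/DataSource pair, used only to illustrate the new combinator.
object ToyData extends Data[Int, String] {
  def name = "Toy data"

  def source: DataSource[IO, Int, String] =
    new DataSource[IO, Int, String] {
      override def data = ToyData
      override def CF   = Concurrent[IO]
      override def fetch(id: Int) =
        CF.pure(Some(s"value-$id"))
      override def batch(ids: NonEmptyList[Int]) =
        CF.pure(ids.toList.map(id => id -> s"value-$id").toMap)
    }
}

object BatchAcrossFetchesExample extends IOApp.Simple {
  def run: IO[Unit] =
    // The wrapper is a Resource so the background batching worker is shut down on release.
    DataSource.batchAcrossFetches(ToyData.source, delayPerBatch = 50.millis).use { batched =>
      // Two unrelated Fetch executions: neither knows about the other, but because both
      // arrive within the 50ms window they are served by a single call to ToyData.source.batch.
      val a = Fetch.run[IO](Fetch(1, batched))
      val b = Fetch.run[IO](Fetch(2, batched))
      (a, b).parTupled.flatMap { case (x, y) => IO.println((x, y)) }
    }
}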
85 changes: 85 additions & 0 deletions fetch/src/main/scala/datasource.scala
@@ -19,9 +19,13 @@ package fetch
import cats.data.NonEmptyList
import cats.effect._
import cats.effect.implicits._
import cats.effect.std.Queue
import cats.kernel.{Hash => H}
import cats.syntax.all._

import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

/**
 * `Data` is a trait used to identify and optimize access to a `DataSource`.
 */
@@ -66,6 +70,87 @@ trait DataSource[F[_], I, A] {
  def batchExecution: BatchExecution = InParallel
}

object DataSource {
  // Take up to maxElements items from the queue, or return whatever has accumulated
  // once the interval elapses (possibly an empty list).
  private def upToWithin[F[_], T](queue: Queue[F, T], maxElements: Int, interval: FiniteDuration)(
      implicit F: Temporal[F]
  ): F[List[T]] = {
    Ref[F].of(List.empty[T]).flatMap { ref =>
      val takeAndBuffer = queue.take.flatMap { x =>
        ref.updateAndGet(list => x :: list)
      }
      val bufferUntilNumElements = takeAndBuffer.iterateUntil { buffer =>
        buffer.size == maxElements
      }
      F.timeoutTo(bufferUntilNumElements, interval, ref.get)
    }
  }
  /**
   * Returns a new DataSource that batches Fetch requests across independent executions within a
   * given time interval.
   *
   * For example, if a Fetch request A and a Fetch request B are executed simultaneously, each
   * without knowledge of the other but within a few milliseconds of each other, the data source
   * will transparently combine the two requests into a single batch call.
   *
   * This is useful when each fetch is built independently of the others, for example in an HTTP
   * server processing concurrent requests.
   *
   * The limits of the original DataSource, such as `maxBatchSize`, are respected.
   *
   * @param dataSource
   *   the original data source to be wrapped
   * @param delayPerBatch
   *   the time window during which incoming Fetch requests are accumulated into a single batch
   *   call
   * @return
   *   a `Resource` that manages the background batching worker and yields the wrapping DataSource
   */
  def batchAcrossFetches[F[_], I, A](
      dataSource: DataSource[F, I, A],
      delayPerBatch: FiniteDuration
  )(implicit
      F: Async[F]
  ): Resource[F, DataSource[F, I, A]] = {
    type Callback = Either[Throwable, Option[A]] => Unit
    for {
      queue <- Resource.eval(Queue.unbounded[F, (I, Callback)])
      workerFiber = upToWithin(
        queue,
        dataSource.maxBatchSize.getOrElse(Int.MaxValue),
        delayPerBatch
      ).flatMap {
        case Nil => F.start(F.unit)
        case x =>
          // Group the pending requests by identity so every caller waiting on the same
          // identity is completed from the result of a single batch call.
          val asMap = x.groupBy(_._1).mapValues(callbacks => callbacks.map(_._2))
          val batchResults = dataSource.batch(NonEmptyList.fromListUnsafe(asMap.keys.toList))
          val resultsHaveBeenSent = batchResults.map { results =>
            asMap.foreach { case (identity, callbacks) =>
              callbacks.foreach(cb => cb(Right(results.get(identity))))
            }
          }
          val fiberWork = F.handleError(resultsHaveBeenSent) { ex =>
            asMap.foreach { case (_, callbacks) =>
              callbacks.foreach(cb => cb(Left(ex)))
            }
          }
          F.start(fiberWork)
      }.foreverM[Unit]
      _ <- F.background(workerFiber)
    } yield {
      new DataSource[F, I, A] {
        override def data: Data[I, A] = dataSource.data

        override implicit def CF: Concurrent[F] = dataSource.CF

        override def fetch(id: I): F[Option[A]] = {
          F.async { cb =>
            // Hand the request and its callback to the background worker; F.pure(None)
            // means no finalizer is registered for this async wait.
            queue.offer((id, cb)) *> F.pure(None)
          }
        }
      }
    }
  }
}

sealed trait BatchExecution extends Product with Serializable
case object Sequentially extends BatchExecution
case object InParallel extends BatchExecution
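
The worker loop in batchAcrossFetches is driven by the private upToWithin helper: take up to maxBatchSize identities from the queue, or flush whatever has accumulated once delayPerBatch elapses. A standalone sketch of that same pattern follows; the names here are illustrative and not part of this commit:

import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Queue
import cats.syntax.all._
import scala.concurrent.duration._

object UpToWithinSketch extends IOApp.Simple {
  // Take up to maxElements items from the queue; if fewer arrive, return whatever has
  // accumulated once the interval elapses (possibly an empty list).
  def upToWithin[T](queue: Queue[IO, T], maxElements: Int, interval: FiniteDuration): IO[List[T]] =
    Ref[IO].of(List.empty[T]).flatMap { ref =>
      val takeAndBuffer = queue.take.flatMap(x => ref.updateAndGet(x :: _))
      val untilFull     = takeAndBuffer.iterateUntil(_.size == maxElements)
      untilFull.timeoutTo(interval, ref.get)
    }

  def run: IO[Unit] =
    for {
      q  <- Queue.unbounded[IO, Int]
      _  <- List(1, 2, 3).traverse_(q.offer)
      xs <- upToWithin(q, maxElements = 10, interval = 100.millis)
      // Only three items arrived before the timeout; they come back in reverse arrival
      // order because each take is prepended to the buffer.
      _  <- IO.println(xs) // List(3, 2, 1)
    } yield ()
}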
83 changes: 82 additions & 1 deletion fetch/src/test/scala/FetchBatchingTests.scala
@@ -21,9 +21,11 @@ import cats.data.NonEmptyList
import cats.instances.list._
import cats.syntax.all._
import cats.effect._

import fetch._

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.{DurationInt, FiniteDuration}

class FetchBatchingTests extends FetchSpec {
  import TestHelper._

@@ -94,6 +96,51 @@ class FetchBatchingTests extends FetchSpec {
}
}

  case class BatchAcrossFetchData(id: Int)

  object BatchAcrossFetches extends Data[BatchAcrossFetchData, String] {
    def name = "Batch across Fetches"

    private val batchesCounter = new AtomicInteger(0)
    private val fetchesCounter = new AtomicInteger(0)

    def reset(): Unit = {
      batchesCounter.set(0)
      fetchesCounter.set(0)
    }

    def counters: (Int, Int) =
      (fetchesCounter.get(), batchesCounter.get())

    def unBatchedSource[F[_]: Concurrent]: DataSource[F, BatchAcrossFetchData, String] =
      new DataSource[F, BatchAcrossFetchData, String] {
        override def data = BatchAcrossFetches

        override def CF = Concurrent[F]

        override def fetch(request: BatchAcrossFetchData): F[Option[String]] = {
          fetchesCounter.incrementAndGet()
          CF.pure(Some(request.toString))
        }

        override def batch(
            ids: NonEmptyList[BatchAcrossFetchData]
        ): F[Map[BatchAcrossFetchData, String]] = {
          batchesCounter.incrementAndGet()
          CF.pure(
            ids.map(id => id -> id.toString).toList.toMap
          )
        }

        override val batchExecution = InParallel
      }

    def batchedSource[F[_]: Async](
        interval: FiniteDuration
    ): Resource[F, DataSource[F, BatchAcrossFetchData, String]] =
      DataSource.batchAcrossFetches(unBatchedSource, interval)
  }

  def fetchBatchedDataSeq[F[_]: Concurrent](id: Int): Fetch[F, Int] =
    Fetch(BatchedDataSeq(id), SeqBatch.source)

@@ -207,4 +254,38 @@
      result shouldEqual ids.map(_.toString)
    }.unsafeToFuture()
  }

  "Fetches produced across unrelated fetches to a DataSource that is NOT batched across fetch executions should NOT be bundled together" in {
    BatchAcrossFetches.reset()
    val dataSource = BatchAcrossFetches.unBatchedSource[IO]
    val id1 = BatchAcrossFetchData(1)
    val id2 = BatchAcrossFetchData(2)
    val execution1 = Fetch.run[IO](Fetch(id1, dataSource))
    val execution2 = Fetch.run[IO](Fetch(id2, dataSource))
    val singleExecution = (execution1, execution2).parMapN { (_, _) =>
      val (fetchRequests, batchRequests) = BatchAcrossFetches.counters
      fetchRequests shouldEqual 2
      batchRequests shouldEqual 0
    }
    singleExecution.unsafeToFuture()
  }

  "Fetches produced across unrelated fetches to a DataSource that is batched across fetch executions should be bundled together" in {
    BatchAcrossFetches.reset()
    val dataSource = BatchAcrossFetches.batchedSource[IO](500.millis)
    val id1 = BatchAcrossFetchData(1)
    val id2 = BatchAcrossFetchData(2)
    dataSource
      .use { dataSource =>
        val execution1 = Fetch.run[IO](Fetch(id1, dataSource))
        val execution2 = Fetch.run[IO](Fetch(id2, dataSource))
        val singleExecution = (execution1, execution2).parMapN { (_, _) =>
          val (fetchRequests, batchRequests) = BatchAcrossFetches.counters
          fetchRequests shouldEqual 0
          batchRequests shouldEqual 1
        }
        singleExecution
      }
      .unsafeToFuture()
  }
}
