Last active
October 11, 2021 13:06
-
-
Save ChristopherDavenport/8705c8401f05bba36a9bcf69e91f5d5c to your computer and use it in GitHub Desktop.
Set Once Cache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import scala.concurrent.duration._
import io.chrisdavenport.mapref._
/**
 * A keyed cache whose values are produced on demand inside the effect `F`.
 *
 * The implementation built by `SetOnceCache.impl` in this file runs its loader
 * for a key when no entry exists and hands the stored result to later callers.
 *
 * @tparam F effect type in which lookups and deletions run
 * @tparam K key type
 * @tparam V value type
 */
trait SetOnceCache[F[_], K, V] {

  /**
   * Looks up the value for `k`.
   *
   * With `SetOnceCache.impl`, a caller that arrives while another caller's load
   * for the same key is still running waits for that load instead of starting
   * its own.
   */
  def get(k: K): F[V]

  /**
   * Removes whatever is stored for `k`.
   *
   * With `SetOnceCache.impl`, callers waiting on an in-flight load for `k` fail
   * with `SetOnceCache.ClearedWhileWaitingException`.
   */
  def delete(k: K): F[Unit]
}
/** Constructor and internal state machine for [[SetOnceCache]]. */
object SetOnceCache {

  /** Raised to callers that were waiting on an in-flight load when the key was deleted. */
  case class ClearedWhileWaitingException() extends RuntimeException("SetOnceCache evicted while waiting")

  /** Raised to callers that were waiting on an in-flight load whose loader was canceled. */
  case class CanceledWhileWaitingException() extends RuntimeException("SetOnceCache canceled while waiting")

  // Per-key slot contents. An absent slot (None in the MapRef) means "nothing loaded, nothing loading".
  private sealed trait State[F[_], V]
  // A load is in flight; `identifier` names the `get` call that owns it and
  // `getWait` is completed with the load's outcome.
  private case class Waiting[F[_], V](identifier: Unique.Token, getWait: Deferred[F, Outcome[F, Throwable, V]]) extends State[F, V]
  // A load finished successfully; `value` is the effect taken from the successful outcome.
  private case class Done[F[_], V](value: F[V]) extends State[F, V]

  // What a `get` call has to do after inspecting the slot.
  private sealed trait Operation[F[_], V] extends Product with Serializable
  // This caller installed the Waiting state and must run the loader.
  private case class Set[F[_], V](set: Deferred[F, Outcome[F, Throwable, V]]) extends Operation[F, V]
  // Another caller is loading; wait on its Deferred.
  private case class Wait[F[_], V](getWait: Deferred[F, Outcome[F, Throwable, V]]) extends Operation[F, V]
  // A value is already cached.
  private case class Go[F[_], V](value: F[V]) extends Operation[F, V]

  /**
   * Builds a [[SetOnceCache]] whose missing entries are produced by `f`.
   *
   * Behaviour of the returned cache:
   *  - `get` on an empty slot runs `f(k)`; concurrent `get`s for the same key
   *    wait for that run instead of starting another one.
   *  - A successful load is stored and reused. A failed or canceled load leaves
   *    the slot empty; waiters see the error, or
   *    [[CanceledWhileWaitingException]] on cancellation.
   *  - `delete` empties the slot and fails current waiters with
   *    [[ClearedWhileWaitingException]]. A load that was already running when
   *    its slot was deleted still returns its result to its own caller, but it
   *    no longer touches the slot.
   *
   * @param f loader invoked with the key when no entry exists
   * @return the cache, created inside `F` because its backing map is allocated effectfully
   */
  def impl[F[_]: Async, K, V](f: K => F[V]): F[SetOnceCache[F, K, V]] = MapRef.ofScalaConcurrentTrieMap[F, K, State[F, V]].map(mapref =>
    new SetOnceCache[F, K, V]{
      def get(k: K): F[V] = (Async[F].unique, Deferred[F, Outcome[F, Throwable, V]]).tupled.flatMap{ case (unique, wait) =>
        Async[F].uncancelable{ poll =>
          // Decide atomically whether this call loads, waits, or reads.
          mapref(k).modify[Operation[F, V]]{
            case None => (Waiting(unique, wait).some, Set(wait))
            case s@Some(Waiting(_, wait)) => (s, Wait(wait))
            case s@Some(Done(v)) => (s, Go(v))
          }.flatMap{
            case Set(set) =>
              poll(f(k)).guaranteeCase{ outcome =>
                // Only a successful load leaves a value behind; an error or
                // cancellation empties the slot so a later `get` can retry.
                val next: Option[State[F, V]] = outcome match {
                  case Outcome.Succeeded(fa) => Some(Done[F, V](fa))
                  case _ => None
                }
                mapref(k).modify{
                  // Publish only while this call still owns the slot.
                  case Some(Waiting(identifier, _)) if identifier === unique =>
                    (next, set.complete(outcome).void)
                  // Slot was deleted (and possibly re-populated by a newer load):
                  // `delete` already completed our Deferred, and touching the
                  // slot here would strand the newer load's waiters.
                  case other => (other, Applicative[F].unit)
                }.flatten
              }
            case Wait(wait) =>
              def action = wait.get.flatMap{
                case Outcome.Succeeded(fa) => fa
                case Outcome.Errored(e) => ApplicativeError[F, Throwable].raiseError[V](e)
                case Outcome.Canceled() => ApplicativeError[F, Throwable].raiseError[V](CanceledWhileWaitingException())
              }
              // Waiting stays cancelable even though the surrounding region is not.
              poll(action)
            case Go(v) => poll(v)
          }
        }
      }

      def delete(k: K): F[Unit] = Async[F].uncancelable{_ =>
        mapref(k).modify{
          // Wake waiters of the in-flight load with an eviction error.
          case Some(Waiting(_, wait)) => (None, wait.complete(Outcome.errored(ClearedWhileWaitingException())).void)
          case Some(Done(_)) => (None, Applicative[F].unit)
          case None => (None, Applicative[F].unit)
        }.flatten
      }
    }
  )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment