Created
January 1, 2023 17:56
-
-
Save djspiewak/a279eeb098e00413c16a21c18c5f2f1f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Starts every effect in `fas` on its own fiber and completes with the first
 * successful result, or with every recorded error if no fiber succeeds.
 *
 * Once a result is known (or the caller cancels the wait), all started fibers
 * are canceled before this effect returns.
 *
 * @param fas the effects to race against each other
 * @param F   concurrency capability for `F` with error type `E`
 * @param G   traversal capability for the container `G`
 * @tparam F the effect type
 * @tparam G the container holding the competing effects
 * @tparam E the error type of `F`
 * @tparam A the result type
 * @return `Right(a)` for a successful result; otherwise `Left(errors)` holding the
 *         errors recorded from failed fibers (empty when `fas` is empty or every
 *         fiber was canceled)
 */
def raceSuccessAll[F[_], G[_], E, A](
    fas: G[F[A]])(
    implicit F: GenConcurrent[F, E],
    G: Traverse[G])
    : F[Either[Chain[E], A]] = {

  val permits = fas.size.toInt

  if (permits <= 0) {
    // Nothing to race: no success and no errors. Short-circuit instead of building a
    // zero-count latch (CountDownLatch is expected to reject non-positive counts).
    F.pure(Chain.empty[E].asLeft[A])
  } else {
    F uncancelable { poll =>
      for {
        complete <- CountDownLatch[F](permits)
        success <- F.deferred[A]
        errors <- F.ref[Chain[E]](Chain.nil)

        fibers <- fas traverse { fa =>
          val staged = fa guaranteeCase { oc =>
            // Record the outcome BEFORE releasing the latch: otherwise the waiter could
            // wake up on the final release and read `success`/`errors` too early.
            val record: F[Unit] = oc match {
              case Outcome.Succeeded(ifa) =>
                ifa.flatMap(success.complete(_)).void

              case Outcome.Errored(e) =>
                errors.update(_ :+ e)

              case Outcome.Canceled() =>
                F.unit
            }

            record *> complete.release
          }

          staged.start
        }

        // wait for either the first success, or all fibers to complete
        // regardless of which case we end up in, cancel everything when we're done
        _ <- poll(F.race(complete.await, success.get).guarantee(fibers.parTraverse_(_.cancel)))

        maybeA <- success.tryGet

        // This must be a generator (flatMap), not the `yield` expression: both branches
        // are themselves effects in F, and yielding them would produce F[F[...]].
        result <- maybeA match {
          case Some(a) => a.asRight[Chain[E]].pure[F]
          case None => errors.get.map(_.asLeft[A])
        }
      } yield result
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment