Created
October 16, 2019 22:13
-
-
Save D4v1X/9484d6e92b3ad055e0b6416cb9a795af to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory}
import cats.effect._
import cats.syntax.all._
import io.chrisdavenport.log4cats.SelfAwareStructuredLogger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import fs2._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/** Demo application: forks an endless "missile launch" onto a fiber, attempts a
  * retreat that always fails, and cancels the launch from the retreat's error handler.
  *
  * The `Timer` and `ContextShift` used by the app are both built from [[CSEXEC]],
  * a fixed-size pool of named daemon threads.
  */
object Main extends IOApp {

  /** Execution context backed by a fixed pool of daemon threads named `io-thread-N`. */
  val CSEXEC: ExecutionContext = {
    // The pool size is pinned to 5. The alternative that was left disabled here,
    //   math.max(2, Runtime.getRuntime().availableProcessors())
    // keeps a lower bound of 2 to prevent pathological deadlocks on virtual machines.
    val poolSize = 5

    // Shared counter that gives every pool thread a distinct numeric suffix.
    val threadIds = new AtomicInteger(0)

    val namedDaemonThreads: ThreadFactory = new ThreadFactory {
      def newThread(task: Runnable): Thread = {
        val worker = new Thread(task)
        worker.setName(s"io-thread-${threadIds.getAndIncrement()}")
        worker.setDaemon(true)
        worker
      }
    }

    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(poolSize, namedDaemonThreads))
  }

  // Replace IOApp's default instances with ones built from the custom pool.
  implicit override val timer: Timer[IO] = IO.timer(CSEXEC)
  implicit override val contextShift: ContextShift[IO] = IO.contextShift(CSEXEC)
  // implicit val ioC: ConcurrentEffect[IO] = IO.ioConcurrentEffect(contextShift)

  /** Logger obtained under the name "MAIN". */
  val LOGGER: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLoggerFromName[IO]("MAIN")

  /** Starts the launch on its own fiber, runs the retreat, and joins the launch.
    *
    * The retreat always raises after its sleep; the handler then logs, cancels the
    * launch fiber and re-raises the same error, so the resulting `IO` fails with it.
    *
    * @param args command-line arguments (unused)
    * @return an `IO` that would map to `ExitCode.Success` if the joined fiber ever
    *         produced a value
    */
  def run(args: List[String]): IO[ExitCode] = {
    // The single element 1 repeated without end, each occurrence shifted by 3.
    val infiniteStream: Stream[IO, Int] =
      Stream.emit(1).repeat.covary[IO].map(n => n + 3)

    // Logs every element of the stream; the stream has no end, so neither does this.
    val logEveryElement: IO[Unit] =
      infiniteStream.evalMap(i => LOGGER.info(s"${i}")).compile.drain

    val launchMissiles: IO[Nothing] =
      LOGGER.info("Start launch!") *> logEveryElement *> IO.raiseError(new Exception("boom!"))

    // Failure injected in the middle of the retreat; the log step after it never runs.
    val retreatFailure: IO[Nothing] = IO.raiseError(new Exception("Continue Running!"))

    val runToBunker =
      LOGGER.info("To the bunker!!!") *>
        IO.sleep(10.seconds) *>
        retreatFailure *>
        LOGGER.info("Safe into bunker")

    val resultIO: IO[Nothing] =
      launchMissiles.start.flatMap { launch =>
        // Retreat failed: abort the launch, then surface the original error.
        // (Maybe the retreat should have happened before the launch?)
        val guardedRetreat = runToBunker.handleErrorWith { failure =>
          LOGGER.info("Abort Launch!") *> launch.cancel *> IO.raiseError(failure)
        }
        guardedRetreat.flatMap(_ => launch.join)
      }

    resultIO.as(ExitCode.Success)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment