Last active
March 23, 2017 10:51
-
-
Save filosganga/0c5a3c6decf3cce26609d31dec0849cd to your computer and use it in GitHub Desktop.
A map stage that control a resource lifecycle.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.ActorAttributes.SupervisionStrategy | |
import akka.stream._ | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
import scala.util.control.NonFatal | |
class ResourceMapStage[In, Out, R](opener: () => R, closer: R => Unit, f: R => In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { | |
val in = Inlet[In]("ResourceMap.in") | |
val out = Outlet[Out]("ResourceMap.out") | |
override val shape = FlowShape(in, out) | |
override def initialAttributes: Attributes = Attributes.name("resourceMap") | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | |
new GraphStageLogic(shape) with InHandler with OutHandler { | |
private def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) | |
private val r = opener() | |
override def postStop(): Unit = { | |
closer(r) | |
super.postStop() | |
} | |
override def onPush(): Unit = { | |
try { | |
push(out, f(r)(grab(in))) | |
} catch { | |
case NonFatal(ex) ⇒ decider(ex) match { | |
case Supervision.Stop ⇒ failStage(ex) | |
case _ ⇒ pull(in) | |
} | |
} | |
} | |
override def onPull(): Unit = pull(in) | |
setHandlers(in, out, this) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment