Skip to content

Instantly share code, notes, and snippets.

@hanishi
Last active November 20, 2023 09:18
Show Gist options
  • Save hanishi/239036c97225d149039216fe68985781 to your computer and use it in GitHub Desktop.
Save hanishi/239036c97225d149039216fe68985781 to your computer and use it in GitHub Desktop.
Parent Child using Akka Typed
package example
import akka.Done
import akka.actor.typed.scaladsl.{
ActorContext,
Behaviors,
StashBuffer,
TimerScheduler
}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.pattern.StatusReply
import play.api.Configuration
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Success}
/** Value describing a child; it carries only the child's identifier. */
case class Child(id: String)
/** Protocol and behavior of a single child actor.
  *
  * The actor loads its [[Child]] value on start-up, serves `Get`, `Hello` and `Delete`
  * one at a time (stashing whatever arrives while a request is in flight), and stops
  * itself when the idle timer delivers `Unload`.
  */
object Child {

  /** Stand-in for an asynchronous load; completes immediately with `Child(id)`. */
  def load(id: String): Future[Child] = Future.successful(Child(id))

  sealed trait Command

  /** Asks for the loaded [[Child]] value. */
  case class Get(replyTo: ActorRef[StatusReply[Child]]) extends Command

  /** Asks the child to delete itself; on success the actor replies and stops. */
  case class Delete(replyTo: ActorRef[StatusReply[Child]]) extends Command

  /** Asks the child for a greeting. */
  case class Hello(
      replyTo: ActorRef[StatusReply[Hi]]
  ) extends Command

  // Internal messages: results of futures piped back to the actor itself.
  private case class HelloResponded(hi: Hi) extends Command
  private case class Loaded(child: Child) extends Command
  private case class CommandFailed(throwable: Throwable) extends Command
  private case class InitializationFailed(throwable: Throwable) extends Command

  /** Holds the per-actor collaborators and defines one method per behavior state. */
  private class Actor(
      context: ActorContext[Command],
      buffer: StashBuffer[Command],
      timers: TimerScheduler[Command],
      idleTimeout: FiniteDuration
  ) {

    /** Initial state: waits for the outcome of `load`, stashing every other message. */
    def initializing(): Behavior[Command] = Behaviors.receiveMessage {
      case Loaded(child) =>
        idle(child)
      case InitializationFailed(throwable) =>
        context.log.error("Initialization failed.", throwable)
        Behaviors.stopped
      case other =>
        buffer.stash(other)
        Behaviors.same
    }

    /** Waits for the outcome of an in-flight `hello()` and forwards it to `replyTo`. */
    private def greeting(
        child: Child,
        replyTo: ActorRef[StatusReply[Hi]]
    ): Behavior[Command] =
      Behaviors.receiveMessage {
        case HelloResponded(hi) =>
          replyTo ! StatusReply.Success(hi)
          idle(child)
        case CommandFailed(throwable) =>
          replyTo ! StatusReply.Error(throwable)
          idle(child)
        case Unload =>
          // The idle timer fired while a request is in flight. Stashing it would stop
          // the actor as soon as it returns to `idle` (which re-arms the timer) and
          // would drop every message stashed behind it, so it is ignored here.
          Behaviors.same
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** Waits for the outcome of an in-flight `delete()`; stops the actor on success. */
    private def deleting(
        child: Child,
        replyTo: ActorRef[StatusReply[Child]]
    ): Behavior[Command] =
      Behaviors.receiveMessage {
        case Deleted =>
          replyTo ! StatusReply.Success(child)
          Behaviors.stopped
        case CommandFailed(throwable) =>
          replyTo ! StatusReply.Error(throwable)
          idle(child)
        case Unload =>
          // Same reasoning as in `greeting`: a stale idle timeout must not be replayed
          // after the request finishes.
          Behaviors.same
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** (Re)arms the idle timer and replays stashed messages against `active`. */
    private def idle(child: Child): Behavior[Command] = {
      // Starting a timer with a key that is already in use replaces the previous
      // timer, so no explicit isTimerActive/cancel is needed first.
      timers.startSingleTimer(TimeoutKey, Unload, idleTimeout)
      buffer.unstashAll(active(child))
    }

    /** Ready state: answers `Get` directly and starts the async work for `Hello`/`Delete`. */
    private def active(child: Child): Behavior[Command] =
      Behaviors.receiveMessagePartial {
        case Get(replyTo) =>
          replyTo ! StatusReply.Success(child)
          Behaviors.same
        case Hello(replyTo) =>
          hello().pipe(context.pipeToSelf) {
            case Success(hi) =>
              HelloResponded(hi)
            case Failure(throwable) =>
              CommandFailed(throwable)
          }
          greeting(child, replyTo)
        case Delete(replyTo) =>
          delete().pipe(context.pipeToSelf) {
            case Success(_) =>
              Deleted
            case Failure(throwable) =>
              CommandFailed(throwable)
          }
          deleting(child, replyTo)
        case Unload =>
          context.log.info(s"unloading $child")
          Behaviors.stopped
      }

    /** Stand-in for an asynchronous greeting; completes immediately. */
    private def hello(): Future[Hi] =
      Future.successful(Hi("Hello my friend!"))

    /** Stand-in for an asynchronous delete; completes immediately. */
    private def delete(): Future[Done] = Future.successful(Done)
  }

  /** Piped to self when `delete()` succeeds. */
  case object Deleted extends Command

  /** Delivered by the idle timer; stops the actor when received in the `active` state. */
  case object Unload extends Command

  object Actor {

    /** Builds the child behavior for `id`.
      *
      * The stash capacity is read from `STASH_BUFFER_SIZE` and the idle timeout (in
      * minutes) from `IDLE_TIMEOUT`. Any failure stops the actor.
      */
    def apply(id: String, configuration: Configuration)(implicit
        ec: ExecutionContext
    ): Behavior[Command] =
      Behaviors
        .supervise[Command](
          Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) {
            buffer =>
              Behaviors.setup { context =>
                Behaviors.withTimers { timers =>
                  // Kick off the load; its outcome arrives as Loaded/InitializationFailed.
                  load(id).pipe(context.pipeToSelf) {
                    case Success(child) => Loaded(child)
                    case Failure(throwable) =>
                      InitializationFailed(throwable)
                  }
                  new Actor(
                    context,
                    buffer,
                    timers,
                    configuration.get[Int](IDLE_TIMEOUT).minutes
                  ).initializing()
                }
              }
          }
        )
        .onFailure(SupervisorStrategy.stop)
  }

  /** Key identifying the single idle timer. */
  private case object TimeoutKey
}
/** Greeting payload sent in reply to a `Hello` command. */
case class Hi(message: String)
/** Guice module that registers the parent typed actor under the name "parentActor". */
class Module extends AbstractModule with AkkaGuiceSupport {
  override def configure(): Unit =
    bindTypedActor(Parent.Actor, "parentActor")
}
/** Configuration keys read by the actors and the service. */
package object models {

  /** Key for the capacity of the actors' stash buffers. */
  val STASH_BUFFER_SIZE: String = "actor.stash-buffer-size"

  /** Key for the ask timeout (read as seconds by its users). */
  val TIMEOUT: String = "actor.future-timeout"

  /** Key for the child idle timeout (read as minutes by its user). */
  val IDLE_TIMEOUT: String = "actor.idle-timeout"
}
package example
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.pattern.StatusReply
import akka.util.Timeout
import com.google.inject.Provides
import models.Parent.Command
import play.api.Configuration
import play.api.libs.concurrent.ActorModule
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Success}
/** State carried by the parent actor: the registered child actor refs, keyed by child id. */
case class Parent(children: Map[String, ActorRef[Child.Command]] = Map.empty)
extends Command
/** Protocol and behavior of the parent actor.
  *
  * The parent spawns one child actor per id on demand, forwards requests to it, and
  * handles one request at a time: while a request is in flight every other message is
  * stashed and replayed afterwards.
  */
object Parent {

  /** Stand-in for asynchronous start-up work; completes immediately with a status text. */
  def initialize: Future[String] = Future.successful("Initialized OK")

  sealed trait Command

  /** A command addressed to the child identified by `id`, answered through `replyTo`. */
  sealed trait Request[A] extends Command {
    def id: String
    def replyTo: ActorRef[StatusReply[A]]
  }

  case class CreateChild(
      id: String,
      replyTo: ActorRef[StatusReply[Child]]
  ) extends Request[Child]

  case class DeleteChild(
      id: String,
      replyTo: ActorRef[StatusReply[Child]]
  ) extends Request[Child]

  case class Hello(
      id: String,
      replyTo: ActorRef[StatusReply[Hi]]
  ) extends Request[Hi]

  // Internal messages: adapted ask responses, piped futures and watch notifications.
  private case class HiReceived(
      hi: Hi,
      replyTo: ActorRef[StatusReply[Hi]]
  ) extends Command

  private final case class Initialized(message: String) extends Command

  private final case class InitializationFailed(
      throwable: Throwable
  ) extends Command

  private final case class RequestFailed[A](
      throwable: Throwable,
      request: Request[A]
  ) extends Command

  private case class ChildDeleted(
      child: Child,
      replyTo: ActorRef[StatusReply[Child]]
  ) extends Command

  private case class ChildLoaded[A](
      child: Child,
      request: Request[A]
  ) extends Command

  private case class ChildTerminated(id: String) extends Command

  /** Holds the per-actor collaborators and defines one method per behavior state. */
  private class Actor(
      configuration: Configuration,
      context: ActorContext[Command],
      buffer: StashBuffer[Command]
  )(implicit timeout: Timeout, ec: ExecutionContext) {

    /** Initial state: waits for the outcome of `initialize`, stashing everything else. */
    def initializing(): Behavior[Command] =
      Behaviors.receiveMessage {
        case Initialized(message) =>
          context.log.info(message)
          idle()
        case InitializationFailed(throwable) =>
          // Log the cause as well, then rethrow so the supervisor strategy applies.
          context.log.error("Initialization Failed", throwable)
          throw throwable
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** Waiting state: a single request against `actorRef` is in flight.
      *
      * Handles the outcome of that request and stashes every other message.
      */
    def idle(
        parent: Parent,
        actorRef: ActorRef[Child.Command]
    ): Behavior[Command] =
      Behaviors.receiveMessage {
        case ChildLoaded(child @ Child(id), request) =>
          if (!parent.children.contains(id)) {
            // Newly spawned child: register it, then re-send the original request to
            // self so it is handled against the now-registered child.
            context.log.info(s"created $child")
            context.self ! request
            idle(parent.copy(children = parent.children + (id -> actorRef)))
          } else {
            request.replyTo ! StatusReply.Success(child)
            idle(parent)
          }
        case HiReceived(value, replyTo) =>
          replyTo ! StatusReply.Success(value)
          idle(parent)
        case ChildDeleted(child @ Child(id), replyTo) =>
          context.log.info(s"deleted child with $id")
          context.stop(actorRef)
          replyTo ! StatusReply.Success(child)
          idle(parent.copy(children = parent.children - id))
        case RequestFailed(throwable, request) =>
          context.log.error(s"failed to load child for ${request.id}")
          context.stop(actorRef)
          request.replyTo ! StatusReply.Error(throwable)
          // The child was just stopped, so it must not stay registered: otherwise
          // replayed requests would be routed to a dead actor. Removing an id that is
          // not in the map is a no-op, so no containment check is needed.
          idle(parent.copy(children = parent.children - request.id))
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** Ready state: dispatches incoming requests and child-termination notifications. */
    def active(parent: Parent = Parent()): Behavior[Command] =
      Behaviors.receiveMessagePartial {
        case request @ CreateChild(id, _) =>
          context.log.info(s"create child request for $id")
          handleRequest(parent, request)(loadingChild(parent, request, _))
        case request @ DeleteChild(id, replyTo) =>
          context.log.info(s"delete child request for $id")
          handleRequest(parent, request) { child =>
            context.askWithStatus(child, Child.Delete) {
              case Success(child) =>
                ChildDeleted(child, replyTo)
              case Failure(throwable) =>
                RequestFailed(throwable, request)
            }
            idle(parent, child)
          }
        case request @ Hello(id, replyTo) =>
          handleRequest(parent, request) { child =>
            context.askWithStatus(child, Child.Hello) {
              case Success(value) =>
                HiReceived(value, replyTo)
              case Failure(throwable) =>
                RequestFailed(throwable, request)
            }
            idle(parent, child)
          }
        case ChildTerminated(id) =>
          context.log.info(s"child with $id terminated")
          idle(parent.copy(children = parent.children - id))
      }

    /** Applies `behavior` to the registered child for `request.id`, or spawns and loads
      * a new child when none is registered.
      */
    def handleRequest[A](parent: Parent, request: Request[A])(
        behavior: ActorRef[Child.Command] => Behavior[Command]
    ): Behavior[Command] =
      parent.children
        .get(request.id)
        .fold(loadChild(parent, request))(behavior(_))

    /** Logs the registered children and replays stashed messages against `active`. */
    private def idle(parent: Parent = Parent()): Behavior[Command] = {
      context.log.info(s"current children: ${parent.children}")
      buffer.unstashAll(active(parent))
    }

    /** Spawns and watches a child for `request.id`, then waits for it to load. */
    private def loadChild[A](
        parent: Parent,
        request: Request[A]
    ): Behavior[Command] = {
      context.log.info(s"creating child for ${request.id}")
      val child = context.spawnAnonymous(Child.Actor(request.id, configuration))
      context.watchWith(child, ChildTerminated(request.id))
      loadingChild(parent, request, child)
    }

    /** Asks `actorRef` for its `Child` value and moves to the waiting state. */
    private def loadingChild[A](
        parent: Parent,
        request: Request[A],
        actorRef: ActorRef[Child.Command]
    ): Behavior[Command] = {
      context.askWithStatus(actorRef, Child.Get) {
        case Success(child) =>
          ChildLoaded(child, request)
        case Failure(throwable) =>
          RequestFailed(throwable, request)
      }
      idle(parent, actorRef)
    }
  }

  object Actor extends ActorModule {
    override type Message = Parent.Command

    /** Builds the parent behavior.
      *
      * The stash capacity is read from `STASH_BUFFER_SIZE` and the ask timeout (in
      * seconds) from `TIMEOUT`. The actor is restarted on failure.
      */
    @Provides
    def apply(
        configuration: Configuration
    )(implicit ec: ExecutionContext): Behavior[Command] =
      Behaviors
        .supervise[Command](
          Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) {
            buffer =>
              Behaviors
                .setup { context =>
                  Behaviors.logMessages {
                    implicit val timeout: Timeout =
                      configuration.get[Int](TIMEOUT).seconds
                    // Kick off initialization; its outcome arrives as
                    // Initialized/InitializationFailed.
                    initialize.pipe(context.pipeToSelf) {
                      case Success(value) =>
                        Initialized(value)
                      case Failure(throwable) =>
                        InitializationFailed(throwable)
                    }
                    new Actor(configuration, context, buffer).initializing()
                  }
                }
          }
        )
        .onFailure(SupervisorStrategy.restart)
  }
}
package controllers
import akka.actor.typed.Scheduler
import com.google.inject.Singleton
import models.{Child, Hi}
import play.api.libs.json.Json
import play.api.mvc.{AbstractController, ControllerComponents}
import service.{Error, ParentChildService}
import javax.inject.Inject
import scala.concurrent.ExecutionContext
/** HTTP endpoints that delegate to [[ParentChildService]] and render the result as JSON.
  *
  * Each action answers 200 with a JSON object on success, and 500 with a serialized
  * `Error` carrying the exception message when the service future fails.
  */
@Singleton class ParentChildController @Inject() (
    cc: ControllerComponents,
    service: ParentChildService
)(implicit ec: ExecutionContext, scheduler: Scheduler)
    extends AbstractController(cc) {

  /** Creates (or loads) the child with the given id and returns its id. */
  def create(id: String) = Action.async {
    respond(service.createChild(id)) { case Child(childId) =>
      Json.obj("id" -> childId)
    }
  }

  /** Deletes the child with the given id and returns its id. */
  def delete(id: String) = Action.async {
    respond(service.deleteChild(id)) { case Child(childId) =>
      Json.obj("id" -> childId)
    }
  }

  /** Asks the child with the given id for a greeting and returns it with the id. */
  def greet(id: String) = Action.async {
    respond(service.greet(id)) { case Hi(message) =>
      Json.obj("id" -> id, "message" -> message)
    }
  }

  /** Shared pipeline: render a successful value as 200 JSON, or print the stack trace
    * and answer 500 with the error message when the future fails.
    */
  private def respond[A](outcome: scala.concurrent.Future[A])(
      render: A => play.api.libs.json.JsObject
  ): scala.concurrent.Future[play.api.mvc.Result] =
    outcome
      .map(value => Ok(Json.toJson(render(value))))
      .recover { case e: Throwable =>
        e.printStackTrace()
        InternalServerError(Json.toJson(Error(e.getMessage)))
      }
}
/** Future-based facade over the parent actor: each method sends one request using the
  * ask pattern and completes with the unwrapped `StatusReply`.
  */
class ParentChildService @Inject() (
    configuration: Configuration,
    parent: ActorRef[Parent.Command]
)(implicit scheduler: Scheduler, ex: ExecutionContext) {

  /** Ask timeout, read from the `TIMEOUT` configuration key as seconds. */
  implicit val timeout: Timeout =
    Timeout(configuration.get[Int](TIMEOUT).seconds)

  /** The same configured value exposed as a plain duration. */
  implicit val duration: FiniteDuration =
    configuration.get[Int](TIMEOUT).seconds

  /** Asks the child with the given id for a greeting. */
  def greet(id: String): Future[Hi] =
    parent.askWithStatus[Hi](replyTo => Parent.Hello(id, replyTo))

  /** Creates (or loads) the child with the given id. */
  def createChild(id: String): Future[Child] =
    parent.askWithStatus[Child](replyTo => Parent.CreateChild(id, replyTo))

  /** Deletes the child with the given id. */
  def deleteChild(id: String): Future[Child] =
    parent.askWithStatus[Child](replyTo => Parent.DeleteChild(id, replyTo))
}
@hanishi
Copy link
Author

hanishi commented Dec 20, 2021

I should make all of the methods private.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment