Last active
December 25, 2015 17:49
-
-
Save pauljm/7015546 to your computer and use it in GitHub Desktop.
Actor system example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reads queue of new users to index.
 *
 * The actor body is elided in this example; only its message protocol
 * (see the companion object) is shown.
 */
class NewUserQueue extends Actor with ActorLogging { ... }

/** Message protocol for [[NewUserQueue]]. */
object NewUserQueue {

  /**
   * Subscription request sent to the queue.
   *
   * @param ref actor that wants to be told about new users
   */
  final case class Listen(ref: ActorRef)

  /**
   * Batch of users that are ready to be indexed.
   *
   * @param users the new users to process
   */
  final case class NewUsers(users: Seq[User])
}
/**
 * Indexes documents for a new user.
 *
 * The actor body is elided in this example; only its message protocol
 * (see the companion object) is shown.
 */
class UserIndexer extends Actor with ActorLogging { ... }

/** Message protocol for [[UserIndexer]]. */
object UserIndexer {

  /**
   * Command telling an indexer to start working on a user.
   *
   * @param user the user whose documents should be indexed
   */
  final case class Start(user: User)

  /**
   * Report that indexing finished for a user.
   *
   * @param user the user that was indexed
   */
  final case class Success(user: User)

  /**
   * Report that indexing failed for a user.
   *
   * @param user the user that could not be indexed
   */
  final case class Failure(user: User)
}
/**
 * Updates indexed timestamp and writes user to persistent storage.
 *
 * The actor body is elided in this example; only its message protocol
 * (see the companion object) is shown.
 */
class UserPersister extends Actor with ActorLogging { ... }

/** Message protocol for [[UserPersister]]. */
object UserPersister {

  /**
   * Command to update the user's indexed timestamp and write the user out.
   *
   * @param user the user to persist
   */
  final case class UpdateTimestampAndWrite(user: User)

  /**
   * Report that the user was persisted.
   *
   * @param user the user that was written
   */
  final case class Success(user: User)

  /**
   * Report that persisting the user failed.
   *
   * @param user the user that could not be written
   */
  final case class Failure(user: User)
}
/**
 * Coordinates indexing of all new users.
 *
 * On construction the pipeline subscribes itself to `queue`. For every user
 * it receives, it spawns a single-use [[UserIndexer]] child, watches it, and
 * tells it to start. When an indexer reports success the user is handed to
 * the [[UserPersister]] child; failures are logged.
 *
 * @param queue actor that produces a stream of users to be processed
 */
class UserPipeline(
  val queue: ActorRef
) extends Actor with ActorLogging {

  // Persister to write users to DB
  val persister: ActorRef = context.actorOf(Props[UserPersister])

  // Lightweight, single-use indexer actors, keyed by the user each one is
  // working on. A `var` holding an immutable Map: it is reassigned on every
  // add/remove, and is only touched while this actor processes a message.
  var indexers = Map.empty[User, ActorRef]

  // Handle potential failures in our child actors
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
      case _: java.sql.SQLException => Resume
      case _: NullPointerException  => Restart
      case _: Exception             => Escalate
    }

  // Tell the queue to send new users our way
  queue ! NewUserQueue.Listen(self)

  def receive: Receive = {
    // We've got users to process!
    case NewUserQueue.NewUsers(users) =>
      users.foreach { user =>
        val indexer = context.actorOf(Props[UserIndexer]) // Create a worker actor for each user
        context.watch(indexer)                            // Watch the new worker for failure handling
        indexers += user -> indexer                       // Add the worker to our map
        indexer ! UserIndexer.Start(user)                 // Tell the new worker to start working on this user
      }

    // UserIndexer succeeded. Update timestamp and write to DB.
    case UserIndexer.Success(user) =>
      retireIndexer(user, sender)
      persister ! UserPersister.UpdateTimestampAndWrite(user)

    // Persister succeeded. We're done!
    case UserPersister.Success(user) =>
      log.info("Done!")

    // UserIndexer failed
    case UserIndexer.Failure(user) =>
      retireIndexer(user, sender)
      log.error("Indexer failed")

    // Persister failed
    case UserPersister.Failure(user) =>
      log.error("Persist failed")

    // Terminated messages are generated when an actor dies and sent to any
    // actor watching the dead actor. Indexers that reported Success/Failure
    // were unwatched first, so this only handles an unexpected death.
    case Terminated(deadActor) =>
      indexers.find(_._2 == deadActor).foreach { case (user, _) =>
        indexers -= user
        log.error("Worker died processing user {}", user)
      }
  }

  /** Unwatches and stops a finished indexer and drops its user from the map. */
  private def retireIndexer(user: User, worker: ActorRef): Unit = {
    context.unwatch(worker) // Stop watching first so we get no Terminated for it
    context.stop(worker)    // Stop the actor
    indexers -= user        // Clear from map
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment