Last active
May 25, 2017 09:53
Concise content of "Akka in Action" book
- Scale up - more resources (CPU).
- Scale out - more servers in a cluster.
- Traditional scaling (threads, locks, RPC, etc.) is complex and inflexible.
- Actors - a programming model for scaling up and out.
- Akka is centered on actors. Reactive Manifesto - the base ideas of Akka.
- Actors are executed asynchronously.
- Actors can receive and send messages. Messages are immutable.
- No type safety (yet): any message can be sent to an actor.
- Actor operations:
  - send: sending messages;
  - create: creating child actors;
  - become: moving between states, like a state machine;
  - supervise: keeping track of child actors.
- Actors are decoupled on three axes:
  - space/location: no expectation about where other components are located;
  - time: no expectation about when the work will be done;
  - interface: no expectation about which messages other components can understand.
- `ActorSystem` - container for all actors.
- A new actor is created by `ActorSystem.actorOf` with a `Props` object passed as an argument. The `Props` object describes how the actor should be created; it eventually calls the actor constructor.
- `ActorSystem` returns an `ActorRef` for the created actor (not the `Actor` itself). The `ActorRef` is used to send messages to the actor.
- Every actor has a name (unique per level in the hierarchy).
- `ActorPath` - path to the actor in the hierarchy (like a URL). A path can be relative or absolute.
- Every actor has a mailbox - a message queue.
- The dispatcher pushes messages from the mailbox to the actor, making it process the next message.
- Example projects repo.
- You have to be familiar with sbt.
- httpie is a good tool for testing HTTP servers.
- Actor messages are usually bundled together in the companion object.
- `Actor.sender()` is used to send a reply to the message sender.
- `ActorRef.forward(message)` is used to forward the message (the original sender of the message is not changed).
- Actors use `ActorContext.actorOf(props, name)` instead of `ActorSystem.actorOf(props, name)` to create child actors.
- `ActorContext.children` and `ActorContext.child(name)` return the children of the actor.
- Heroku can be used to deploy actor applications.
- `git push` is used to deploy the project to Heroku.
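The points above about `Props`, `ActorRef`, `sender()`, `forward` and child creation can be sketched in a few lines. This is a minimal illustration, not code from the book: it assumes the classic (untyped) Akka actor API of the 2.4/2.5 line, and the names `Greeter`, `FrontDesk` and `ActorBasics` are hypothetical. The `ask` pattern (`?`) is used only to get the reply back outside of an actor.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Messages are bundled in the companion object (hypothetical names).
object Greeter {
  final case class Greet(name: String)
  final case class Greeting(text: String)
  def props: Props = Props(new Greeter)
}

class Greeter extends Actor {
  import Greeter._
  def receive: Receive = {
    // sender() is whoever originally sent the message
    case Greet(name) => sender() ! Greeting(s"Hello, $name")
  }
}

class FrontDesk extends Actor {
  // Child actors are created through the context, not the ActorSystem.
  private val greeter: ActorRef = context.actorOf(Greeter.props, "greeter")

  def receive: Receive = {
    // forward keeps the original sender, so the reply bypasses FrontDesk
    case msg: Greeter.Greet => greeter.forward(msg)
  }
}

object ActorBasics {
  def run(): Greeter.Greeting = {
    val system = ActorSystem("demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      // Top-level actor: created by the ActorSystem, which returns an ActorRef.
      val frontDesk = system.actorOf(Props(new FrontDesk), "front-desk")
      Await.result((frontDesk ? Greeter.Greet("Akka")).mapTo[Greeter.Greeting], 3.seconds)
    } finally system.terminate()
  }
}
```

Calling `ActorBasics.run()` sends one `Greet` through `FrontDesk`; because the message is forwarded, the `Greeting` goes straight back to the asker.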
- Actor unit testing can be:
  - synchronous or asynchronous;
  - single-threaded, multithreaded or multi-JVM.
- Messages can be:
  - one-way: fire and forget;
  - two-way: request-response.
- Three variations of actors can be tested:
  - silent actor: changes its state on a new message, but doesn't send any messages or produce any side effects; the changes are not directly observable from the outside;
  - sending actor: sends message(s) to other actors; this includes the mutating copy actor, forwarding actor, transforming actor, filtering actor and sequencing actor;
  - side-effecting actor: produces side effects on a new message (writes to a log, for example).
- Test classes should extend `TestKit`.
- `TestKit.testActor` is an internal actor that can be used as a receiver in the test environment.
- `TestKit` provides methods to assert on messages received by `testActor`: `expectMsg`, `expectMsgPF`, `expectNoMsg`, `receiveWhile` and `ignoreMsg`.
- The `TestProbe` class can be used to work with several test actors. `TestProbe` can be instantiated; there is no need to extend it.
- Silent actors in a single thread are tested using `TestActorRef[SilentActor]`, which provides direct access to the actor and its state through `underlyingActor`.
- Silent actors must support a `GetState` message to be testable in a multithreaded environment.
- Sending actors are tested using `TestKit` methods (`expectMsg`, `ignoreMsg`, etc.).
- Side-effecting actors are hard to test. Usually it's better to add some extra functionality to the actor so that it sends a message with the side-effect data to an optional receiver. After that, side-effecting actors can be tested like sending ones.
- A test class can mix in `ImplicitSender` to implicitly replace the `sender()` ref with `testActor`. This can be useful when testing two-way messages.
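A two-way (request-response) check can be sketched with a `TestProbe` alone, without a test framework. This assumes the akka-testkit module is on the classpath; the `Echo` actor and the `EchoCheck` object are hypothetical names used for illustration only.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration._

// Hypothetical sending actor: replies with whatever it receives.
class Echo extends Actor {
  def receive: Receive = { case msg => sender() ! msg }
}

object EchoCheck {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("testsystem")
    try {
      // A probe is instantiated, not extended; it acts as the sender here.
      val probe = TestProbe()
      val echo = system.actorOf(Props(new Echo), "echo")
      echo.tell("ping", probe.ref)
      // Fails with an AssertionError if "ping" does not arrive in time.
      probe.expectMsg(1.second, "ping")
    } finally TestKit.shutdownActorSystem(system)
  }
}
```

In a real test class the same assertion would normally live inside a class that extends `TestKit` and mixes in `ImplicitSender`, so that `echo ! "ping"` and a bare `expectMsg("ping")` are enough.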
- Let-it-crash principle.
- Fault avoidance strategies:
  - isolation: isolate the failed component;
  - redundancy: backup components exist;
  - replacement: a failed component can easily be replaced with a backup;
  - reboot: a failed component can be restarted;
  - suspend: calls to a failed component should be suspended until a backup is ready to process them;
  - separation of concerns: fault-recovery code is separated from the normal processing.
- Two separate flows in the application:
  - normal logic: regular actors;
  - fault recovery: supervisors.
- Two states of an actor:
  - started;
  - terminated.
- Three events:
  - start;
  - stop;
  - restart.
- An actor can be stopped using `ActorSystem.stop(actor)`, `ActorContext.stop(actor)` or by sending a `PoisonPill` message to it.
- Four hooks: `preStart`, `postStop`, `preRestart` (optionally calls `postStop`) and `postRestart` (optionally calls `preStart`).
- The failing message is passed to the `preRestart` hook as a parameter.
- A restarted actor replaces the crashed one; the `ActorRef` automatically switches to the new instance.
- A stopped actor doesn't process messages, and its `ActorRef` switches to the special `deadLettersActorRef`.
- An actor can monitor any other actor (not only its children) with `ActorContext.watch(actor)`. In this case the monitor receives a `Terminated` message if the monitored actor is stopped.
- The monitor doesn't receive any message if the monitored actor is restarted.
- User space - the supervisor hierarchy under the `/user` actor path.
- The most dangerous actors should be as low down the hierarchy as possible.
- Two ways to define a hierarchy of supervisors:
  - one supervisor creates all actors in the application and supervises them (only restart of actors can be used);
  - a parent actor supervises its children and decides what to do with a crashed child.
- Two predefined supervisor strategies:
  - default: stops the actor when it fails to initialize or is killed, restarts it in other cases;
  - stopping: stops the actor on every exception.
- Predefined strategies catch `Exception` instances only (not `Throwable`).
- Unhandled exceptions are automatically escalated to the parent of the supervisor.
- Four decisions are available to a supervisor:
  - restart: the child is recreated from its props; the failing message is removed from the mailbox (it can be reprocessed in the restart hooks);
  - resume: the error is ignored, the same actor instance continues to process messages;
  - stop: the child is terminated, message processing stops;
  - escalate: the problem is escalated to the parent of the supervisor.
- Two strategies are available for supervisor decisions:
  - `OneForOneStrategy`: applies the decision to the crashed child only;
  - `AllForOneStrategy`: applies the decision to all children of the supervisor.
- Both `OneForOneStrategy` and `AllForOneStrategy` have `maxNrOfRetries` and `withinTimeRange` parameters.
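The four decisions and the `OneForOneStrategy` parameters can be sketched as follows. This is an illustrative example under the assumption of the classic Akka actor API; `Worker`, `Supervisor` and `SupervisionDemo` are hypothetical names, and the mapping of exceptions to decisions is arbitrary.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical child: throws ArithmeticException when asked to divide by zero.
class Worker extends Actor {
  def receive: Receive = { case n: Int => sender() ! 100 / n }
}

class Supervisor extends Actor {
  // The decision applies to the failing child only (one-for-one).
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // ignore the error, keep the instance
      case _: IllegalStateException    => Restart  // recreate the child from its props
      case _: IllegalArgumentException => Stop     // terminate the child
      case _: Exception                => Escalate // let this supervisor's parent decide
    }

  private val worker: ActorRef = context.actorOf(Props(new Worker), "worker")

  def receive: Receive = { case msg => worker.forward(msg) }
}

object SupervisionDemo {
  def run(): Int = {
    val system = ActorSystem("supervision")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val supervisor = system.actorOf(Props(new Supervisor), "supervisor")
      supervisor ! 0 // the worker fails on this message and is resumed
      Await.result((supervisor ? 5).mapTo[Int], 3.seconds)
    } finally system.terminate()
  }
}
```

After the failing message the same worker instance keeps processing, so the second request is answered normally.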
- A future makes it possible to process the result of a function without ever waiting in the current thread for the result.
- A future is a read-only placeholder for a function result that will be available at some point in the future.
- Futures can be combined with other futures in many ways.
- Futures and actors can be used together.
- Scala futures are not a wrapper around plain old Java futures (`java.util.concurrent.Future`).
- `Future.apply(block)` executes `block` on another thread.
- `Future.foreach` asynchronously processes the future result when it becomes available.
- `Future.map` and `Future.flatMap` call the passed function when the future contains a successful result and return a new `Future`.
- An implicit `ExecutionContext` must be provided to use futures. `scala.concurrent.ExecutionContext.Implicits.global` is a global execution context.
- The dispatcher of an actor system can be used as an `ExecutionContext`. This is better than using the global one.
- `Promise` is the write side of the Future/Promise model.
- A promise can only be completed once (otherwise an `IllegalStateException` is thrown).
- `DefaultPromise[T]` extends both `Future[T]` and `Promise[T]` and is thread-safe.
- `Future.onComplete` receives a `Try` that can be a `Success` or a `Failure`. `Try` supports pattern matching.
- Fatal exceptions (like `OutOfMemoryError`) are never handled by a future. They are thrown straight through the JVM.
- `Future.onFailure` can be used instead of `Future.onComplete` if only exceptions have to be processed.
- `Future.recover` and `Future.recoverWith` are used to provide a default future value in case the future has failed.
- The code block passed to `Future.recover` and `Future.recoverWith` is executed synchronously after the error has been returned. Keep this block simple.
- `Future.firstCompletedOf` returns the first completed future (succeeded or failed).
- `Future.find` can be used to find the first successfully completed future that matches a predicate.
- `Future.zip` combines two futures and returns a future of a tuple.
- A for comprehension can be used instead of `Future.map`.
- `Future.sequence` converts a sequence of futures to a future of a sequence (see also `Future.traverse`).
- `Future.fold` can be used to collect data from a sequence of futures.
- `akka.pattern.ask` returns a future that wraps the actor's answer.
- `akka.pattern.pipe` sends the result of a future to an actor when it becomes available.
- Calling `sender()` from the future body is not safe due to the mutable nature of actors. Capture it in a `val` or use `pipe`.
. - The value contained in the future should be immutable to avoid sharing of mutable state.
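A few of the combinators above can be tried with the Scala standard library alone. The sketch below is illustrative: `FutureSketch`, `price` and the other names are hypothetical, the "price" is a made-up computation, and the global execution context is used only to keep the example free of Akka dependencies.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureSketch {
  // Hypothetical asynchronous lookup: fails for the item "unknown".
  def price(item: String): Future[Int] = Future {
    if (item == "unknown") throw new NoSuchElementException(item)
    else item.length * 10
  }

  // A for comprehension instead of nested flatMap/map calls.
  def total(a: String, b: String): Future[Int] =
    for {
      x <- price(a)
      y <- price(b)
    } yield x + y

  // recover supplies a default value when the future has failed.
  def safePrice(item: String): Future[Int] =
    price(item).recover { case _: NoSuchElementException => 0 }

  // sequence turns a List[Future[Int]] into a Future[List[Int]].
  def allPrices(items: List[String]): Future[List[Int]] =
    Future.sequence(items.map(safePrice))

  // A promise can be completed only once; the second attempt throws
  // IllegalStateException, which Try captures here.
  def secondCompletionFails(): Boolean = {
    val p = Promise[String]()
    p.success("first")
    Try(p.success("second")).isFailure
  }

  // Blocking helper for demonstration only; avoid blocking in actor code.
  def await[T](f: Future[T]): T = Await.result(f, 2.seconds)
}
```

Note that the two lookups in `total` run one after the other, because the second `price` call is only made once the first future has completed; `zip` on two already-started futures would run them concurrently.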
- Node - a running application that communicates across the network.
- A node has a specific role in the distributed system.
- A node uses a specific network transport protocol (TCP, UDP) to communicate with other nodes.
- Messages between nodes are encoded into network-specific protocol data units.
- Messages need to be translated to and from bytes, respectively known as serialization and deserialization.
- When nodes are part of the same distributed system, they share a group membership.
- Membership can be static, dynamic or a mix of both.
- Some kind of discovery mechanism is needed to support nodes in a dynamic membership.
- Common network topologies:
  - centralized/local;
  - client-server;
  - star;
  - ring;
  - peer-to-peer/mesh;
  - tree.
- Local programming differs from distributed programming in four important areas:
  - latency;
  - partial failure;
  - memory access;
  - concurrency.
- Akka uses the distributed model for both local and distributed programming.
- Two ways to get a reference to a remote actor: lookup and deploy.
- The akka-remote module should be configured in `src/main/resources/application.conf` (transports, host, port).
- `ActorSystem.actorSelection` is used to look up remote actors.
- Java serialization should not be used. Akka will log a warning.
- `ActorSystem.actorFor` is deprecated.
- `RemoteLookupProxy` is used to look up a remote actor and keep a correct `ActorRef` to it.
- The built-in `Identify` message and `ActorIdentity` reply are used to get an `ActorRef` to a remote actor by path.
- Two ways to deploy an actor remotely:
  - through configuration: no code changes needed;
  - programmatically: `Props.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(uri))))`.
- `RemoteBoxOfficeForwarder` is used to deploy a remote actor (through configuration), watch it, keep a current `ActorRef` and forward (proxy) messages to it.
- `RemoteLookupProxy` and `RemoteBoxOfficeForwarder` lose messages while reconnecting to a crashed remote actor.
- To use multi-JVM testing:
  - the `sbt-multi-jvm` plugin should be registered in the `project/plugins.sbt` file;
  - a multi-JVM configuration file should be added to the `project/` dir;
  - node roles should be defined in an object that extends the `MultiNodeConfig` class;
  - the test class should extend `MultiNodeSpec`, `MultiNodeSpecCallbacks` and `ImplicitSender`.
- `MultiNodeSpec.enterBarrier` is used to synchronize nodes in a test.
- The `ConfigFactory` is used to get the configuration.
- The following configuration formats are supported:
  - `application.properties`: the Java property file format;
  - `application.json`: the JSON style;
  - `application.conf`: the HOCON format.
- `Config.getConfig(path)` is used to get a subtree of the config.
- Variable substitution is allowed in the config with familiar syntax: `connectstr = "jdbc:mysql://"${hostname}"/mydata"` (in HOCON the `${...}` part must stay outside the quotes, otherwise it is not substituted).
- System properties or environment variables are allowed in the config: `hostname = ${?HOST_NAME}`.
- A later value for the same key overrides the previous value (if the new value is not empty).
- An exception is thrown when trying to get a value that hasn't been set.
- The `reference.conf` file contains default values for the configuration.
- The configuration library finds all the `reference.conf` files in all application components and integrates them into the configuration fallback structure.
- The following configurations are used by default (upper configurations override lower ones):
  - system properties;
  - `application.conf`;
  - `application.json`;
  - `application.properties`;
  - `reference.conf`.
- `ConfigFactory.load("myapp")` will load `myapp.{conf,json,properties}` instead of `application.{...}`.
- The config file name can be overridden with Java system properties (the name should include the extension):
  - `config.resource`: resource name;
  - `config.file`: file path;
  - `config.url`: file URL.
- A `Config` object can be passed to `ActorSystem` as a second parameter: `ActorSystem("front", config)`.
- `ActorSystem` uses `ConfigFactory.load()` by default (loads `application.{conf,json,properties}` or the config specified in Java system properties).
- The loaded config can be accessed through the `ActorSystem`: `actorSystem.settings.config`.
- A configuration file can include other configs with `include "baseConfig"`.
- Lifting a configuration can be implemented using the `withFallback` method: `child.withFallback(parent)`.
- The Akka logger is an actor that receives log messages and forwards them to the preferred logging framework.
- Two built-in loggers are available by default:
  - `akka.event.Logging$DefaultLogger`: sends messages to `STDOUT`;
  - `akka.event.slf4j.Slf4jLogger`: uses SLF4J as the logging framework.
- A custom logger can be used (just an actor with a specialized message interface).
- The `ActorLogging` trait should be mixed into an actor to use logging.
- Placeholders are available in logs: `log.debug("two args: {}, {}", "one", "two")`.
- Lots of config options are available for Akka system logs; check the docs.
- The `LoggingReceive` wrapper should be used to track the messages an actor receives.
- Register the `sbt-native-packager` plugin in `project/plugins.sbt` to build an application bundle with `sbt`.
- The distribution is created with `sbt stage`.
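Substitution and `withFallback` can be tried without any config files by parsing strings. The sketch below assumes the Typesafe Config library (the one Akka uses) is on the classpath; the keys, the host name `db.example.org` and the object name `ConfigSketch` are illustrative assumptions.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  // "Parent" configuration with defaults. The ${hostname} substitution is
  // placed outside the quotes so that HOCON actually substitutes it.
  val defaults: Config = ConfigFactory.parseString(
    """
      |hostname = "localhost"
      |connectstr = "jdbc:mysql://"${hostname}"/mydata"
      |""".stripMargin)

  // "Child" configuration that overrides a single key.
  val overrides: Config = ConfigFactory.parseString(
    """
      |hostname = "db.example.org"
      |""".stripMargin)

  // Keys missing in the child are looked up in the parent; substitutions
  // are resolved against the merged result.
  val config: Config = overrides.withFallback(defaults).resolve()
}
```

Because `resolve()` is called after the merge, `connectstr` picks up the overridden `hostname` rather than the default one.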
- Three enterprise integration patterns (EIPs) are discussed:
  - pipes and filters;
  - scatter-gather;
  - routing slip.
- Pipes and filters introduce independent processing units with the same interface (filters), so a pipeline can be constructed from them. Units can be reordered or replaced in any way without changing the logic of the remaining units. In most cases the units are filters, but that's not mandatory.
- Scatter-gather can be applied in two different scenarios:
  - competing tasks: the processing tasks are all focused on one thing (but they may be doing it in different ways), and the gather selects one result according to specified criteria;
  - parallel cooperative processing: each task performs a subtask, and the gather combines the results into a single message.
- Scatter-gather introduces three components:
  - scatter: receives one message and scatters the job to several processing tasks;
  - processing tasks: do the same job in parallel or do parts of the main job concurrently;
  - gather: filters or combines the results of the tasks.
- The recipient list pattern can be used as a simple implementation of the scatter component.
- A gather timeout event can be implemented with the Akka scheduler: `context.system.scheduler.scheduleOnce` sends a message to the actor after the specified time.
- To keep the buffer of already received messages on restart, the gather can use the `preRestart` hook to resend the messages to itself.
- The `Aggregator` trait provided by Akka implements the Aggregator pattern and can be used as the gather component.
- Pipes and filters can be combined with scatter-gather components in two ways:
  - a scatter-gather implementation acts as one filter in the pipeline;
  - a pipeline is used in the processing tasks of the scatter-gather pattern.
- Routing slip is a dynamic version of the pipes and filters pattern.
- In routing slip, units have the same interface and can be combined in many ways.
- The messages sent between units have a `routeSlip` field that contains the list of the following recipients. Each actor takes the head of the list and sends the result to it together with the tail of the `routeSlip` list.
- It's better to implement the route slip functions in a separate trait that sends the message to the next task. In this case task actors won't contain copy-pasted code responsible for transferring messages.
- Three reasons for using routing to control message flow:
  - performance;
  - message content;
  - state.
- In Akka a separation is made between the routing logic and the actor that represents the router.
- The built-in routers come in two varieties:
  - pool: manages the routees;
  - group: doesn't manage the routees (creation, adding, watching, removing and termination have to be done by client code).
- Available routers within Akka:
  - `RoundRobinRoutingLogic` / `RoundRobinPool` / `RoundRobinGroup`;
  - `RandomRoutingLogic` / `RandomPool` / `RandomGroup`;
  - `SmallestMailboxRoutingLogic` / `SmallestMailboxPool`: a group is not available because it can't check the mailboxes of the routees;
  - `BalancingPool`: one mailbox for all the routees, distributes the messages to the idle routees;
  - `BroadcastRoutingLogic` / `BroadcastPool` / `BroadcastGroup`: recipient list;
  - `ScatterGatherFirstCompletedRoutingLogic` / `ScatterGatherFirstCompletedPool` / `ScatterGatherFirstCompletedGroup`: scatter-gather pattern implementation;
  - `ConsistentHashingRoutingLogic` / `ConsistentHashingPool` / `ConsistentHashingGroup`.
- Two different ways to configure the router:
  - through the configuration file;
  - in source code.
- Some messages are processed by the router itself (instead of being passed to the routees):
  - `Kill`: terminates the router (pool routees will be terminated as well);
  - `PoisonPill`: terminates the router (pool routees will be terminated as well);
  - `Broadcast`: sends the content of the message to all the routees (DO NOT use with `BalancingPool`).
- `RemoteRouterConfig` configures the router to use routees on remote servers.
- A resizer can be configured to dynamically change the size of the pool:
  - it should be enabled with `enabled = on`;
  - `lower-bound`, `upper-bound`: min and max number of routees;
  - `pressure-threshold`: mailbox size of a routee that is considered to be under pressure (the special value `0` means that a routee is under pressure while it is processing a message);
  - `rampup-rate`: how fast routees should be added (`0.25` means +25% of the current number of routees, rounded up);
  - `backoff-threshold`: when to decrease the number of routees (`0.3` means decrease when fewer than 30% of the routees are busy);
  - `backoff-rate`: how fast routees should be removed (`0.1` means -10% of the current number of routees);
  - `messages-per-resize`: number of messages received before another resize action is allowed.
- The default supervisor strategy of a pool always escalates failures to its own supervisor. That can lead to an unexpected restart of the pool with all its routees.
- A supervisor strategy can be passed as a parameter to the pool constructor (for example, the default strategy `SupervisorStrategy.defaultStrategy`).
- If the resizer of the pool is not configured, the pool won't spawn a new routee to replace a terminated one (it will just remove it from the list). Use a resizer to keep the specified minimum number of routees.
- Groups are configured with routee paths instead of a routee count.
- Paths to remote routees can be set in the configuration to make a group work with remote routees. No additional changes are needed.
- A group doesn't watch its routees: it keeps sending messages to a terminated routee (in the hope that a routee becomes available on this path at some point).
- Special messages to groups:
  - `GetRoutees`: returns the routees sequence (Java collection);
  - `AddRoutee`: adds the specified routee to the group;
  - `RemoveRoutee`: removes the specified routee from the group.
- Three implementations of the `Routee` trait:
  - `ActorRefRoutee`: DO NOT use it to add a routee to a group, because in this case the group will watch the routee and will terminate itself if the routee terminates;
  - `ActorSelectionRoutee`: uses `ActorSelection`;
  - `SeveralRoutees`: a list of `Routee`s.
- To remove a routee, the same `Routee` instance as sent in `AddRoutee` should be sent in the `RemoveRoutee` message. So use `ActorSelectionRoutee`.
- The consistent hashing routers use virtual nodes in front of the routees to get a better chance of spreading all the messages evenly over the routees.
- The consistent hashing routers support three ways to translate a message into a message key (all three can be used in one router):
  - router-specific: a partial function is passed to the router;
  - message-specific: the message should implement `ConsistentHashable`;
  - sender-specific: the message should be wrapped into a `ConsistentHashableEnvelope` by the sender.
- Content-based routing can be implemented with regular actors; no Akka routers are required.
- State-based routers can use the `become` and `unbecome` methods:
  - `become` replaces the `receive` function with the specified one;
  - `unbecome` restores the original `receive` function of the router.
- If a state-based router fails and restarts, the original `receive` function is restored.
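Configuring a pool router in source code can be sketched as below. This is a minimal illustration assuming the classic Akka routing API; `PathReporter` and `RouterDemo` are hypothetical names, and the pool size of 3 is arbitrary.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical routee: replies with its own actor name.
class PathReporter extends Actor {
  def receive: Receive = { case _ => sender() ! self.path.name }
}

object RouterDemo {
  def run(): Set[String] = {
    val system = ActorSystem("routing")
    implicit val timeout: Timeout = Timeout(3.seconds)
    import system.dispatcher // execution context for Future.sequence
    try {
      // The pool creates and manages its three routees itself.
      val router = system.actorOf(
        RoundRobinPool(3).props(Props(new PathReporter)), "router")
      // Six requests are spread over the routees in round-robin order.
      val replies = Future.sequence(
        (1 to 6).map(_ => (router ? "who").mapTo[String]))
      Await.result(replies, 3.seconds).toSet
    } finally system.terminate()
  }
}
```

The returned set contains one name per routee that answered, which makes the distribution of the requests visible. A group would instead be built from routee paths, with the routees created separately by client code.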
- Two channel types:
  - point-to-point;
  - publish-subscribe.
- The point-to-point channel sends the message to one receiver.
- The point-to-point channel can have multiple receivers, but every message is delivered to just one receiver (example: round-robin router).
- When multiple messages are sent through point-to-point channel, the order of these messages isn't changed.
- The publish-subscribe channel has dynamic nature and decouples the receivers and the sender.
- The publisher actor shouldn't know anything about the subscribers of publish-subscribe channel.
- Every `ActorSystem` has an `eventStream` that can manage multiple publish-subscribe channels (classified by message type).
- An actor can subscribe to a specific message type in the event stream.
- `EventBus` can be implemented to create a custom publish-subscribe channel.
- `EventBus` is generalized; there are three entities:
  - `Event`: the type of all events in the channel;
  - `Subscriber`: the type of subscriber allowed to register on that event bus;
  - `Classifier`: the classifier to be used in selecting subscribers for dispatching events.
- Three auxiliary traits to keep track of the subscribers:
  - `LookupClassification`: keeps a set of subscribers for each possible classifier, using the `classify` method, which should be implemented;
  - `SubchannelClassification`: used when classifiers form a hierarchy and it's possible to subscribe at the parent nodes as well (example: message types in the `EventStream` implementation);
  - `ScanningClassification`: can be used when classifiers overlap (one `Event` can be part of several classifiers).
- The `ActorEventBus` trait defines the `Subscriber` entity as `ActorRef` and also implements the `compareSubscribers` method needed by `LookupClassification`.
- The dead-letter channel (or dead-letter queue, or dead-message queue) is a special channel that contains all the messages that can't be processed or delivered.
- `EventStream` is used to implement the dead-letter channel.
- Messages in the dead-letter channel are wrapped into a `DeadLetter` object.
- An actor can subscribe to the dead-letter channel with `system.eventStream.subscribe(actor, classOf[DeadLetter])`.
- A message can be sent to the `system.deadLetters` actor to be published in the dead-letter channel. In this case the initial receiver becomes the `deadLetters` actor. To avoid this, the original message can be wrapped into a `DeadLetter` object manually before sending it to the `deadLetters` actor.
- Messages from the dead-letter channel can be reinserted into the mailbox to keep them from being dropped.
- Akka can't guarantee message delivery in all cases (no system can).
- For local actors, the delivery is guaranteed as long as there are no critical VM errors.
- For remote actors Akka guarantees that messages are delivered at most once (a message is delivered once or it's not delivered and it's lost).
- Three reasons why Akka doesn't implement fully guaranteed delivery:
  - fully guaranteed delivery results in a performance penalty even when the system doesn't need that level of guarantees;
  - systems need guaranteed processing, not just delivery, but this is system dependent and Akka can't deduce it;
  - it's always possible to add stricter guarantees on top of basic ones, but the inverse is not possible.
- `ReliableProxy` can be used to increase the reliability of sending messages to remote actors.
- `ReliableProxy` establishes a tunnel between sender and receiver, tracks received messages and resends failed ones until they are delivered.
- The `ReliableProxy` tunnel is one-way only and serves one receiver. When the receiver replies to the sender, the tunnel is not used (another tunnel has to be created for that).
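Subscribing to the dead-letter channel can be sketched as follows. The example assumes the akka-testkit module is available and uses a `TestProbe` as the subscriber purely for convenience; `DeadLetterSketch` is a hypothetical name.

```scala
import akka.actor.{ActorSystem, DeadLetter}
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration._

object DeadLetterSketch {
  def run(): Any = {
    implicit val system: ActorSystem = ActorSystem("deadletters")
    try {
      // Any actor can subscribe; a probe makes the received message inspectable.
      val listener = TestProbe()
      system.eventStream.subscribe(listener.ref, classOf[DeadLetter])

      // Sending directly to deadLetters publishes the message on the channel,
      // wrapped in a DeadLetter(message, sender, recipient) object.
      system.deadLetters ! "undeliverable"

      listener.expectMsgType[DeadLetter](1.second).message
    } finally TestKit.shutdownActorSystem(system)
  }
}
```

The subscriber receives the wrapper, not the raw message, so the original sender and recipient remain available for logging or re-sending.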
- The `FSM` trait is used to build finite-state machines.
- The `FSM` trait takes two type parameters:
  - `State`: the supertype of all states;
  - `StateData`: the supertype of all possible state data types tracked by the FSM.
- Useful `FSM` methods:
  - `startWith` defines the initial state and the initial state data;
  - `when(State) { PartialFunction }` declares transitions for the state;
  - `whenUnhandled { PartialFunction }` declares default behavior for unhandled events;
  - `goto(State)` declares the next state during event processing;
  - `goto(State) using StateData` declares the next state and updates the data;
  - `stay` keeps the current state during event processing;
  - `stay using StateData` keeps the current state and updates the data;
  - `onTransition { PartialFunction }` declares entry and exit actions for states (the `stateData` and `nextStateData` variables are available);
  - `initialize` starts the FSM.
- An actor can subscribe to FSM transition events by sending a `SubscribeTransitionCallBack(actor)` message to the FSM. The FSM replies with a `CurrentState` message and sends a `Transition` message on each transition event.
- The state timeout can be set in two ways:
  - as the second parameter of `when`: `when(State, stateTimeout: FiniteDuration)`;
  - by calling the `forMax` method: `goto(State) using (StateData) forMax (FiniteDuration)`.
- The state receives a `StateTimeout` message on timeout. The timer is cancelled upon receipt of any other message while in the current state.
- Akka supports sending messages using timers within an FSM. API calls:
  - `setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean)`;
  - `cancelTimer(name: String)`: cancels the timer immediately; even if the timer has already fired and enqueued the message, the message won't be processed after this call;
  - `isTimerActive(name: String): Boolean`.
- The `onTermination { PartialFunction }` handler is used to handle FSM termination. It receives a `StopEvent(Reason, State, Data)` object. There are three possible reasons:
  - `Normal`: the FSM `stop` method has been called;
  - `Shutdown`: the actor has been terminated;
  - `Failure(cause: Any)`: an error occurred.
- An Akka `Agent` allows multiple actors to work with shared state:
  - `Agent.apply` or `Agent.get` returns the current state;
  - `Agent.send(State)` or `Agent.send(State => State)` updates the state (use the second version to combine the new state with the current one);
  - `Agent.alter(State => State)` updates the state and returns a `Future` with the new state;
  - `Agent.future` returns a `Future` that completes when all pending state changes have been processed.
- New agents can be created with the `map` or `flatMap` methods.
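The `FSM` methods listed above fit together roughly as in the sketch below. It assumes the classic Akka `FSM` trait; the light-switch domain and the names `Light`, `LightState`, `Count`, `Flip`, `GetCount` and `FsmDemo` are hypothetical and chosen only for illustration.

```scala
import akka.actor.{ActorSystem, FSM, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Light {
  sealed trait LightState              // supertype of all states
  case object Off extends LightState
  case object On extends LightState
  final case class Count(flips: Int)   // state data tracked by the FSM
  case object Flip                     // event: toggle the light
  case object GetCount                 // event: ask how often it was flipped
}

class Light extends FSM[Light.LightState, Light.Count] {
  import Light._

  startWith(Off, Count(0))

  when(Off) {
    case Event(Flip, Count(n)) => goto(On) using Count(n + 1)
  }

  when(On) {
    case Event(Flip, Count(n)) => goto(Off) using Count(n + 1)
  }

  // Default behavior for events not handled by the current state.
  whenUnhandled {
    case Event(GetCount, Count(n)) =>
      sender() ! n
      stay()
  }

  initialize()
}

object FsmDemo {
  def run(): Int = {
    val system = ActorSystem("fsm")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val light = system.actorOf(Props(new Light), "light")
      light ! Light.Flip
      light ! Light.Flip
      light ! Light.Flip
      Await.result((light ? Light.GetCount).mapTo[Int], 3.seconds)
    } finally system.terminate()
  }
}
```

The state data travels with each transition through `using`, so the actor needs no mutable fields of its own.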
- Endpoints are the glue between the external service and the rest of the system. The endpoint encapsulates the interface between two services.
- A consumer endpoint receives messages from the external service and translates them to the internal format of the system.
- A producer endpoint converts a message from the internal format and sends it to the external service.
- The normalizer pattern translates different messages (from different services and endpoints) to a common internal format to allow general processing.
- In complex cases the normalizer can be split into three subcomponents:
  - a set of transport implementations (for example, email, REST and message queue);
  - a set of format translators (from plain text, JSON and XML);
  - a router that selects a translator for each received message (it should know how to distinguish between all the message types).
- There is a trade-off between flexibility and complexity: some transport implementations can be connected directly to translators if just one message type is transferred through this transport. This decreases complexity but also decreases flexibility.
- The canonical data model pattern can be used when a lot of systems have to be connected together.
- In the canonical data model each system provides an endpoint that converts messages between the internal system format and the common canonical format used on the communication bus.
- Apache Camel and the Akka Camel module provide support for a great variety of transport layers and make it possible to implement the standard EIPs in a few lines of code.
- Apache Camel allows the transport layer implementation to be selected at runtime. This can be used in tests.
- Apache Camel uses URIs to define the transport protocol and its properties.
- `akka.camel.Consumer` should be extended to implement a consumer:
  - `endpointUri` should be implemented to specify the URI of the desired component;
  - `receive` should be implemented to process messages of the `CamelMessage` type.
- To send a response from the consumer, just reply to the sender actor as usual.
- `CamelExtension.activationFutureFor(Consumer)` is used to "wait" for a consumer to become ready.
- `CamelContext.addComponent` is used to add a parameterized component (for example, `ActiveMQComponent`).
- `BrokerRegistry` is used to stop ActiveMQ message brokers: `BrokerRegistry.getInstance().getBrokers.foreach { case (_, b) => b.stop() }`.
- `akka.camel.Producer` should be extended to implement a producer:
  - `transformOutgoingMessage` can be implemented to convert the message before sending;
  - `transformResponse` can be implemented to convert the received response (from `CamelMessage`);
  - `routeResponse` can be implemented to route the received response to another receiver (the default implementation routes the response to the original sender); note: `transformResponse` should be called manually from an overridden `routeResponse`.
- The Akka HTTP module allows routes to be defined with directives.
- The generic form of a directive is: `name(arguments) { extractions => ... // inner route }`.
- Directive examples:
  - `get { ... }` matches GET requests;
  - `post { ... }` matches POST requests;
  - `path(PathMatcher)` matches the request path;
  - `pathPrefix(PathMatcher)` matches a prefix of the path;
  - `complete(value)` completes the request with the value.
- Values can be extracted from the request path with path matchers: `pathPrefix("orders" / IntNumber) { id => ... }`.
- The Akka HTTP module provides a test kit to test routes. `ScalatestRouteTest` should be mixed into the test class to use the kit.
- The `RequestTimeout` trait can be extended to automatically read the `akka.http.server.request-timeout` value from the configuration.
- Entity marshallers should be provided in implicit scope to enable custom type marshallers in akka-http (for example, to enable XML marshalling, `akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._` should be imported).
- A stream of data is a sequence of elements that could have no end.
- Streams are provided by the akka-stream library.
- Using akka-stream involves two steps:
  - define a blueprint: how streams need to be processed;
  - execute the blueprint: the graph is turned into actors that actually stream the data.
Source
andSink
are stream endpoints.- The
Source
andSink
formRunnableGraph
when connected together. - All inputs and outputs in graph should be connected to form
RunnableGraph
. RunnableGraph.run
materialize the graph - it creates needed resources and do actual work. It requiresMaterializer
in implicit scope.ActorMaterializer
convertsRunnableGraph
into actors.- A stream can be cancelled using
take
,takeWhile
andtakeWithin
. - Streams use special protocol between data publisher and subscriber.
- Data subscriber use nonblocking back pressure to signal the publisher how much data it can process.
- Back pressure is traversed from the end of stream to the beginning to ensure no publisher sends more messages than slowest consumer can process.
- Reactive Streams Initiative is a standard for asynchronous stream processing with nonblocking back pressure.
- Akka-stream uses buffers internaly, so it requests batches of elements instead of requesting every single one.
- Akka-stream performs operator fusion to remove unnecessary asynchronous boundaries in the graph. As many stages in a graph as possible are run on a single actor. This behavior is configurable.
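
The two steps above (define a blueprint, then execute it) can be sketched as follows. This is a minimal sketch assuming the akka-stream API of the Akka 2.4/2.5 line; `StreamBasics` and `sumFirst` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StreamBasics {
  // Runs the blueprint and returns the sum of the first `n` positive integers.
  def sumFirst(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("stream-basics")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Step 1: define the blueprint. Nothing runs yet.
      val source = Source.fromIterator(() => Iterator.from(1)) // a stream with no end
      val sink   = Sink.fold[Int, Int](0)(_ + _)
      val graph: RunnableGraph[Future[Int]] =
        source.take(n).toMat(sink)(Keep.right) // take(n) cancels the upstream after n elements

      // Step 2: execute the blueprint. run() needs the implicit materializer.
      Await.result(graph.run(), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

`Keep.right` selects the sink's auxiliary value (the `Future` holding the sum) as the result of `run()`.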
- Sources and sinks can provide an auxiliary value in a `Future`. It's possible to configure which value should be kept in each transition (left, right or both).
- `Flow` component is used to perform processing on stream data. A `Flow` has one input port and one output.
- Akka-stream has a couple of predefined `Flow`s for framing that can be used to identify frames of data in a stream.
- The `Flow` has many collection-like operations, such as `map` and `filter`.
- A stream is not a collection. Big difference: the size of stream is not known.
- By default, stream processing is stopped when an exception occurs. Supervisor strategy can be set for every graph component or for the complete graph to avoid this.
- Another option: catch exceptions and pass them through the stream as special messages.
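
A supervision strategy on a single `Flow` might look like the sketch below. It assumes the Akka 2.4/2.5 akka-stream API; `ResumingFlow` and its members are hypothetical names. The resuming decider drops the element that caused the exception and keeps the stream running.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ResumingFlow {
  // Collection-like operations on a Flow; `_.toInt` throws on non-numeric input.
  val parse: Flow[String, Int, NotUsed] =
    Flow[String]
      .filter(_.nonEmpty)
      .map(_.toInt)
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

  // Sums every input that parses; failing elements are skipped instead of stopping the stream.
  def sum(inputs: List[String]): Int = {
    implicit val system: ActorSystem = ActorSystem("resuming-flow")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(inputs).via(parse).runFold(0)(_ + _), 5.seconds)
    finally system.terminate()
  }
}
```

For the complete graph, the same decider could instead be passed through `ActorMaterializerSettings(system).withSupervisionStrategy(...)` when the materializer is created.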
- Bidirectional flow has two open inputs and two open outputs.
- `BidiFlow` can be stacked on top of a flow as an adapter.
- In other words: `BidiFlow` provides two flows that can be connected before and after the existing flow to adapt input and output of the flow.
- Akka-http internally uses akka-stream.
- HTTP request entity has a `dataBytes` field which is a `Source` of data in the HTTP stream.
- `Source` can be sent to the client as `HttpEntity(ContentType, Source)`.
- Akka-http makes it possible to create custom marshallers and unmarshallers for entity data of different content types.
- Akka-stream provides graph DSL.
- Graph DSL provides `GraphDSL.Builder` to create the nodes and a `~>` method to connect nodes.
- `Flow`s can be merged with the `Merge` or `MergePreferred` graph stages.
- `Source.combine` can be used to merge `Source`s.
- Buffers can be used in streams. `Flow.buffer` method requires two arguments: buffer size and overflow strategy.
- Buffer overflow strategies:
- `dropHead`;
- `dropTail`;
- `dropBuffer`;
- `dropNew`;
- `backpressure`;
- `fail`.
- `Flow.groupedWithin` groups stream elements that arrived during specified time. Can be used to decrease load to the consumer.
- `Flow.expand` registers iterator that will be used when there are no elements available from the flow but fast consumer is ready to process. It will pull elements from the iterator in this case.
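
The graph DSL, merging and buffering described above can be combined in one small sketch. It assumes the Akka 2.4/2.5 akka-stream API; `MergeGraph` is a hypothetical name and the buffer size of 16 is an arbitrary choice.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MergeGraph {
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Blueprint: two sources -> Merge -> buffer -> sink.
  val graph: RunnableGraph[Future[Int]] =
    RunnableGraph.fromGraph(GraphDSL.create(sumSink) { implicit builder => sink =>
      import GraphDSL.Implicits._
      val merge  = builder.add(Merge[Int](2)) // fan-in node with two inputs
      val buffer = Flow[Int].buffer(16, OverflowStrategy.backpressure)

      Source(1 to 3) ~> merge ~> buffer ~> sink
      Source(4 to 6) ~> merge
      ClosedShape // every input and output is connected
    })

  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("merge-graph")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```

With `OverflowStrategy.backpressure` no element is dropped, so the sum does not depend on the order in which `Merge` emits elements.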
- A cluster is a dynamic group of nodes: the number of nodes can grow and shrink dynamically.
- Each node is a JVM with actor system started in it.
- All actor systems in the cluster should have the same name (this is the name of the cluster as well).
- Each cluster should contain one or more seed nodes. Seed nodes are the founders of the cluster.
- A new node has to know the list of seed nodes to join the cluster. The list can be set in the configuration file.
- New node sends join message to each seed node; the first seed node to respond will get to handle the join command.
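
A seed node list in the configuration file might look like the fragment below. It assumes classic remoting as used in the Akka 2.4/2.5 line; the system name `words`, the host and the ports are hypothetical values.

```hocon
akka {
  actor.provider = "akka.cluster.ClusterActorRefProvider"

  remote.netty.tcp {
    hostname = "127.0.0.1"
    port = 2551
  }

  cluster {
    # Every address uses the same actor system name, which is also the cluster name.
    seed-nodes = [
      "akka.tcp://words@127.0.0.1:2551",
      "akka.tcp://words@127.0.0.1:2552"
    ]
  }
}
```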
- One of the nodes in the cluster is the leader. The leader decides if a member node is up or down.
- The first node, in sort order, that is `Up` or `Leaving` automatically becomes the leader.
- If a node in the cluster is down, it's flagged as `UNREACHABLE`.
- The leader can't execute any leader actions as long as any of the nodes are unreachable. No node can leave or join the cluster in this case.
- The unreachable node has to be taken down (use `down` method or `akka.cluster.auto-down-unreachable-after` setting).
- An actor can use `Cluster.subscribe` method to subscribe to cluster events: `MemberUp`, `MemberExited`, `MemberRemoved`, `UnreachableMember`, `ReachableMember` and `CurrentClusterState`.
- `ActorContext.watch` method can be used as usual to watch actors in the cluster.
- Routers can be used with `ClusterRouterPool` and `ClusterRouterGroup` wrappers.
- Cluster can be tested on local machine or in multi-JVM environment with `sbt-multi-jvm` plugin in usual way.
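
Subscribing to the cluster events listed above can be sketched as a small listener actor. This is a minimal sketch assuming the akka-cluster module; `ClusterListener` is a hypothetical name and the actor only logs what it receives.

```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ClusterListener extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // InitialStateAsEvents replays the current state as events
  // instead of sending a single CurrentClusterState message.
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[ReachabilityEvent])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberUp(member)           => log.info("Member up: {}", member.address)
    case MemberExited(member)       => log.info("Member exited: {}", member.address)
    case MemberRemoved(member, _)   => log.info("Member removed: {}", member.address)
    case UnreachableMember(member)  => log.warning("Member unreachable: {}", member.address)
    case ReachableMember(member)    => log.info("Member reachable again: {}", member.address)
    case _: MemberEvent             => // other membership events are ignored here
  }
}
```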
- Event sourcing captures a sequence of immutable events in a journal.
- A persistent actor (`PersistentActor` trait) works in two modes: it recovers from events (`receiveRecover` method) or it processes commands (`receiveCommand` method).
- Every persistent actor requires a `persistenceId` to identify the events in the journal for that actor.
- `persist` or `persistAsync` methods are used in `receiveCommand` to store events in the journal: `persist(Event)(Event => Unit)`.
- Commands are messages that are sent to the actor to execute some logic.
- Events provide evidence that the actor has executed the logic correctly.
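
The two modes of a persistent actor can be sketched as follows. This is a minimal sketch assuming the akka-persistence module with a configured journal; `Counter`, `Add`, `Added`, `GetTotal` and the ID `counter-1` are hypothetical names.

```scala
import akka.persistence.PersistentActor

final case class Add(amount: Int)   // command: asks the actor to do something
final case class Added(amount: Int) // event: records what was actually done
case object GetTotal                // command that only reads state

class Counter extends PersistentActor {
  // Identifies this actor's events in the journal.
  override def persistenceId: String = "counter-1"

  private var total = 0

  private def update(event: Added): Unit = total += event.amount

  // Recovery mode: replay journaled events to rebuild the state.
  override def receiveRecover: Receive = {
    case event: Added => update(event)
  }

  // Command mode: validate the command, persist an event, update state in the callback.
  override def receiveCommand: Receive = {
    case Add(amount) if amount > 0 =>
      persist(Added(amount))(update)
    case GetTotal =>
      sender() ! total
  }
}
```

State is changed only by events, so recovery and normal processing share the same `update` function.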
- Snapshots can reduce the required storage space and speed up recovery of the state.
- `saveSnapshot` method is used to save snapshots.
- Recovery process can be customized by overriding the `recovery` method.
- Persistence query is a module for querying a journal.
- `PersistenceQuery.readJournalFor` returns a specific read journal which is used to query the data.
- Two types of queries:
- methods starting with `current`: return a `Source` with all currently stored events;
- methods that don't start with `current`: return a `Source` with current events and continuously provide "live" events as they arrive.
- Custom serializer is the best choice in most cases, but it's possible to use third-party libraries: akka-kryo-serialization and Stamina.
- Custom serializer has to extend `Serializer` trait.
- Serializers can be bound to the classes in configuration file using `akka.actor.serializers` and `akka.actor.serialization-bindings` sections.
- Akka-persistence doesn't just serialize the events and snapshots directly into the `Journal` or `SnapshotStore`. The serialized objects are wrapped into internal format.
- `EventAdapter` can help when it's needed to query the backend database of a journal plugin.
- Sharding is the distribution of the actor state across servers.
- Cluster singleton extension guarantees that there's only one specified actor at any point in time in the cluster.
- `ClusterSingletonManager` actor ensures singleton guarantee and `ClusterSingletonProxy` actor always points to the current singleton in the cluster.
- Default LevelDB journal is not safe for production use - akka-persistence-cassandra should be used.
- `ClusterSharding` extension distributes actors across nodes in shards.
- `Shard` is basically a group of sharded actors.
- A `ShardRegion` manages a number of `Shard`s and forwards messages to the sharded actors.
- A `ShardCoordinator` (which is a cluster singleton) determines which `ShardRegion` will own the `Shard` behind the scenes.
- Special functions (`extractEntityId` and `extractShardId`) should be implemented by sharded actor's companion object to derive entity and shard IDs from commands. The implementation should ensure that there will be no duplicate sharded actors running in the cluster.
- `ClusterSharding` module will automatically start a sharded actor once it tries to forward a command. User code shouldn't start sharded actors.
- A sharded actor can ask `Shard` to be passivated when it's not used. This can help to control memory usage.
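
The two extractor functions can be sketched as below. This assumes the akka-cluster-sharding module; `ShardedCounter`, the `Add` command and the shard count of 10 are hypothetical choices for illustration.

```scala
import akka.cluster.sharding.ShardRegion

object ShardedCounter {
  // Hypothetical command: every command carries the ID of its target entity.
  final case class Add(counterId: Long, amount: Int)

  // Assumption: the number of shards stays fixed for the lifetime of the cluster.
  val numberOfShards = 10

  // Maps a command to (entity ID, message delivered to the entity).
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case cmd @ Add(counterId, _) => (counterId.toString, cmd)
  }

  // Maps a command to its shard. The same entity ID always yields the same shard,
  // which is what prevents duplicate sharded actors.
  val extractShardId: ShardRegion.ExtractShardId = {
    case Add(counterId, _) => (math.abs(counterId) % numberOfShards).toString
  }
}
```

Both functions are handed to `ClusterSharding(system).start(...)` together with the entity `Props`, and the sharded actors themselves are then started on demand.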
- 80% of performance improvements can be made by addressing only 20% of the system (Pareto principle):
- it's possible to make minor changes to improve performance;
- only changes to 20% of the system will have any effect on the performance;
- These 20% are called bottlenecks.
- Solving the first bottleneck gives the biggest improvement. Solving the next bottleneck will result in a lesser improvement (the concept of diminishing returns).
- Two types of performance problems:
- the throughput is too low: the number of requests that can be served is too low, usually solved by scaling;
- the latency is too long: each request takes too long to be processed, generally requires design changes.
- Performance terms:
- arrival rate: number of messages arriving during a period;
- throughput: number of completions during a period;
- service time: the time needed to process a single job (or service rate - average number of jobs serviced during a period);
- the latency: the time between the entry and the exit;
- the utilization: the percentage of the time the node is busy processing messages.
- Message queue size is an important metric indicating that there's a problem.
- Optimal performance: each time a task is completed, there's another one to do, but the wait time is vanishingly small.
- The queue size can be retrieved from the mailbox, and the utilization needs the statistics of the processing unit.
- The following data is needed from the Akka actor:
- when a message is received and added to the mailbox;
- when it was sent to be processed, removed from the mailbox and handed over to the processing unit;
- when the message was done processing and left the processing unit.
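
The performance terms can be computed from the three timestamps listed above. The sketch below uses only the Scala standard library; `MessageTiming` and `PerformanceMetrics` are hypothetical names and all times are in milliseconds.

```scala
// The three timestamps collected for one message.
final case class MessageTiming(entered: Long, started: Long, finished: Long)

object PerformanceMetrics {
  // Time between entry into the mailbox and exit from the processing unit.
  def latency(t: MessageTiming): Long = t.finished - t.entered

  // Time spent waiting in the mailbox.
  def waitTime(t: MessageTiming): Long = t.started - t.entered

  // Time spent in the processing unit.
  def serviceTime(t: MessageTiming): Long = t.finished - t.started

  // Completions per second over the observed period.
  def throughput(timings: Seq[MessageTiming], periodMillis: Long): Double =
    timings.size * 1000.0 / periodMillis

  // Fraction of the period during which the node was busy processing.
  def utilization(timings: Seq[MessageTiming], periodMillis: Long): Double =
    timings.map(serviceTime).sum.toDouble / periodMillis
}
```

A growing `waitTime` with a stable `serviceTime` points at a queue that is filling up, which matches the note above about queue size as a warning sign.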
- These metrics can be retrieved by using custom mailbox and actor trait with overridden `receive` method (both should send statistics).
- To create a custom mailbox `MailboxType` and `MessageQueue` traits should be implemented.
- `MailboxType` that is used by a dispatcher should be set in the configuration file (`akka.actor.default-mailbox` for the default dispatcher).
- To improve the performance three parameters can be changed:
- number of services: actually a scaling-up, increases the possible throughput of the node;
- arrival rate: reducing number of messages to be processed;
- service time: making processing faster, also improves the throughput.
- Available built-in dispatchers:
- `Dispatcher`: default dispatcher, binds its actors to a thread pool, has fixed thread pool size;
- `PinnedDispatcher`: binds an actor to a single and unique thread, thread isn't shared between actors;
- `BalancingDispatcher`: redistributes the messages from busy actors to idle actors;
- `CallingThreadDispatcher`: uses the current thread to process the messages of an actor, only used for testing.
- `Props.withDispatcher` is used to select the dispatcher defined in the configuration file.
- When the CPU utilization is 80% or higher, increasing the number of threads will probably not increase the performance.
- When the CPU utilization is low, the processing of messages is mainly waiting: freezes and blocking calls should be removed. If this is impossible, the number of threads can be increased.
- Three parameters to configure the number of threads:
- `parallelism-factor`: used to calculate number of threads from available processors;
- `parallelism-min`: min number of threads;
- `parallelism-max`: max number of threads.
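
How the three settings combine can be sketched as a small calculation. This assumes the rule used by Akka's thread pool configuration (scale the processor count by the factor, round up, then clamp between min and max); `PoolSize` and `threads` are hypothetical names.

```scala
object PoolSize {
  // ceil(processors * factor), bounded below by min and above by max.
  def threads(processors: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(processors * factor).toInt, min), max)
}
```

For example, with a factor of 3.0, a minimum of 8 and a maximum of 64, a machine with 8 processors gets 24 threads, a machine with 2 processors is raised to the minimum of 8, and a machine with 32 processors is capped at 64.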
- The executor used by the dispatcher can be changed to use a dynamic thread pool.
- Three possible values of the executor configuration item:
- `fork-join-executor`: default executor;
- `thread-pool-executor`: used when a dynamic thread pool is needed;
- fully qualified class name (FQCN): custom executor (the class should extend Akka's `ExecutorServiceConfigurator`, which creates the Java `ExecutorService`).
- Two important parameters of `thread-pool-executor`:
- `task-queue-size`: size of waiting thread requests before the pool is increased, -1 disables pool increase (how quickly the pool size will grow);
- `keep-alive-time`: time a thread can be idle before it's cleaned up (how quickly the pool size will decrease).
- There is a `throughput` dispatcher parameter that defines the maximum number of messages an actor may process before it has to release the thread back to the pool (default value: 5).
- Increasing the `throughput` will improve performance when there are a lot of messages but message processing is fast.
- A high `throughput` can negatively impact performance if message processing is long. The `throughput-deadline-time` parameter bounds the time that an actor can keep a thread.
- Whether it's needed to increase or decrease the `throughput` configuration is completely dependent on the arrival rate and the function of the system.
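
The dispatcher settings discussed above could be combined in a configuration fragment like the following. The dispatcher name `my-dispatcher` and all values are hypothetical examples, not recommendations.

```hocon
# Selected from code with Props.withDispatcher("my-dispatcher").
my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"

  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
  }

  # Messages an actor may process before it gives the thread back to the pool.
  throughput = 20

  # Upper bound on how long an actor may keep the thread in one go.
  throughput-deadline-time = 200ms
}
```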
- Akka-typed module provides typed Actor API.
- There is no `sender()` method in akka-typed.
- Actors are defined in terms of typed behaviors.
- Every message is passed to an immutable behavior.
- The behavior of an actor can be switched over time or stay the same.
- `preStart`, `preRestart` and other methods are replaced by special signal messages.
- The akka-typed API is highly likely to change and shouldn't be used in production.
- Akka Distributed Data provides conflict-free replicated data types (CRDTs) in an Akka cluster.
- CRDTs always have a `merge` function that merges data entries into one consistent view without any coordination between the nodes.
- The types of data structures that can be used as CRDT are limited.
- It's possible to build custom CRDT data structure (it should implement `merge` function according to the rules of CRDT).
- Akka Distributed Data provides `Replicator` actor to replicate a data structure throughout the Akka cluster.
- It's possible to subscribe to updates of the data structure.
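
What a `merge` function has to guarantee can be shown with a grow-only counter, one of the simplest CRDTs. The sketch below uses only the Scala standard library and is an illustration of the idea, not Akka's own counter type; `GrowCounter` and the node names are hypothetical.

```scala
// Each node increments only its own slot; increments are assumed to be non-negative.
final case class GrowCounter(counts: Map[String, Long] = Map.empty) {
  def increment(node: String, by: Long = 1): GrowCounter =
    copy(counts = counts.updated(node, counts.getOrElse(node, 0L) + by))

  // The counter value is the sum over all nodes.
  def value: Long = counts.values.sum

  // Per-node maximum: commutative, associative and idempotent,
  // so replicas converge without coordination.
  def merge(that: GrowCounter): GrowCounter =
    GrowCounter((counts.keySet ++ that.counts.keySet).map { node =>
      node -> math.max(counts.getOrElse(node, 0L), that.counts.getOrElse(node, 0L))
    }.toMap)
}
```

Because `merge` gives the same result regardless of order or repetition, two replicas that exchange their state in any order end up with the same value.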