Monday, December 16, 2013

Introduction to Akka Persistence

Akka Persistence is a new module in Akka 2.3. At the time of writing this post, it is available as a milestone release (2.3-M2). Akka Persistence adds actor state persistence and at-least-once message delivery semantics to Akka. It is inspired by, and is the successor of, the eventsourced project. They share many high-level concepts but differ completely at the API and implementation level.

To persist an actor's state, only changes to that state are written to a journal, not the current state itself. These changes are appended as immutable facts; nothing is ever mutated, which allows for very high transaction rates and efficient replication. Actor state can be recovered by replaying stored changes and projecting them again. This allows state recovery not only after an actor has been restarted by a supervisor but also after JVM or node crashes, for example. State changes are defined in terms of messages an actor receives (or generates).
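
The append-and-replay idea can be illustrated without any Akka dependency. The following is a minimal sketch in plain Scala, not the Akka Persistence API: the names Journal, Change, Deposited, Withdrawn, project and recover are hypothetical and chosen only for illustration.

```scala
// Plain-Scala sketch of journaling and replay; all names are hypothetical.
object JournalReplaySketch {
  // State changes are recorded as immutable facts.
  sealed trait Change
  final case class Deposited(amount: Long) extends Change
  final case class Withdrawn(amount: Long) extends Change

  // An append-only journal: existing entries are never modified.
  final case class Journal(entries: Vector[Change] = Vector.empty) {
    def append(change: Change): Journal = copy(entries = entries :+ change)
  }

  // Projects a single change onto the current state (here: a balance).
  def project(balance: Long, change: Change): Long = change match {
    case Deposited(a) => balance + a
    case Withdrawn(a) => balance - a
  }

  // Recovery: replay all stored changes, starting from the initial state.
  def recover(journal: Journal): Long =
    journal.entries.foldLeft(0L)(project)
}
```

Because the journal only grows, recovery is a left fold over the stored changes, and any replica holding the same entries recovers the same state.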

Persistence of messages also forms the basis for supporting at-least-once message delivery semantics. This requires retries to counter transport losses, which means keeping state at the sending end and having an acknowledgement mechanism at the receiving end (see Message Delivery Guarantees in Akka). Akka Persistence supports that for point-to-point communications. Reliable point-to-point communications are an important part of highly scalable applications (see also Pat Helland's position paper Life beyond Distributed Transactions).

The following gives a high-level overview of the current features in Akka Persistence. Links to more detailed documentation are included.

Processors

Processors are persistent actors. They internally communicate with a journal to persist messages they receive or generate. They may also request message replay from a journal to recover internal state in failure cases. Processors may persist messages either
  • before an actor's behavior is executed (command sourcing) or
  • while an actor's behavior is executed (event sourcing)
Command sourcing is comparable to using a write-ahead log. Messages are persisted before it is known whether they can be successfully processed or not. In failure cases, they can be (logically) removed from the journal so that they won't be replayed during the next recovery. During recovery, command sourced processors show the same behavior as during normal operation. They can achieve high throughput rates by dynamically increasing the size of write batches under high load.

Event sourced processors do not persist commands. Instead they allow application code to derive events from a command and atomically persist these events. After persistence, they are applied to current state. During recovery, events are replayed and only the state-changing behavior of an event sourced processor is executed again. Other side effects that have been executed during normal operation are not performed again.
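
The difference between normal operation and recovery can be sketched in plain Scala. This is an illustration under assumptions, not the Akka Persistence API: CartProcessor, AddItem, ItemAdded and the notificationsSent counter are hypothetical names, and an in-memory Vector stands in for the journal.

```scala
// Plain-Scala sketch of an event sourced processor; all names are hypothetical.
object EventSourcingSketch {
  final case class AddItem(name: String)   // command
  final case class ItemAdded(name: String) // event derived from a command

  final class CartProcessor {
    var journal: Vector[ItemAdded] = Vector.empty // stands in for a real journal
    var items: List[String] = Nil                 // current state
    var notificationsSent: Int = 0                // observable side effect

    // State-changing behavior: the only part that runs again during recovery.
    private def update(event: ItemAdded): Unit = {
      items = event.name :: items
    }

    // Normal operation: validate, derive an event, persist it, apply it,
    // then perform side effects. Rejected commands are never persisted.
    def handle(cmd: AddItem): Unit = {
      if (cmd.name.nonEmpty) {
        val event = ItemAdded(cmd.name)
        journal = journal :+ event
        update(event)
        notificationsSent += 1
      }
    }

    // Recovery: replay stored events without repeating side effects.
    def recover(stored: Vector[ItemAdded]): Unit = {
      journal = stored
      items = Nil
      stored.foreach(update)
    }
  }
}
```

A processor recovered from the journal of another instance ends up with the same items, while its notificationsSent counter stays at zero.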

Processors automatically recover themselves. Akka Persistence guarantees that new messages sent to a processor never interleave with replayed messages. New messages are internally buffered until recovery completes, hence, an application may send messages to a processor immediately after creating it.

Snapshots

The recovery time of a processor increases with the number of messages that have been written by that processor. To reduce recovery time, applications may take snapshots of processor state which can be used as starting points for message replay. Usage of snapshots is optional and only needed for optimization.
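
How a snapshot shortens replay can be sketched as follows. This is a plain-Scala illustration, not the Akka Persistence snapshot API: Snapshot, Incremented and the sequenceNr fields are hypothetical names used to show the idea of replaying only messages written after the snapshot.

```scala
// Plain-Scala sketch of snapshot-based recovery; all names are hypothetical.
object SnapshotSketch {
  final case class Incremented(by: Int, sequenceNr: Long)
  // A snapshot stores the state as of a given sequence number.
  final case class Snapshot(state: Int, sequenceNr: Long)

  def applyEvent(state: Int, e: Incremented): Int = state + e.by

  // Returns the recovered state and the number of events that had to be replayed.
  def recover(journal: Vector[Incremented], snapshot: Option[Snapshot]): (Int, Int) = {
    val start = snapshot.getOrElse(Snapshot(0, 0L))
    // Only events written after the snapshot need to be replayed.
    val toReplay = journal.filter(_.sequenceNr > start.sequenceNr)
    (toReplay.foldLeft(start.state)(applyEvent), toReplay.size)
  }
}
```

Recovery with and without a snapshot yields the same state; the snapshot only reduces the number of replayed events.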

Channels

Channels are actors that provide at-least-once message delivery semantics between a sending processor and a receiver that acknowledges the receipt of messages at the application level. They also ensure that successfully acknowledged messages are not delivered again to receivers during processor recovery (i.e. replay of messages). Applications that want to have reliable message delivery without application-defined sending processors should use persistent channels. A persistent channel is like a normal channel that additionally persists messages before sending them to a receiver.

Journals

Journals (and snapshot stores) are pluggable in Akka Persistence. The default journal plugin is backed by LevelDB which writes messages to the local filesystem. A replicated journal is planned but not yet part of the distribution. Replicated journals allow stateful actors to be migrated in a cluster, for example. For testing purposes, a remotely shared LevelDB journal can be used instead of a replicated journal to experiment with stateful actor migration. Application code doesn't need to change when switching to a replicated journal later.

Updates:
  • Views have been added after 2.3-M2.
  • A replicated journal backed by Apache Cassandra is available here.  
  • A complete list of community-contributed plugins is maintained here.

Wednesday, March 20, 2013

Eventsourced for Akka - A high-level technical overview

Eventsourced is an Akka extension that adds scalable actor state persistence and at-least-once message delivery guarantees to Akka. With Eventsourced, stateful actors
  • persist received messages by appending them to a log (journal)
  • project received messages to derive current state
  • usually hold current state in memory (memory image)
  • recover current (or past) state by replaying received messages (during normal application start or after crashes)
  • never persist current state directly (except optional state snapshots for recovery time optimization)
In other words, Eventsourced implements a write-ahead log (WAL) that is used to keep track of messages an actor receives and to recover its state by replaying logged messages. Appending messages to a log instead of persisting actor state directly allows for actor state persistence at very high transaction rates and supports efficient replication. In contrast to other WAL-based systems, Eventsourced usually keeps the whole message history in the log and makes usage of state snapshots optional.

Logged messages represent intended changes to an actor's state. Logging changes instead of updating current state is one of the core concepts of event sourcing. Eventsourced can be used to implement event sourcing concepts but it is not limited to that. More details about Eventsourced and its relation to event sourcing can be found here.

Eventsourced can also be used to make message exchanges between actors reliable so that they can be resumed after crashes, for example. For that purpose, channels with at-least-once message delivery guarantees are provided. Channels also prevent output messages sent by persistent actors from being redundantly delivered during replays, which is relevant for message exchanges between these actors and other services.

Building blocks

The core building blocks provided by Eventsourced are processors, channels and journals. These are managed by an Akka extension, the EventsourcingExtension.

Processor

A processor is a stateful actor that logs (persists) messages it receives. A stateful actor is turned into a processor by modifying it with the stackable Eventsourced trait during construction. A processor can be used like any other actor.

Messages wrapped inside Message are logged by a processor; unwrapped messages are not logged. Logging behavior is implemented by the Eventsourced trait, so a processor's receive method doesn't need to care about it. Acknowledging a successful write to a sender can be done by sending a reply. A processor can also hot-swap its behavior while still keeping its logging functionality.

Processors are registered at an EventsourcingExtension. This extension provides methods to recover processor state by replaying logged messages. Processors can be registered and recovered at any time during an application run.

Eventsourced doesn't impose any restrictions on how processors maintain state. A processor can use vars, mutable data structures or STM references, for example.

Channel

Channels are used by processors for sending messages to other actors (channel destinations) and receiving replies from them. Channels
  • require their destinations to confirm the receipt of messages for providing at-least-once delivery guarantees (explicit ack-retry protocol). Receipt confirmations are written to a log.
  • prevent redundant delivery of messages to destinations during processor recovery (replay of messages). Replayed messages with matching receipt confirmations are dropped by the corresponding channels.
A channel itself is an actor that decorates a destination with the aforementioned functionality. Processors usually create channels as child actors for decorating destination actor references.
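
The two channel properties described above can be modelled in a few lines. The following is a plain-Scala sketch, not the Eventsourced channel implementation: Channel, Msg and the confirmed set are hypothetical names, the destination is modelled as a function that returns true to confirm receipt, and an in-memory set stands in for confirmations written to the journal.

```scala
// Plain-Scala model of a confirming channel; all names are hypothetical.
object ChannelSketch {
  final case class Msg(sequenceNr: Long, payload: String)

  // The destination returns true to confirm receipt of a message.
  final class Channel(destination: Msg => Boolean) {
    // Stands in for receipt confirmations written to a log.
    var confirmed: Set[Long] = Set.empty

    // Used for both normal sends and replayed sends: messages with a matching
    // receipt confirmation are dropped, all others are (re-)delivered.
    def deliver(msg: Msg): Unit = {
      if (!confirmed.contains(msg.sequenceNr) && destination(msg)) {
        confirmed += msg.sequenceNr
      }
    }
  }
}
```

Replaying the same messages through such a channel reaches the destination only for messages that were not confirmed the first time.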

A processor may also send messages directly to another actor without using a channel. In this case that actor will redundantly receive messages during processor recovery.

Eventsourced provides three different channel types (more are planned).
  • Default channel
    • Does not store received messages.
    • Re-delivers unconfirmed messages only during recovery of the sending processor.
    • Order of messages as sent by a processor is not preserved in failure cases.
  • Reliable channel
    • Stores received messages.
    • Re-delivers unconfirmed messages based on a configurable re-delivery policy.
    • Order of messages as sent by a processor is preserved, even in failure cases.
    • Often used to deal with unreliable remote destinations.
  • Reliable request-reply channel
    • Same as reliable channel but additionally guarantees at-least-once delivery of replies.
    • Order of replies not guaranteed to correspond to the order of sent request messages.
Eventsourced channels are not meant to replace any existing messaging system but can be used, for example, to reliably connect processors to such a system, if needed. More generally, they are useful to integrate processors with other services, as described in another blog post.

Journal

A journal is an actor that is used by processors and channels to log messages and receipt confirmations. The quality of service (availability, scalability, ...) provided by a journal depends on the used storage technology. The Journals section in the user guide gives an overview of existing journal implementations and their development status.

Thursday, January 31, 2013

Event sourcing and external service integration

A frequently asked question when building event sourced applications is how to interact with external services. This topic is covered to some extent by Martin Fowler's Event Sourcing article in the sections External Queries and External Updates. In this blog post I'll show how to approach external service integration with the Eventsourced library for Akka. If you are new to this library, an overview is given in the user guide sections Overview and First steps.
The example application presented here was inspired by Fowler's LMAX article where he describes how event sourcing differs from an alternative transaction processing approach:

Imagine you are making an order for jelly beans by credit card. A simple retailing system would take your order information, use a credit card validation service to check your credit card number, and then confirm your order - all within a single operation. The thread processing your order would block while waiting for the credit card to be checked, but that block wouldn't be very long for the user, and the server can always run another thread on the processor while it's waiting.
In the LMAX architecture, you would split this operation into two. The first operation would capture the order information and finish by outputting an event (credit card validation requested) to the credit card company. The Business Logic Processor would then carry on processing events for other customers until it received a credit-card-validated event in its input event stream. On processing that event it would carry out the confirmation tasks for that order.
Although Fowler mentions the LMAX architecture, we don't use the Disruptor here for implementation. Its role is taken by an Akka dispatcher in the following example. Nevertheless, the described architecture and message flow remain the same:

The two components in the high-level architecture are:

  • OrderProcessor. An event sourced actor that maintains received orders and their validation state in memory. The OrderProcessor writes any received event message to an event log (journal) so that its in-memory state can be recovered by replaying these events, e.g. after a crash or during normal application start. This actor corresponds to the Business Logic Processor in Fowler's example.
  • CreditCardValidator. A plain remote, stateless actor that validates credit card information of submitted orders on receiving CreditCardValidationRequested events. Depending on the validation outcome it replies with CreditCardValidated or CreditCardValidationFailed event messages to the OrderProcessor.
The example application must meet the following requirements and conditions:

  • The OrderProcessor and the CreditCardValidator must communicate remotely so that they can be deployed separately. The CreditCardValidator is an external service from the OrderProcessor's perspective.
  • The example application must be able to recover from JVM crashes and remote communication errors. OrderProcessor state must be recoverable from logged event messages and running credit card validations must be automatically resumed after crashes. To overcome temporary network problems and remote actor downtimes, remote communication must be re-tried. Long-lasting errors must be escalated.
  • Event message replay during recovery must not redundantly emit validation requests to the CreditCardValidator and validation responses must be recorded in the event log (to solve the external queries problem). This will recover processor state in a deterministic way, making repeated recoveries independent from otherwise potentially different validation responses over time for the same validation request (a credit card may expire, for example).
  • Message processing must be idempotent. This requirement is a consequence of the at-least-once message delivery guarantee supported by Eventsourced.
The full example application code that meets these requirements is part of the Eventsourced project and can be executed with sbt.
The CreditCardValidator can be started with:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.CreditCardValidator

The application that runs the OrderProcessor and sends OrderSubmitted events can be started with
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.OrderProcessor

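The example application defines an oversimplified domain class Order together with a set of domain events, including OrderSubmitted, OrderStored, CreditCardValidationRequested, CreditCardValidated and CreditCardValidationFailed.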
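
A minimal sketch of what such a domain model could look like is given below. The event names and the Pending, Success and Failure validation states are taken from this post; the field names, the default arguments and the wrapping object are assumptions and may differ from the definitions in the example project.

```scala
// Sketch of the domain model; field names and defaults are assumptions.
object OrderDomainSketch {
  sealed trait Validation
  case object Pending extends Validation
  case object Success extends Validation
  case object Failure extends Validation

  // id = -1 marks an order that has not been stored yet (assumed convention).
  final case class Order(
    id: Int = -1,
    details: String,
    creditCardNumber: String,
    creditCardValidation: Validation = Pending)

  // Domain events exchanged between client, OrderProcessor and CreditCardValidator.
  final case class OrderSubmitted(order: Order)
  final case class OrderStored(order: Order)
  final case class CreditCardValidationRequested(order: Order)
  final case class CreditCardValidated(orderId: Int)
  final case class CreditCardValidationFailed(orderId: Int)
}
```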
Whenever the OrderProcessor receives a domain event it appends that event to the event log (journal) before processing it. To add event logging behavior to an actor it must be modified with the stackable Eventsourced trait during construction.
Eventsourced actors only write messages of type Message to the event log (together with the contained event). Messages of other type can be received by an Eventsourced actor as well but aren't logged. The Receiver trait allows the OrderProcessor's receive method to pattern-match against received events directly (instead of Message). It is not required for implementing an event sourced actor but can help to make implementations simpler.
On receiving an OrderSubmitted event, the OrderProcessor extracts the contained order object from the event, updates the order with an order id and stores it in the orders map. The orders map represents the current state of the OrderProcessor (which can be recovered by replaying logged event messages).
After updating the orders map, the OrderProcessor replies to the sender of an OrderSubmitted event with an OrderStored event. This event is a business-level acknowledgement that the received OrderSubmitted event has been successfully written to the event log. Finally, the OrderProcessor emits a CreditCardValidationRequested event message to the CreditCardValidator via reliable request-reply channel (see below). The emitted message is derived from the current event message which can be accessed via the message method of the Receiver trait. Alternatively, the OrderProcessor could also have used an emitter for sending the event (see also channel usage hints).
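
The steps for handling an OrderSubmitted event can be summarized as a pure function. This is a plain-Scala sketch without Akka or Eventsourced dependencies: the Handled result type, the onOrderSubmitted function and the id scheme (map size plus one) are assumptions for illustration, whereas the real OrderProcessor is an actor that replies to the sender and emits via a channel.

```scala
// Plain-Scala sketch of OrderSubmitted handling; Handled, onOrderSubmitted
// and the id scheme are assumptions.
object OrderSubmittedSketch {
  final case class Order(id: Int, details: String, creditCardNumber: String)

  final case class OrderSubmitted(order: Order)
  final case class OrderStored(order: Order)                   // reply to the sender
  final case class CreditCardValidationRequested(order: Order) // emitted to the validator

  // Everything that results from handling one OrderSubmitted event.
  final case class Handled(
    orders: Map[Int, Order],
    reply: OrderStored,
    emitted: CreditCardValidationRequested)

  def onOrderSubmitted(orders: Map[Int, Order], event: OrderSubmitted): Handled = {
    val id = orders.size + 1                // assumed id scheme
    val stored = event.order.copy(id = id)  // update the order with an order id
    Handled(
      orders + (id -> stored),              // new processor state
      OrderStored(stored),                  // business-level acknowledgement
      CreditCardValidationRequested(stored))
  }
}
```

Because the assigned id depends only on the current state, replaying the same sequence of OrderSubmitted events reproduces the same orders map.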
A reliable request-reply channel is a pattern on top of a reliable channel with the following properties: It

  • persists request Messages for failure recovery and preserves message order.
  • extracts requests from received Messages before sending them to a destination.
  • wraps replies from a destination into a Message before sending them back to the request sender.
  • sends a special DestinationNotResponding reply to the request sender if the destination doesn't reply within a configurable timeout.
  • sends a special DestinationFailure reply to the request sender if the destination responds with Status.Failure.
  • guarantees at-least-once delivery of requests to the destination.
  • guarantees at-least-once delivery of replies to the request sender.
  • requires a positive receipt confirmation for a reply to mark a request-reply interaction as successfully completed.
  • redelivers requests, and subsequently replies, on missing or negative receipt confirmations.
  • sends a DeliveryStopped event to the actor system's event stream if the maximum number of delivery attempts has been reached (according to the channel's redelivery policy).
A reliable request-reply channel offers all the properties we need to reliably communicate with the remote CreditCardValidator. The channel is created as child actor of the OrderProcessor when the OrderProcessor receives a SetCreditCardValidator message.
The channel is created with the channelOf method of the actor system's EventsourcingExtension and configured with a ReliableRequestReplyChannelProps object. Configuration data are the channel destination (validator), a redelivery policy and a destination reply timeout. When sending validation requests via the created validationRequestChannel, the OrderProcessor must be prepared for receiving CreditCardValidated, CreditCardValidationFailed, DestinationNotResponding or DestinationFailure replies. These replies are sent to the OrderProcessor inside a Message and are therefore written to the event log. Consequently, OrderProcessor recoveries in the future will replay past reply messages instead of obtaining them again from the validator which ensures deterministic state recovery. Furthermore, the validationRequestChannel will ignore validation requests it receives during a replay, except those whose corresponding replies have not been positively confirmed yet. The following snippet shows how replies are processed by the OrderProcessor.

  • A CreditCardValidated reply updates the creditCardValidation status of the corresponding order to Success and stores the updated order in the orders map. Further actions, such as notifying others that an order has been accepted, are omitted here but are part of the full example code. Then, the receipt of the reply is positively confirmed (confirm(true)) so that the channel doesn't redeliver the corresponding validation request.
  • A CreditCardValidationFailed reply updates the creditCardValidation status of the corresponding order to Failure and stores the updated order in the orders map. Again, further actions are omitted here and the receipt of the reply is positively confirmed.
Because the validationRequestChannel delivers messages at-least-once, we need to detect duplicates in order to make reply processing idempotent. Here, we simply require that the order object to be updated has a Pending creditCardValidation status before changing state (and notifying others). If the order's status is not Pending, the order has already been updated by a previous reply and the current reply is a duplicate. In this case, the methods onValidationSuccess and onValidationFailure have no effect because orders.get(orderId).filter(_.creditCardValidation == Pending) is None. The receipt of the duplicate is still positively confirmed. More general guidelines on how to detect duplicates are outlined here.
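
The duplicate check can be sketched in plain Scala as follows. The expression orders.get(orderId).filter(_.creditCardValidation == Pending) and the method names onValidationSuccess and onValidationFailure are taken from the text above; the OrderState class, the reduced Order class and the notifications counter are assumptions that stand in for the actor and its side effects.

```scala
// Plain-Scala sketch of idempotent reply processing; OrderState and the
// notifications counter are assumptions.
object IdempotentReplySketch {
  sealed trait Validation
  case object Pending extends Validation
  case object Success extends Validation
  case object Failure extends Validation

  final case class Order(id: Int, creditCardValidation: Validation = Pending)

  final class OrderState {
    var orders: Map[Int, Order] = Map.empty
    var notifications: Int = 0 // stands in for "notifying others"

    // Only orders that are still Pending are updated; for duplicates the
    // filter yields None and nothing happens.
    private def update(orderId: Int, result: Validation): Unit = {
      orders.get(orderId).filter(_.creditCardValidation == Pending).foreach { order =>
        orders = orders.updated(orderId, order.copy(creditCardValidation = result))
        notifications += 1
      }
    }

    def onValidationSuccess(orderId: Int): Unit = update(orderId, Success)
    def onValidationFailure(orderId: Int): Unit = update(orderId, Failure)
  }
}
```

Processing the same reply twice therefore changes state and notifies others only once.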

  • A DestinationNotResponding reply is always confirmed negatively (confirm(false)) so that the channel will redeliver the validation request to the CreditCardValidator. This may help to overcome temporary network problems, for example, but doesn't handle the case where the maximum number of redeliveries has been reached (see below).
  • A DestinationFailure reply will be negatively confirmed by default unless it has been delivered more than twice. This may help to overcome temporary CreditCardValidator failures i.e. cases where a Status.Failure is returned by the validator.
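
The confirmation rules for the different reply types can be condensed into a small decision function. This is a plain-Scala sketch: the Reply hierarchy, the ValidationReply placeholder (standing for CreditCardValidated and CreditCardValidationFailed) and the deliveryCount parameter are assumptions, and the returned Boolean represents the argument that would be passed to confirm.

```scala
// Plain-Scala sketch of the confirmation decision; type and parameter
// names are assumptions.
object ConfirmationSketch {
  sealed trait Reply
  case object ValidationReply extends Reply          // CreditCardValidated or CreditCardValidationFailed
  case object DestinationNotResponding extends Reply // no reply within the timeout
  case object DestinationFailure extends Reply       // validator answered with Status.Failure

  // Returns the value to pass to confirm(...):
  // true stops redelivery, false asks the channel to redeliver the request.
  def confirmation(reply: Reply, deliveryCount: Int): Boolean = reply match {
    case ValidationReply          => true
    case DestinationNotResponding => false
    case DestinationFailure       => deliveryCount > 2 // give up after more than two deliveries
  }
}
```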
Should the CreditCardValidator be unavailable for a longer time and the validationRequestChannel reach the maximum number of redeliveries, the channel stops message delivery and publishes a DeliveryStopped event to the actor system's event stream. The channel still continues to accept new event messages and persists them so that the OrderProcessor can continue receiving OrderSubmitted events but the interaction with the CreditCardValidator is suspended. It is now up to the application to re-activate message delivery.
Subscribing to DeliveryStopped events allows an application to escalate a persisting network problem or CreditCardValidator outage by alerting a system administrator or switching to another credit card validation service, for example. In our case, a simple re-activation of the validationRequestChannel is scheduled.
The OrderProcessor subscribes itself to the actor system's event stream. On receiving a DeliveryStopped event it schedules a re-activation of the validationRequestChannel by sending it a Deliver message.
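
The stop and re-activation behavior can be modelled in plain Scala. This is a simplified sketch, not the Eventsourced reliable channel: ReliableChannelModel, its maxAttempts parameter and the published vector (standing in for the actor system's event stream) are assumptions, delivery attempts are made synchronously instead of being scheduled, and the deliver method plays the role of the Deliver message.

```scala
// Plain-Scala model of redelivery, DeliveryStopped and re-activation;
// all names except DeliveryStopped are assumptions.
object RedeliverySketch {
  sealed trait ChannelEvent
  case object DeliveryStopped extends ChannelEvent

  // The destination returns true if it received and confirmed the message.
  final class ReliableChannelModel(maxAttempts: Int, destination: String => Boolean) {
    private var buffer: Vector[String] = Vector.empty  // persisted in a real reliable channel
    var active: Boolean = true
    var published: Vector[ChannelEvent] = Vector.empty // stands in for the event stream

    def pending: Int = buffer.size

    // New messages are accepted and buffered even while delivery is stopped.
    def send(msg: String): Unit = {
      buffer = buffer :+ msg
      if (active) deliverPending()
    }

    // Re-activates delivery, analogous to sending a Deliver message.
    def deliver(): Unit = {
      active = true
      deliverPending()
    }

    // Delivers buffered messages in order; stops after maxAttempts failures.
    private def deliverPending(): Unit = {
      while (active && buffer.nonEmpty) {
        val head = buffer.head
        val delivered = (1 to maxAttempts).exists(_ => destination(head))
        if (delivered) buffer = buffer.tail
        else {
          active = false
          published = published :+ DeliveryStopped
        }
      }
    }
  }
}
```

An application reacting to DeliveryStopped could alert an operator or, as in this post, simply call deliver again after a delay.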
This finally meets all the requirements stated above but there's a lot more to say about external service integration. Examples are external updates or usage of channels that don't preserve message order for optimizing concurrency and throughput. I also didn't cover processor-specific, non-blocking recovery as implemented by the example application. This is enough food for another blog post.