Monday, December 16, 2013

Introduction to Akka Persistence

Akka Persistence is a new module in Akka 2.3. At the time of writing this post, it is available as a milestone release (2.3-M2). Akka Persistence adds actor state persistence and at-least-once message delivery semantics to Akka. It is inspired by and the successor of the eventsourced project. They share many high-level concepts but differ completely at the API and implementation level.

To persist an actor's state, only changes to that actor's state are written to a journal, not the current state directly. These changes are appended as immutable facts to a journal; nothing is ever mutated, which allows for very high transaction rates and efficient replication. Actor state can be recovered by replaying stored changes and projecting them again. This not only allows state recovery after an actor has been restarted by a supervisor but also after JVM or node crashes, for example. State changes are defined in terms of messages an actor receives (or generates).

Persistence of messages also forms the basis for supporting at-least-once message delivery semantics. This requires retries to counter transport losses, which means keeping state at the sending end and having an acknowledgement mechanism at the receiving end (see Message Delivery Guarantees in Akka). Akka Persistence supports that for point-to-point communications. Reliable point-to-point communications are an important part of highly scalable applications (see also Pat Helland's position paper Life beyond Distributed Transactions).

The following gives a high-level overview of the current features in Akka Persistence. Links to more detailed documentation are included.

Processors

Processors are persistent actors. They internally communicate with a journal to persist messages they receive or generate. They may also request message replay from a journal to recover internal state in failure cases. Processors may persist messages either
  • before an actor's behavior is executed (command sourcing) or
  • while an actor's behavior is executed (event sourcing).
Command sourcing is comparable to using a write-ahead log. Messages are persisted before it is known whether they can be successfully processed or not. In failure cases, they can be (logically) removed from the journal so that they won't be replayed during the next recovery. During recovery, command sourced processors show the same behavior as during normal operation. They can achieve high throughput rates by dynamically increasing the size of write batches under high load.
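A command sourced processor, written against the 2.3 milestone API, looks roughly like this (a minimal sketch; the payload handling is illustrative):

    import akka.actor._
    import akka.persistence._

    class MyProcessor extends Processor {
      def receive = {
        case Persistent(payload, sequenceNr) =>
          // the message has already been written to the journal at this point
          println(s"processed $payload (sequence number $sequenceNr)")
        case other =>
          // messages not wrapped in Persistent are processed but not journaled
      }
    }

    // usage:
    // val processor = system.actorOf(Props[MyProcessor], "processor")
    // processor ! Persistent("foo") // journaled
    // processor ! "bar"             // not journaled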

Event sourced processors do not persist commands. Instead, they allow application code to derive events from a command and atomically persist these events. After being persisted, the events are applied to the current state. During recovery, events are replayed and only the state-changing behavior of an event sourced processor is executed again. Other side effects that have been executed during normal operation are not performed again.
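A sketch of an event sourced processor (method names are those of the milestone API and were renamed in later releases):

    import akka.actor._
    import akka.persistence._

    class MyEventsourcedProcessor extends EventsourcedProcessor {
      var state: List[String] = Nil

      def updateState(event: String): Unit = state = event :: state

      // replayed events: only state-changing behavior is executed again
      val receiveReplay: Receive = {
        case event: String => updateState(event)
      }

      // commands: derive events, persist them atomically, then apply them
      val receiveCommand: Receive = {
        case cmd: String => persist("evt-" + cmd) { event =>
          updateState(event)
          // side effects performed here are not repeated during recovery
        }
      }
    }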

Processors automatically recover themselves. Akka Persistence guarantees that new messages sent to a processor never interleave with replayed messages. New messages are internally buffered until recovery completes, hence, an application may send messages to a processor immediately after creating it.

Snapshots

The recovery time of a processor increases with the number of messages that have been written by that processor. To reduce recovery time, applications may take snapshots of processor state which can be used as starting points for message replay. Usage of snapshots is optional and only needed for optimization.
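A minimal sketch of snapshot usage (the state type is illustrative):

    class MySnapshottingProcessor extends Processor {
      var state: List[Any] = Nil

      def receive = {
        case "snap" =>
          saveSnapshot(state) // written to the snapshot store
        case SnapshotOffer(metadata, snapshot: List[Any]) =>
          state = snapshot    // offered during recovery, before remaining messages are replayed
        case Persistent(payload, _) =>
          state = payload :: state
      }
    }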

Channels

Channels are actors that provide at-least-once message delivery semantics between a sending processor and a receiver that acknowledges the receipt of messages at the application level. They also ensure that successfully acknowledged messages are not delivered again to receivers during processor recovery (i.e. replay of messages). Applications that want reliable message delivery without an application-defined sending processor should use persistent channels. A persistent channel is like a normal channel that additionally persists messages before sending them to a receiver.
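Sketched channel usage (API details are as of the milestone and changed in later releases; signatures are assumptions):

    class SendingProcessor extends Processor {
      val destination = context.actorOf(Props[MyDestination])
      val channel = context.actorOf(Channel.props(), "myChannel")

      def receive = {
        case p @ Persistent(payload, _) =>
          // delivered at-least-once; skipped during replay if already confirmed
          channel ! Deliver(p, destination)
      }
    }

    class MyDestination extends Actor {
      def receive = {
        case p: Persistent => p.confirm() // application-level acknowledgement
      }
    }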

Journals

Journals (and snapshot stores) are pluggable in Akka Persistence. The default journal plugin is backed by LevelDB which writes messages to the local filesystem. A replicated journal is planned but not yet part of the distribution. Replicated journals allow stateful actors to be migrated in a cluster, for example. For testing purposes, a remotely shared LevelDB journal can be used instead of a replicated journal to experiment with stateful actor migration. Application code doesn't need to change when switching to a replicated journal later.
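The default journal and snapshot store can be configured in application.conf, for example (a sketch; directory values are illustrative):

    # write the LevelDB journal and local snapshots to custom directories
    akka.persistence.journal.leveldb.dir = "target/journal"
    akka.persistence.snapshot-store.local.dir = "target/snapshots"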

Updates:
  • Views have been added after 2.3-M2.
  • A replicated journal backed by Apache Cassandra is available here.  
  • A complete list of community-contributed plugins is maintained here.

Wednesday, March 20, 2013

Eventsourced for Akka - A high-level technical overview

Eventsourced is an Akka extension that adds scalable actor state persistence and at-least-once message delivery guarantees to Akka. With Eventsourced, stateful actors
  • persist received messages by appending them to a log (journal)
  • project received messages to derive current state
  • usually hold current state in memory (memory image)
  • recover current (or past) state by replaying received messages (during normal application start or after crashes)
  • never persist current state directly (except optional state snapshots for recovery time optimization)
In other words, Eventsourced implements a write-ahead log (WAL) that is used to keep track of messages an actor receives and to recover its state by replaying logged messages. Appending messages to a log instead of persisting actor state directly allows for actor state persistence at very high transaction rates and supports efficient replication. In contrast to other WAL-based systems, Eventsourced usually keeps the whole message history in the log and makes usage of state snapshots optional.

Logged messages represent intended changes to an actor's state. Logging changes instead of updating current state is one of the core concepts of event sourcing. Eventsourced can be used to implement event sourcing concepts but it is not limited to that. More details about Eventsourced and its relation to event sourcing can be found here.

Eventsourced can also be used to make message exchanges between actors reliable so that they can be resumed after crashes, for example. For that purpose, channels with at-least-once message delivery guarantees are provided. Channels also prevent output messages sent by persistent actors from being redundantly delivered during replays, which is relevant for message exchanges between these actors and other services.

Building blocks

The core building blocks provided by Eventsourced are processors, channels and journals. These are managed by an Akka extension, the EventsourcingExtension.

Processor

A processor is a stateful actor that logs (persists) messages it receives. A stateful actor is turned into a processor by modifying it with the stackable Eventsourced trait during construction. A processor can be used like any other actor.

Messages wrapped inside Message are logged by a processor, unwrapped messages are not logged. Logging behavior is implemented by the Eventsourced trait, a processor's receive method doesn't need to care about that. Acknowledging a successful write to a sender can be done by sending a reply. A processor can also hot-swap its behavior while still keeping its logging functionality.
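A processor definition might look roughly like this (a sketch; the actor body is illustrative):

    import akka.actor._
    import org.eligosource.eventsourced.core._

    class MyProcessor extends Actor {
      def receive = {
        case msg: Message =>
          // msg has been journaled by the Eventsourced trait before getting here
          sender ! ("received event " + msg.event) // acknowledge the write with a reply
        case other =>
          // messages not wrapped in Message are not logged
      }
    }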

Processors are registered at an EventsourcingExtension. This extension provides methods to recover processor state by replaying logged messages. Processors can be registered and recovered at any time during an application run.
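Registration and recovery, roughly (journal construction is omitted; a sketch along the lines of the project's README):

    val system = ActorSystem("example")
    val extension = EventsourcingExtension(system, journal) // journal: ActorRef

    val processor: ActorRef = extension.processorOf(
      Props(new MyProcessor with Eventsourced { val id = 1 }))

    extension.recover() // replays logged messages to registered processors

    processor ! Message("foo") // journaled
    processor ! "bar"          // not journaled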

Eventsourced doesn't impose any restrictions on how processors maintain state. A processor can use vars, mutable data structures or STM references, for example.

Channel

Channels are used by processors for sending messages to other actors (channel destinations) and receiving replies from them. Channels
  • require their destinations to confirm the receipt of messages for providing at-least-once delivery guarantees (explicit ack-retry protocol). Receipt confirmations are written to a log.
  • prevent redundant delivery of messages to destinations during processor recovery (replay of messages). Replayed messages with matching receipt confirmations are dropped by the corresponding channels.
A channel itself is an actor that decorates a destination with the aforementioned functionality. Processors usually create channels as child actors for decorating destination actor references.
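Creating and using a channel might look roughly like this (a sketch decorating a destination with a default channel; the implicit context argument is an assumption):

    // inside a processor:
    val destination: ActorRef = context.actorOf(Props[MyDestination])
    val channel: ActorRef = extension.channelOf(DefaultChannelProps(1, destination))(context)

    // forward the current message with a new event to the destination:
    // channel ! msg.copy(SomethingHappened())

    class MyDestination extends Actor {
      def receive = {
        case msg: Message => msg.confirm() // positive receipt confirmation
      }
    }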

A processor may also send messages directly to another actor without using a channel. In this case that actor will redundantly receive messages during processor recovery.

Eventsourced provides three different channel types (more are planned).
  • Default channel
    • Does not store received messages.
    • Re-delivers unconfirmed messages only during recovery of the sending processor.
    • Order of messages as sent by a processor is not preserved in failure cases.
  • Reliable channel
    • Stores received messages.
    • Re-delivers unconfirmed messages based on a configurable re-delivery policy.
    • Order of messages as sent by a processor is preserved, even in failure cases.
    • Often used to deal with unreliable remote destinations.
  • Reliable request-reply channel
    • Same as reliable channel but additionally guarantees at-least-once delivery of replies.
    • Order of replies not guaranteed to correspond to the order of sent request messages.
Eventsourced channels are not meant to replace any existing messaging system but can be used, for example, to reliably connect processors to such a system, if needed. More generally, they are useful to integrate processors with other services, as described in another blog post.

Journal

A journal is an actor that is used by processors and channels to log messages and receipt confirmations. The quality of service (availability, scalability, ...) provided by a journal depends on the used storage technology. The Journals section in the user guide gives an overview of existing journal implementations and their development status.

Thursday, January 31, 2013

Event sourcing and external service integration

A frequently asked question when building event sourced applications is how to interact with external services. This topic is covered to some extent by Martin Fowler's Event Sourcing article in the sections External Queries and External Updates. In this blog post I'll show how to approach external service integration with the Eventsourced library for Akka. If you are new to this library, an overview is given in the user guide sections Overview and First steps.
The example application presented here was inspired by Fowler's LMAX article where he describes how event sourcing differs from an alternative transaction processing approach:

Imagine you are making an order for jelly beans by credit card. A simple retailing system would take your order information, use a credit card validation service to check your credit card number, and then confirm your order - all within a single operation. The thread processing your order would block while waiting for the credit card to be checked, but that block wouldn't be very long for the user, and the server can always run another thread on the processor while it's waiting.
In the LMAX architecture, you would split this operation into two. The first operation would capture the order information and finish by outputting an event (credit card validation requested) to the credit card company. The Business Logic Processor would then carry on processing events for other customers until it received a credit-card-validated event in its input event stream. On processing that event it would carry out the confirmation tasks for that order.
Although Fowler mentions the LMAX architecture, we don't use the Disruptor here for implementation. Its role is taken by an Akka dispatcher in the following example. Nevertheless, the described architecture and message flow remain the same.

The two components in the high-level architecture are:

  • OrderProcessor. An event sourced actor that maintains received orders and their validation state in memory. The OrderProcessor writes any received event message to an event log (journal) so that its in-memory state can be recovered by replaying these events, e.g. after a crash or during normal application start. This actor corresponds to the Business Logic Processor in Fowler's example.
  • CreditCardValidator. A plain remote, stateless actor that validates credit card information of submitted orders on receiving CreditCardValidationRequested events. Depending on the validation outcome it replies with CreditCardValidated or CreditCardValidationFailed event messages to the OrderProcessor.
The example application must meet the following requirements and conditions:

  • The OrderProcessor and the CreditCardValidator must communicate remotely so that they can be deployed separately. The CreditCardValidator is an external service from the OrderProcessor's perspective.
  • The example application must be able to recover from JVM crashes and remote communication errors. OrderProcessor state must be recoverable from logged event messages and running credit card validations must be automatically resumed after crashes. To overcome temporary network problems and remote actor downtimes, remote communication must be re-tried. Long-lasting errors must be escalated.
  • Event message replay during recovery must not redundantly emit validation requests to the CreditCardValidator and validation responses must be recorded in the event log (to solve the external queries problem). This will recover processor state in a deterministic way, making repeated recoveries independent from otherwise potentially different validation responses over time for the same validation request (a credit card may expire, for example).
  • Message processing must be idempotent. This requirement is a consequence of the at-least-once message delivery guarantee supported by Eventsourced.
The full example application code that meets these requirements is part of the Eventsourced project and can be executed with sbt.
The CreditCardValidator can be started with:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.CreditCardValidator

The application that runs the OrderProcessor and sends OrderSubmitted events can be started with
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.OrderProcessor

The example application defines an oversimplified domain class Order
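(a sketch; defaults and field names are reconstructed from the description below)

    sealed trait Validation
    case object Pending extends Validation
    case object Success extends Validation
    case object Failure extends Validation

    case class Order(
      id: Int = -1,
      details: String,
      creditCardNumber: String,
      creditCardValidation: Validation = Pending)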
together with the domain events
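(again a sketch, with event names taken from the prose that follows)

    case class OrderSubmitted(order: Order)
    case class OrderStored(order: Order)
    case class CreditCardValidationRequested(order: Order)
    case class CreditCardValidated(orderId: Int)
    case class CreditCardValidationFailed(orderId: Int)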
Whenever the OrderProcessor receives a domain event it appends that event to the event log (journal) before processing it. To add event logging behavior to an actor it must be modified with the stackable Eventsourced trait during construction.
Eventsourced actors only write messages of type Message to the event log (together with the contained event). Messages of other type can be received by an Eventsourced actor as well but aren't logged. The Receiver trait allows the OrderProcessor's receive method to pattern-match against received events directly (instead of Message). It is not required for implementing an event sourced actor but can help to make implementations simpler.
On receiving an OrderSubmitted event, the OrderProcessor extracts the contained order object from the event, updates the order with an order id and stores it in the orders map. The orders map represents the current state of the OrderProcessor (which can be recovered by replaying logged event messages).
After updating the orders map, the OrderProcessor replies to the sender of an OrderSubmitted event with an OrderStored event. This event is a business-level acknowledgement that the received OrderSubmitted event has been successfully written to the event log. Finally, the OrderProcessor emits a CreditCardValidationRequested event message to the CreditCardValidator via a reliable request-reply channel (see below). The emitted message is derived from the current event message which can be accessed via the message method of the Receiver trait. Alternatively, the OrderProcessor could also have used an emitter for sending the event (see also channel usage hints).
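For illustration, this part of the OrderProcessor's receive method might look roughly like this (the id assignment is illustrative; with Receiver mixed in, events arrive unwrapped):

    case OrderSubmitted(order) =>
      val id = orders.size
      val updated = order.copy(id = id)
      orders = orders + (id -> updated)
      sender ! OrderStored(updated)
      // derive the outgoing message from the current message (Receiver's message method)
      validationRequestChannel ! message.copy(CreditCardValidationRequested(updated))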
A reliable request-reply channel is a pattern on top of a reliable channel with the following properties: It

  • persists request Messages for failure recovery and preserves message order.
  • extracts requests from received Messages before sending them to a destination.
  • wraps replies from a destination into a Message before sending them back to the request sender.
  • sends a special DestinationNotResponding reply to the request sender if the destination doesn't reply within a configurable timeout.
  • sends a special DestinationFailure reply to the request sender if the destination responds with Status.Failure.
  • guarantees at-least-once delivery of requests to the destination.
  • guarantees at-least-once delivery of replies to the request sender.
  • requires a positive receipt confirmation for a reply to mark a request-reply interaction as successfully completed.
  • redelivers requests, and subsequently replies, on missing or negative receipt confirmations.
  • sends a DeliveryStopped event to the actor system's event stream if the maximum number of delivery attempts has been reached (according to the channel's redelivery policy).
A reliable request-reply channel offers all the properties we need to reliably communicate with the remote CreditCardValidator. The channel is created as child actor of the OrderProcessor when the OrderProcessor receives a SetCreditCardValidator message.
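A sketch of the channel creation (import path, configuration method names and values are assumptions):

    import scala.concurrent.duration._
    import org.eligosource.eventsourced.patterns._

    val validationRequestChannel = extension.channelOf(
      ReliableRequestReplyChannelProps(1, validator)
        .withRedeliveryMax(3)
        .withRedeliveryDelay(10.seconds)
        .withReplyTimeout(5.seconds))(context)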
The channel is created with the channelOf method of the actor system's EventsourcingExtension and configured with a ReliableRequestReplyChannelProps object. Configuration data are the channel destination (validator), a redelivery policy and a destination reply timeout. When sending validation requests via the created validationRequestChannel, the OrderProcessor must be prepared for receiving CreditCardValidated, CreditCardValidationFailed, DestinationNotResponding or DestinationFailure replies. These replies are sent to the OrderProcessor inside a Message and are therefore written to the event log. Consequently, OrderProcessor recoveries in the future will replay past reply messages instead of obtaining them again from the validator which ensures deterministic state recovery. Furthermore, the validationRequestChannel will ignore validation requests it receives during a replay, except those whose corresponding replies have not been positively confirmed yet. The following snippet shows how replies are processed by the OrderProcessor.

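(a sketch; the helper methods are shown further below and confirm comes from the Receiver machinery)

    case CreditCardValidated(orderId) =>
      onValidationSuccess(orderId) { order =>
        order.copy(creditCardValidation = Success)
      }
      confirm(true) // positive receipt confirmation: no redelivery
    case CreditCardValidationFailed(orderId) =>
      onValidationFailure(orderId) { order =>
        order.copy(creditCardValidation = Failure)
      }
      confirm(true)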
  • A CreditCardValidated reply updates the creditCardValidation status of the corresponding order to Success and stores the updated order in the orders map. Further actions, such as notifying others that an order has been accepted, are omitted here but are part of the full example code. Then, the receipt of the reply is positively confirmed (confirm(true)) so that the channel doesn't redeliver the corresponding validation request.
  • A CreditCardValidationFailed reply updates the creditCardValidation status of the corresponding order to Failure and stores the updated order in the orders map. Again, further actions are omitted here and the receipt of the reply is positively confirmed.
Because the validationRequestChannel delivers messages at-least-once, we need to detect duplicates in order to make reply processing idempotent. Here, we simply require that the order object to be updated must have a Pending creditCardValidation status before changing state (and notifying others). If the order's status is not Pending, the order has already been updated by a previous reply and the current reply is a duplicate. In this case, the methods onValidationSuccess and onValidationFailure don't have any effect (orders.get(orderId).filter(_.creditCardValidation == Pending) is None). The receipt of the duplicate is still positively confirmed. More general guidelines on how to detect duplicates are outlined here.
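The duplicate check can be captured in a small helper, roughly (names follow the prose; onValidationFailure is analogous):

    // returns None for duplicates (status no longer Pending), so no state change occurs
    def onValidationSuccess(orderId: Int)(f: Order => Order): Option[Order] =
      orders.get(orderId).filter(_.creditCardValidation == Pending).map { order =>
        val updated = f(order)
        orders = orders + (orderId -> updated)
        updated
      }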

  • A DestinationNotResponding reply is always confirmed negatively (confirm(false)) so that the channel is going to redeliver the validation request to the CreditCardValidator. This may help to overcome temporary network problems, for example, but doesn't handle the case where the maximum number of redeliveries has been reached (see below).
  • A DestinationFailure reply will be negatively confirmed by default unless it has been delivered more than twice. This may help to overcome temporary CreditCardValidator failures, i.e. cases where a Status.Failure is returned by the validator.
Should the CreditCardValidator be unavailable for a longer period and the validationRequestChannel reach its maximum number of redeliveries, the channel will stop message delivery and publish a DeliveryStopped event to the actor system's event stream. The channel still continues to accept new event messages and persists them, so the OrderProcessor can continue receiving OrderSubmitted events, but the interaction with the CreditCardValidator is suspended. It is now up to the application to re-activate message delivery.
Subscribing to DeliveryStopped events allows an application to escalate a persistent network problem or CreditCardValidator outage by alerting a system administrator or switching to another credit card validation service, for example. In our case, a simple re-activation of the validationRequestChannel is scheduled.
The OrderProcessor subscribes itself to the actor system's event stream. On receiving a DeliveryStopped event it schedules a re-activation of the validationRequestChannel by sending it a Deliver message.
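In code, this could look roughly as follows (the re-activation delay and the DeliveryStopped field name are assumptions):

    import scala.concurrent.duration._

    // at OrderProcessor construction time:
    context.system.eventStream.subscribe(self, classOf[DeliveryStopped])

    // in the OrderProcessor's receive method:
    case DeliveryStopped(channelId) =>
      import context.dispatcher
      // re-activate the channel by sending it a Deliver message after a delay
      context.system.scheduler.scheduleOnce(5.seconds, validationRequestChannel, Deliver)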
This finally meets all the requirements stated above but there's a lot more to say about external service integration. Examples are external updates or usage of channels that don't preserve message order for optimizing concurrency and throughput. I also didn't cover processor-specific, non-blocking recovery as implemented by the example application. This is enough food for another blog post.

Thursday, February 23, 2012

Using JAXB for XML and JSON APIs in Scala Web Applications

In the past I have mentioned the implementation of RESTful XML and JSON APIs in Scala web applications using JAXB several times, without going into details. In this blog post I want to shed more light on this approach, together with some links to more advanced examples. A JAXB-based approach to web APIs can be useful if you want to support both XML and JSON representations but only want to maintain a single binding definition for both. I should also say that I'm still investigating this approach, so see the following as rather experimental.

First of all, JAXB is a Java standard for binding XML schemas to Java classes. It allows you to convert Java objects to XML documents, and vice versa, based on JAXB annotations on the corresponding Java classes. JAXB doesn't cover JSON but there are libraries that allow you to convert Java objects to JSON (and vice versa) based on the very same JAXB annotations that are used for defining XML bindings. One such library is Jersey's JSON library (jersey-json) which internally uses the Jackson library.

As you'll see in the following, JAXB can also be used together with immutable domain or resource models based on Scala case classes. There's no need to pollute them with getters and setters or Java collections from the java.util package. Necessary conversions from Scala collections or other type constructors (such as Option, for example) to Java types supported by JAXB can be defined externally to the annotated model (and reused). At the end of this blog post, I'll also show some examples how to develop JAXB-based XML and JSON APIs with the Play Framework.

Model

In the following, I'll use a model that consists of the single case class Person(fullname: String, username: Option[String], age: Int). To define Person as a root element in the XML schema, the following JAXB annotations should be added.

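(a first cut, before dealing with constructors and type adapters)

    import javax.xml.bind.annotation._

    @XmlRootElement(name = "person")
    @XmlAccessorType(XmlAccessType.FIELD)
    case class Person(fullname: String, username: Option[String], age: Int)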

@XmlRootElement makes Person a root element in the XML schema and @XmlAccessorType(XmlAccessType.FIELD) instructs JAXB to access fields directly instead of using getters and setters. But before we can use the Person class with JAXB a few additional things need to be done.
  • A no-arg constructor or a static no-arg factory method must be provided, otherwise, JAXB doesn't know how to create Person instances. In our example we'll use a no-arg constructor.

  • A person's fullname should be mandatory in the corresponding XML schema. This can be achieved by placing an @XmlElement(required=true) annotation on the field corresponding to the fullname parameter.

  • A person's username should be an optional String in the corresponding XML schema i.e. the username element of the complex Person type should have an XML attribute minOccurs="0". Furthermore, it should be avoided that scala.Option appears as complex type in the XML schema. This can be achieved by providing a type adapter from Option[String] to String via the JAXB @XmlJavaTypeAdapter annotation.

We can implement the above requirements by defining and annotating the Person class as follows:

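(a sketch close to the original code; adapter details may differ)

    import javax.xml.bind.annotation._
    import javax.xml.bind.annotation.adapters.{XmlAdapter, XmlJavaTypeAdapter}
    import scala.annotation.meta.field

    object Annotations {
      type xmlElement     = XmlElement @field
      type xmlTypeAdapter = XmlJavaTypeAdapter @field
    }

    class OptionAdapter[A](nones: A*) extends XmlAdapter[A, Option[A]] {
      def marshal(v: Option[A]): A = v.getOrElse(nones(0))
      def unmarshal(v: A): Option[A] = if (nones contains v) None else Some(v)
    }

    class StringOptionAdapter extends OptionAdapter[String](null, "")

    import Annotations._

    @XmlRootElement(name = "person")
    @XmlAccessorType(XmlAccessType.FIELD)
    case class Person(
      @xmlElement(required = true) fullname: String,
      @xmlTypeAdapter(classOf[StringOptionAdapter]) username: Option[String],
      age: Int) {

      private def this() = this(null, None, 0) // no-arg constructor for JAXB only
    }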

Let's dissect the above code a bit:
  • The no-arg constructor on the Person class is only needed by JAXB and should therefore be declared private so that it cannot be accessed elsewhere in the application code (unless you're using reflection like JAXB does).

  • Placing JAXB annotations on fields of a case class is a bit tricky. When writing a case class, usually only case class parameters are defined but not fields directly. The Scala compiler then generates the corresponding fields in the resulting .class file. Annotations that are placed on case class parameters are not copied to their corresponding fields, by default. To instruct the Scala compiler to copy these annotations, the Scala @field annotation must be used in addition. This is done in the custom annotation type definitions xmlElement and xmlTypeAdapter. They can be used in the same way as their dependent annotation types XmlElement and XmlJavaTypeAdapter, respectively. Placing the custom @xmlElement annotation on the fullname parameter will cause the Scala compiler to copy the dependent @XmlElement annotation (a JAXB annotation) to the generated fullname field where it can be finally processed by JAXB.

  • To convert between Option[String] (on Scala side) and String (used by JAXB on XML schema side) we implement a JAXB type adapter (interface XmlAdapter). The above example defines a generic OptionAdapter (that can also be reused elsewhere) and a concrete StringOptionAdapter used for the optional username parameter. Please note that annotating the username parameter with @xmlTypeAdapter(classOf[OptionAdapter[String]]) is not sufficient because JAXB will not be able to infer String as the target type (JAXB uses reflection) and will use Object instead (resulting in an XML anyType in the corresponding XML schema). Type adapters can also be used to convert between Scala and Java collection types. Since JAXB can only handle Java collection types you'll need to use type adapters should you want to use Scala collection types in your case classes (and you really should). You can find an example here.

We're now ready to use the Person class to generate an XML schema and to convert Person objects to and from XML or JSON. Please note that the following code examples require JAXB version 2.2.4u2 or higher, otherwise the OptionAdapter won't work properly. The reason is JAXB issue 415. Either use JDK 7u4 or higher which already includes this version or install the required JAXB version manually. The following will write an XML schema, generated from the Person class, to stdout:

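(a sketch)

    import javax.xml.bind.{JAXBContext, SchemaOutputResolver}
    import javax.xml.transform.Result
    import javax.xml.transform.stream.StreamResult

    val context = JAXBContext.newInstance(classOf[Person])
    context.generateSchema(new SchemaOutputResolver {
      def createOutput(namespaceUri: String, suggestedFileName: String): Result = {
        val result = new StreamResult(System.out)
        result.setSystemId(suggestedFileName)
        result
      }
    })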

The result is:

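(modulo formatting and namespace details, something like)

    <xs:schema version="1.0" xmlns:xs="http://www.w3.org/2001/XMLSchema">
      <xs:element name="person" type="person"/>
      <xs:complexType name="person">
        <xs:sequence>
          <xs:element name="fullname" type="xs:string"/>
          <xs:element name="username" type="xs:string" minOccurs="0"/>
          <xs:element name="age" type="xs:int"/>
        </xs:sequence>
      </xs:complexType>
    </xs:schema>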

Marshalling a Person object to XML can be done with

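(person values are illustrative)

    import javax.xml.bind.Marshaller

    val person = Person("John Doe", Some("jdoe"), 42)
    val marshaller = context.createMarshaller()
    marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true)
    marshaller.marshal(person, System.out)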

which prints

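(roughly)

    <person>
        <fullname>John Doe</fullname>
        <username>jdoe</username>
        <age>42</age>
    </person>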

Unmarshalling creates a Person object from XML.

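(a sketch; the input XML is illustrative)

    import java.io.StringReader

    val personXml1 =
      """<person>
        |  <fullname>John Doe</fullname>
        |  <age>42</age>
        |</person>""".stripMargin

    val unmarshaller = context.createUnmarshaller()
    val person1 = unmarshaller.unmarshal(new StringReader(personXml1)).asInstanceOf[Person]
    // person1.username is None because no <username> element is present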

We have implemented StringOptionAdapter in a way that an empty <username/> element or <username></username> in personXml1 would also yield None on the Scala side. Creating JSON from Person objects can be done with the JSONJAXBContext from Jersey's JSON library.

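(a sketch using the natural JSON convention)

    import java.io.StringWriter
    import com.sun.jersey.api.json.{JSONConfiguration, JSONJAXBContext}

    val jsonContext = new JSONJAXBContext(JSONConfiguration.natural().build(), classOf[Person])
    val writer = new StringWriter
    jsonContext.createJSONMarshaller().marshallToJSON(person, writer)
    println(writer)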

which prints the following to stdout:

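(roughly)

    {"fullname":"John Doe","username":"jdoe","age":42}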

Unmarshalling can be done with the context.createJSONUnmarshaller.unmarshalFromJSON method. The JSONConfiguration object provides a number of configuration options that determine how JSON is rendered and parsed. Refer to the official documentation for details.

Play and JAXB

This section shows some examples of how to develop JAXB-based XML and JSON APIs with the Play Framework 2.0. It is based on JAXB-specific body parsers and type class instances defined in trait JaxbSupport which is part of the event-sourcing example application (Play-specific work is currently done on the play branch). You can reuse this trait in other applications as is; there are no dependencies on the rest of the project (update: except to SysError). To enable JAXB-based XML and JSON processing for a Play web application, add JaxbSupport to a controller object as follows:

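(a sketch; the controller name is illustrative and the exact mixin details are assumptions)

    import com.sun.jersey.api.json.{JSONConfiguration, JSONJAXBContext}
    import play.api.mvc._

    object PersonApi extends Controller with JaxbSupport {
      // needed for both XML and JSON processing (see below)
      implicit val jsonJaxbContext =
        new JSONJAXBContext(JSONConfiguration.natural().build(), classOf[Person])
    }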

An implicit JSONJAXBContext must be in scope for both XML and JSON processing. For XML processing alone, it is sufficient to have an implicit JAXBContext.

XML and JSON Parsing

JaxbSupport provides Play-specific body parsers that convert an XML or JSON request body into instances of JAXB-annotated classes. The following action uses a JAXB body parser that expects an XML body and tries to convert it to a Person instance (using a JAXB unmarshaller).

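(a sketch; the action name and exact parser signature are assumptions)

    def createFromXml = Action(jaxb.parse.xml[Person]) { request =>
      val person: Person = request.body
      Ok("created person " + person.fullname)
    }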

If the unmarshalling fails or the request Content-Type is other than text/xml or application/xml, a 400 status code (bad request) is returned. Converting a JSON body to a Person instance can be done with the jaxb.parse.json body parser.

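(again a sketch)

    def createFromJson = Action(jaxb.parse.json[Person]) { request =>
      Ok("created person " + request.body.fullname)
    }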

If the body parser should be chosen at runtime depending on the Content-Type header, use the dynamic jaxb.parse body parser. The following action is able to process both XML and JSON bodies and convert them to a Person instance.

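(a sketch)

    def create = Action(jaxb.parse[Person]) { request =>
      Ok("created person " + request.body.fullname)
    }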

JaxbSupport also implements the following related body parsers
  • jaxb.parse.xml(maxLength: Int) and jaxb.parse.json(maxLength: Int)

  • jaxb.parse(maxLength: Int)

  • jaxb.parse.tolerantXml and jaxb.parse.tolerantJson

  • jaxb.parse.tolerantXml(maxLength: Int) and jaxb.parse.tolerantJson(maxLength: Int)


XML and JSON Rendering

For rendering XML and JSON, JaxbSupport provides the wrapper classes JaxbXml, JaxbJson and Jaxb. The following action renders an XML response from a Person object (using a JAXB marshaller):

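(a sketch; action names and person values are illustrative)

    def personXml = Action {
      JaxbXml(Person("John Doe", Some("jdoe"), 42))
    }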

whereas

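    def personJson = Action {
      JaxbJson(Person("John Doe", Some("jdoe"), 42))
    }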

renders a JSON response from a Person object. If you want to do content negotiation based on the Accept request header, use the Jaxb wrapper.

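(a sketch)

    def person = Action { implicit request =>
      Jaxb(Person("John Doe", Some("jdoe"), 42))
    }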

Jaxb requires an implicit request in context for obtaining the Accept request header. If the Accept MIME type is application/xml or text/xml an XML representation is returned, if it is application/json a JSON representation is returned. Further JaxbSupport application examples can be found here.

Friday, January 20, 2012

Building an Event-Sourced Web Application - Part 2: Projections, Persistence, Consumers and Web Interface

UPDATE: A successor of the example application described in this blog post is available here. It is based on the eventsourced library, a library for building reliable, scalable and distributed event-sourced applications in Scala.

A few weeks ago I started a blog post series to summarize my experiences in building an event-sourced web application using Scala and Akka. This was done based on an example application (see branch part-1). There, I gave an overview of the application architecture and presented some details of the immutable domain model and the service layer. Since then, the example application has been extended (see branch part-2) with a number of new features and enhancements. Here's an overview:

Enhancements are:
  • The STM-based state management was completely revised and generalized into the traits UpdateProjection and EventProjection. An UpdateProjection is similar to an Akka Agent: it applies state transition functions asynchronously and can participate in STM transactions. The major difference is that an UpdateProjection is specialized on domain object updates and can log captured events to a persistent event log. By default, update events are logged before state changes are visible via STM references (this is an important difference to the service layer of part 1). UpdateProjection implementors (such as the example application's InvoiceService) are domain event producers. EventProjection implementors, on the other hand, are domain event consumers. They internally use plain Akka Agents to manage state and derive new state values from received domain events (using an application-defined event projection function). EventProjection implementors are usually components that manage read models or coordinate business processes in event-driven architectures, for example.
  • The domain model was enhanced by introducing the domain classes DraftInvoice, SentInvoice and PaidInvoice to represent the states an invoice can have. Valid state transitions are defined by the methods on these domain classes and can therefore be checked by the compiler. This approach is explained in more detail here, although the implementation used in our example application slightly differs. In part 1, we only had a single Invoice class and valid state transitions had to be checked at runtime.

New features include:
  • A revised EventLog trait together with two implementations: JournalioEventLog is based on Journal.IO and BookkeeperEventLog on Apache BookKeeper. An EventLog supports synchronous and asynchronous appending of events as well as iterating over stored events, either from the beginning or from an application-defined position in the event history. Event log entries are also assigned sequence numbers so that event consumers can detect gaps in event streams or re-order (resequence) them, if needed.
  • Event consumers. One example is InvoiceStatistics, an EventProjection that derives invoice update statistics from domain events. Here, a separate read model is used (following the CQRS pattern) to serve invoice statistic queries. Another example is InvoiceReplicator, an EventProjection that simply reconstructs the invoice map (as maintained by the InvoiceService) from invoice events at a different location. It can be used to replicate application state across different nodes and to serve (eventually consistent) reads. The replicated state could also be used by a snapshot service to take snapshots of application state. InvoiceReplicator needs to receive events in the correct order and is therefore configured to resequence the received event stream. A third example is the PaymentProcess. It coordinates the activities of InvoiceService and PaymentService. Instead of having these services sending commands to each other, it is the PaymentProcess that sends commands to (i.e. calls methods on) these services in reaction to domain events. This event-driven approach to implementing business processes not only decouples the services from each other but also lets other components extend (or monitor) the business process by subscribing to and reacting on the relevant domain events. The PaymentProcess is currently stateless. Processes that need to maintain state should implement EventProjection and recover the process state during application start (or failover) from the event history.
  • A RESTful web interface for invoices and invoice statistics with support for HTML, XML and JSON representations. These can be negotiated with the HTTP Accept header. The web layer is based on the Jersey web framework (the JAX-RS reference implementation). HTML representations are rendered with the Scalate template engine. The mapping between XML and JSON representations and immutable domain classes is based on JAXB annotations. A JAXB-based XML provider must be supported by any JAX-RS implementation but Jersey additionally comes with a JAXB-based JSON provider so that the same metadata (JAXB annotations) can be used to generate both XML and JSON representations. Following some simple rules, it is possible to JAXB-annotate Scala case classes without polluting them with Java collection types or getters and setters. One major drawback of the current JAX-RS specification is that it doesn't support asynchronous responses yet. This will change with JAX-RS 2.0 and then we can make full use of the asynchronous InvoiceService responses.
  • A communication Channel for connecting domain event producers to consumers. The example application provides a SimpleChannel implementation for local communication. Alternative implementations, for example, could connect to a distributed event bus to communicate events across components of a distributed application. 

Running the example application

The example application can be started with

sbt run-main dev.example.eventsourcing.server.Webserver

Two classes relevant for starting the application are:
  • Appserver: configures the event log, services, read models and processes and connects them via an event channel. It also recovers application state from the event history.
  • Webserver: configures the web service layer and starts an embedded web server (Jetty).
To experiment with the BookKeeper based event log, you need to replace JournalioEventLog with BookkeeperEventLog in Appserver and additionally start a test BookKeeper instance with

sbt run-main dev.example.eventsourcing.server.Zookeeper
sbt run-main dev.example.eventsourcing.server.Bookkeeper

Examples how to interact with the RESTful web interface can be found here.


Service Layer Enhancements

In the service layer implementation from part 1 we've seen how to keep the order of logged events in correspondence with the order of updates to the application state. We used a transient event log that could participate in STM transactions. After the transaction commits, the events from the transient event log were transferred to a persistent event log. A drawback of this approach is that one can lose updates in case of crashes after an STM reference has been updated but before the changes have been written to the persistent event log. This can lead to situations where clients can already see application state that cannot be recovered from the event log. While some applications may tolerate this, others may require that any visible application state must be fully recoverable from the event log. Therefore, an alternative approach must be chosen.

We need a way to write events, captured during domain object updates, to a persistent event log before the STM reference is updated. But writing to the persistent event log must be done outside an STM transaction for reasons explained in part 1. Updates must also be based on the current (i.e. latest) application state. We therefore need to
  1. Get the current state value from a transactional reference (STM transaction 1)
  2. Update domain object(s) obtained from the current state (no STM transaction)
  3. Write the captured update event(s) to a persistent event log (no STM transaction)
  4. Compute a new state value from the domain object update and write it to the transactional reference (STM transaction 2)
Since steps 1-4 are not a single transaction we must prevent their concurrent execution. This can be achieved using a single actor, for example. This actor is the single writer to the transactional reference. This is actually very similar to how Akka Agents work internally. The main difference in our case is that the computation of a new state value occurs outside an STM transaction, whereas Akka Agents apply update functions within STM transactions. This difference allows us to have side effects, such as writing to a persistent event log. The example application provides the above functionality in the UpdateProjection trait:
  • Instances of UpdateProjection manage (part of) application state with a transactional ref of type Ref[S] where S is the state value type. Clients concurrently read application state via currentState. Sequential writes to the transactional ref are done exclusively by the updater actor (more on write concurrency below).
  • UpdateProjection implementors change application state with the transacted method. The update parameter is a function that computes a domain object Update[Event, B] from current state S where B is a domain object type. The update result (either Success[B] or Failure[DomainError]) is returned as future value from the transacted method.
  • The update function and the underlying Future implementation object (promise) are sent to the updater actor with an ApplyUpdate message. The updater then reads the current state and applies the update function to it. If the update succeeds, it writes the captured events to an EventLog and projects the update result onto the current state. The projection is done with the project function. It creates a new state value from the current state and the update result. The new state value is then finally set on the transactional ref and the promise is completed with the update result.
  • Furthermore, the transacted method can participate in STM transactions. If there's an enclosing transaction, the updater actor will only be triggered if the enclosing transaction successfully commits. If there's no enclosing transaction the updater actor will always be triggered.
The example application uses UpdateProjection to implement the stateful InvoiceService. The state is of type Map[String, Invoice] i.e. a single map containing draft, sent and paid invoices. Here's a simplified version of InvoiceService:

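(reconstructed as a sketch; the trait's exact signatures are simplified and the error messages are illustrative)

    class InvoiceService(val initialState: Map[String, Invoice], val eventLog: EventLog)
      extends UpdateProjection[Map[String, Invoice], Invoice] {

      // projects an updated invoice onto the current state
      def project = (state: Map[String, Invoice], updated: Invoice) =>
        state + (updated.id -> updated)

      def getInvoice(invoiceId: String): Option[Invoice] = currentState.get(invoiceId)

      def addInvoiceItem(invoiceId: String, item: InvoiceItem) =
        updateDraftInvoice(invoiceId)(_.addItem(item))

      def sendInvoiceTo(invoiceId: String, to: InvoiceAddress) =
        updateDraftInvoice(invoiceId)(_.sendTo(to))

      private def updateInvoice[B <: Invoice](invoiceId: String)(f: Invoice => Update[InvoiceEvent, B]) =
        transacted { state =>
          state.get(invoiceId) match {
            case Some(invoice) => f(invoice)
            case None          => Update.reject(DomainError("invoice %s: does not exist" format invoiceId))
          }
        }

      private def updateDraftInvoice[B <: Invoice](invoiceId: String)(f: DraftInvoice => Update[InvoiceEvent, B]) =
        updateInvoice(invoiceId) {
          case invoice: DraftInvoice => f(invoice)
          case invoice               => Update.reject(DomainError("invoice %s: not a draft" format invoiceId))
        }
    }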
The updateInvoice method uses the transacted method of the UpdateProjection trait. It tries to get an invoice with given invoiceId from the current state and applies the supplied update function f to it. The updateInvoice method is used by updateDraftInvoice for updating draft invoices in the invoice map. The updateDraftInvoice method is used by the service methods addInvoiceItem and sendInvoiceTo. Adding an item to an existing draft invoice yields a future value of an updated draft invoice (return type Future[DomainValidation[DraftInvoice]]). Sending an existing draft invoice, on the other hand, causes a state transition of that invoice to a sent invoice (return type Future[DomainValidation[SentInvoice]]) i.e. the service methods make use of the newly introduced domain object types. The InvoiceService must also implement the abstract members project, initialState and eventLog (declared by Projection and UpdateProjection).
  • The project implementation projects an updated invoice onto the current state by simply adding it to the map of invoices (replacing an old invoice if present).
  • An initialState (empty map by default) and an EventLog instance are provided during construction of an InvoiceService.
The above InvoiceService implementation supports concurrent reads but only sequential writes to the whole invoice map. This may be sufficient for many applications, but if a higher degree of write concurrency is needed, one could choose to have a separate UpdateProjection instance per invoice object (which is comparable to having a separate Akka Agent for each invoice object). This allows both concurrent reads and writes to the invoices of an application. The following snippet shows the general idea (not part of the example project).

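(a sketch)

    // one UpdateProjection per invoice; the projection simply returns the updated invoice
    class PersistentInvoice(val initialState: Invoice, val eventLog: EventLog)
      extends UpdateProjection[Invoice, Invoice] {
      def project = (_: Invoice, updated: Invoice) => updated
    }

    class InvoiceService(eventLog: EventLog) {
      private val invoicesRef = Ref(Map.empty[String, PersistentInvoice])
      // updates to different invoices can now run concurrently ...
    }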
Here, an InvoiceService maintains a map of PersistentInvoice instances where a PersistentInvoice is an UpdateProjection that contains a reference to a single (draft, sent or paid) invoice. Consequently, updates to different invoices can now be made concurrently. The projection function degenerates to a function that simply returns the updated invoice.

Event Logs

TODO

CQRS and Consistency

TODO

Business Processes

TODO

Web Interface

TODO (see also this blog post)

Tuesday, November 29, 2011

Building an Event-Sourced Web Application - Part 1: Domain Model, Events and State

UPDATE: A successor of the example application described in this blog post is available here. It is based on the eventsourced library, a library for building reliable, scalable and distributed event-sourced applications in Scala.

Over the last few months I was working for a project in which we built an event-sourced healthcare web application from scratch with Scala and Akka. This is the first of a series of blog posts where I want to share some of my experiences and design decisions.

The application architecture follows an architectural pattern that has been described as memory image or system prevalence: application state is kept in memory rather than written to a database. Only state changes are written to a persistent store in terms of domain events. Snapshots of application state are saved to disk at regular intervals. Application state can be reconstructed by replaying the logged events (either beginning from scratch or from a saved snapshot). We've chosen this architecture because

  • The state of our application easily fits within 2 GB of memory. Should this become a limitation in the future, we can easily partition the application state across several nodes (which can also be used to scale writes, if needed).

  • Very low latency can be achieved for read and write access to application state. To ensure atomicity, consistency and isolation, we use a Software Transactional Memory (STM). Durability is achieved with a persistent event log.

  • The application must be able to answer queries not only based on its current state but also based on the history of state changes. Requirements for such queries are often not known from the beginning and can therefore not be considered during initial database design. Using a persistent event log instead, one can build suitable read models any time later by replaying the whole event history.

  • Multiple copies of the application state can be created on other nodes by consuming domain events that have been published by a writer (or leader) node. Should the current writer go down, one can easily switch to another node to achieve high-availability.

From the memory image approach, we excluded all data that are written only once and are not modified later, such as large medical image files or clinical documents. They are stored directly on a (distributed) file system and only references to it are kept in memory.

Application Overview

The following list briefly summarizes some of the concepts and technologies used to implement the different parts of the application. In this blog post I'll focus on the domain model and the service layer. The other parts will be described in follow-up posts.

  • The domain model is an immutable domain model. One advantage of immutable domain objects is that you can safely share them with other application components, for example, by sending them along with event messages. Immutable domain objects are also needed when using them together with Software Transactional Memory (see also this article).

  • The service layer provides transactional access to the application state. Application state is managed by transactional references. State values are (immutable) domain objects. State changes occur by updating the transactional references with new domain object values. For transaction management, Akka's Software Transactional Memory (STM) is used. The application's approach to state, identity and concurrency mainly follows the concepts described in State and Identity (which is part of the Clojure documentation).

  • The persistence layer comprises a persistent event log and snapshots of the application state. In the production system, we use Apache BookKeeper as distributed, highly-available and scalable event log. Snapshots are stored on a distributed file system.

  • The web layer provides a RESTful service interface to application resources (domain objects) with support for HTML, XML and JSON as representation formats. We use Jersey as web framework for implementing the RESTful service interface together with Scalate to render the HTML representations of domain objects. For XML and JSON bindings we use JAXB annotations on Scala case classes - seriously :)

  • Read models are used to serve more complex queries that cannot be (efficiently) answered by using the current application state. Read models are also event-sourced. Their structure is optimized to answer complex queries in a very efficient way. The approach to separate write models from read models is called Command Query Responsibility Segregation (CQRS). Read models can be stored in-memory (and reconstructed by replaying events) or persisted to a database. We use Akka agents to maintain in-memory read models.

  • Business process executors are stateful event listeners that implement long-running business processes. In reaction to domain events, they may change application state (via the service layer), coordinate changes across aggregate roots or interact with external services. We use Akka actors to implement business process executors.

  • Versioning of domain objects is used to support conditional updates. Only if the current version of a domain object matches a client-supplied version number, an update can proceed. We added this form of optimistic concurrency control in addition to that of STM transactions.

  • For distributed coordination, we use Apache Zookeeper. There's one node in our distributed system that performs state changes (the current writer or leader). Should the current leader go down, another leader is elected using a leader election algorithm implemented on top of Zookeeper. All nodes in the distributed system can serve (eventually consistent) reads. Strongly consistent reads can only be served by the current leader. We can therefore easily scale eventually consistent reads which is the majority of reads in our application.


Example application

There's also an example project on github that demonstrates how to combine the contents of the following sections to a running application. I'll extend the application as I make progress on this blog post series. The branch referred to by this blog post is part-1.

The example application will cover some elements of our production system but not all of them. To keep things simple, I omitted performance optimizations and decided to use an over-simplified domain model from a domain other than healthcare.

Domain Model

The approach to implementing an immutable domain model was taken from the excellent article series Towards an Immutable Domain Model by Erik Rozendaal. In the following I'll briefly summarize this approach (with some modifications). For a more detailed description, please read through that article series.

The domain model of the example application is defined by the case classes Invoice, InvoiceItem and InvoiceAddress (see Invoice.scala). Invoice is the aggregate root with methods to add an InvoiceItem, set a discount and send an Invoice (to a destination defined by InvoiceAddress).

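(a sketch; field and event definitions are reconstructed from the description, see Invoice.scala for the real thing, and the EventSourced trait signature is an assumption)

    sealed trait InvoiceEvent
    case class InvoiceItemAdded(invoiceId: String, item: InvoiceItem) extends InvoiceEvent
    case class InvoiceDiscountSet(invoiceId: String, discount: BigDecimal) extends InvoiceEvent
    case class InvoiceSent(invoiceId: String, to: InvoiceAddress) extends InvoiceEvent

    case class InvoiceItem(description: String, count: Int, amount: BigDecimal)
    case class InvoiceAddress(name: String, street: String, city: String, country: String)

    case class Invoice(
        id: String,
        items: List[InvoiceItem] = Nil,
        discount: BigDecimal = 0,
        sentTo: Option[InvoiceAddress] = None) extends EventSourced[Invoice, InvoiceEvent] {

      def addItem(item: InvoiceItem): Update[InvoiceEvent, Invoice] =
        update(InvoiceItemAdded(id, item))

      def setDiscount(discount: BigDecimal): Update[InvoiceEvent, Invoice] =
        update(InvoiceDiscountSet(id, discount))

      def handle(event: InvoiceEvent): Invoice = event match {
        case InvoiceItemAdded(_, item)       => copy(items = items :+ item)
        case InvoiceDiscountSet(_, discount) => copy(discount = discount)
        case InvoiceSent(_, to)              => copy(sentTo = Some(to))
      }
    }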

The methods addItem, setDiscount and sendTo generate the domain events InvoiceItemAdded, InvoiceDiscountSet and InvoiceSent, respectively (see later). These are then handled by the handle method. The handle method creates Invoice copies with updated members.

The methods addItem, setDiscount and sendTo not only execute business logic (by checking preconditions, for example) but also capture the generated events. Generated events are captured with the Update monad (a state monad), which is the return type of these methods. This is shown below for sendTo.

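(a sketch; the precondition is illustrative)

    def sendTo(to: InvoiceAddress): Update[InvoiceEvent, Invoice] =
      if (items.isEmpty) Update.reject(DomainError("invoice %s: cannot send empty invoice" format id))
      else update(InvoiceSent(id, to))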

A successful update adds the generated InvoiceSent event and the updated Invoice to the returned Update instance (using the update method from the EventSourced trait). A failed update is reported with a DomainError (using Update.reject). Update results are instances of Validation (either Success or Failure) which can be obtained by calling result() on the monad

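(roughly)

    invoice.sendTo(address).result() match {
      case Success(updated) => // the updated Invoice
      case Failure(error)   => // the reported DomainError
    }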

where Success contains the updated Invoice and Failure contains the reported error. To get access to the captured events, call the result method with a callback function.

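(a sketch; the callback signature is an assumption)

    invoice.sendTo(address).result { (events, updatedInvoice) =>
      // called only for successful updates, with the captured events
      // and the updated Invoice; e.g. write the events to an event log here
    }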

This function is only called for successful updates with the captured events and the updated Invoice. We will see a concrete usage example later. Since Update is a monad, we can also chain domain object updates with a for-comprehension.

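(values are illustrative)

    val update = for {
      invoice1 <- invoice.addItem(InvoiceItem("item-1", 1, 50))
      invoice2 <- invoice1.setDiscount(10)
      invoice3 <- invoice2.sendTo(address)
    } yield invoice3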

Only if all individual updates are successful, the overall update will be successful. If one or more individual updates fail, the overall update will fail. Finally, we want to be able to reconstruct an Invoice object from the history of events. This can be done with the Invoice.handle(events: List[InvoiceEvent]) method on the Invoice companion object, as shown in the following example.

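(a sketch)

    val replayed: Invoice = Invoice.handle(List(
      InvoiceItemAdded("invoice-1", InvoiceItem("item-1", 1, 50)),
      InvoiceSent("invoice-1", address)))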

Having these domain model properties in place, we can now use immutable Invoice objects to define application state and use captured events to write them to a (persistent) event log.

Service Layer

Update: The service layer implementation presented here has been completely revised. The changes are described in the Service Layer Enhancements section of a follow-up blog post.
Application state is managed in the application's service layer. Here, InvoiceService maintains a map of Invoice objects where map keys are invoice ids.

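(a sketch)

    import akka.stm._

    class InvoiceService(initialInvoices: Map[String, Invoice] = Map.empty) {
      private val invoicesRef = Ref(initialInvoices)
      // ...
    }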

To control concurrent access to the invoices map, a transactional reference (akka.stm.Ref) is used. It is part of Akka's STM (Multiverse) but rewriting the example using Scala's STM shouldn't be a big deal. Using a single transactional reference for all Invoice objects is a rather naive approach because it can easily lead to high contention on the invoicesRef, but it keeps the example as simple as possible. (Using a single transactional reference for all Invoice objects is a reasonable approach only if updates to different Invoice objects depend on each other which is usually not the case, or if there aren't many concurrent writes. We will see better approaches, causing less contention, later in this section). An alternative approach for managing state is using actors but I'll leave that for another blog post.

Changes to the application state are made inside an atomic {...} block, as shown for the addInvoiceItem method (see also InvoiceService.scala).

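(a sketch; event logging is added further below)

    def addInvoiceItem(invoiceId: String, item: InvoiceItem): DomainValidation[Invoice] = atomic {
      val invoices = invoicesRef()
      invoices.get(invoiceId) match {
        case None          => Failure(DomainError("invoice %s: does not exist" format invoiceId))
        case Some(invoice) => invoice.addItem(item).result() match {
          case Success(updated) =>
            invoicesRef set invoices + (invoiceId -> updated)
            Success(updated)
          case failure => failure
        }
      }
    }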

If any two updates to the Invoice map are conflicting, one update will succeed, the other one will be retried by re-executing the atomic block. Since we are using immutable domain objects, retrying updates is not an issue. What's still missing is to log the captured events (obtained from the Update monad) to a persistent event log. In the following, we require that the order of logged events must correspond to the order of updates to the application state (otherwise we could get problems during a replay of events). We also make the assumption that our persistent event log cannot directly participate in STM transactions (which is the case for Apache BookKeeper, for example). We could try to:

  • Write captured events to the persistent event log within a transaction. Writing to a persistent event log involves IO operations (side-effects) that cannot be rolled back. Should the STM retry the transaction, the atomic block is re-executed and the captured events would be written to the event log again and this is not what we want.

  • Write the captured events after the STM transaction commits with the thread that started the transaction. This would solve the problem of redundantly logged events but then we cannot guarantee any more that the order of logged events still corresponds to the order of updates to the application state. This is because the writes could be done by different threads which introduces a race.

So these two approaches don't work. One possible solution is the following approach:

  1. Write the captured events to a transient, transactional event log inside the atomic block. This ensures that events are not redundantly logged and the order of events corresponds to the order of updates to the application state. The simplest possible transient, transactional event log is a Ref[List[Event]].

  2. Transfer the logged events from the transactional event log to a persistent event log by preserving the order of events. Preserving the order of events can be achieved with a single thread (or actor) that reads from the transactional event log and writes events to the persistent event log. Any time an STM transaction commits we need to trigger this thread (or actor) to do the transfer.

Let's say this functionality is available through an EventLog trait:
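
  // a sketch: the Ref[List[Event]] layout and the method names follow the
  // text; the store/storeAsync implementations are omitted here
  trait EventLog {
    // transient, transactional event log (events are prepended, so the
    // transfer must restore insertion order)
    val eventsRef = Ref(List.empty[Event])

    // must be called within a transaction
    def log(event: Event) { eventsRef alter (event :: _) }

    // must be called after the transaction commits
    def store()       // waits for the transfer to complete
    def storeAsync()  // returns immediately
  }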


where

  • the log method adds events to the transactional eventsRef (must be called within a transaction) and

  • the store* methods transfer the events from the eventsRef to the persistent event log (must be called after the transaction commits). The store method waits for the transfer to complete whereas storeAsync returns immediately.
Having an implementation of EventLog in place, it can be used as follows:
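
  // a sketch (eventLog is an EventLog instance available to the service;
  // the Update API is assumed as above)
  def addInvoiceItem(invoiceId: String, item: InvoiceItem): Option[Invoice] = atomic {
    val invoices = invoicesRef()
    invoices.get(invoiceId) map { invoice =>
      val (events, updatedInvoice) = invoice.addItem(item)()
      events foreach (eventLog.log(_))      // log within the transaction
      invoicesRef set invoices.updated(invoiceId, updatedInvoice)
      deferred { eventLog.storeAsync() }    // transfer after commit
      updatedInvoice
    }
  }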


Within the transaction, captured events are added to the transactional event log. After the transaction successfully commits, the events are transferred to the persistent event log. This is done within the deferred block, which is executed only once after commit. Here, we don't wait for the events to be persisted (storeAsync). We could also extend the storeAsync and addInvoiceItem method signatures to let clients provide an asynchronous completion callback, in order to be notified when events have been written successfully (or when an error occurred during writing). A production-ready implementation of EventLog should also provide strategies to recover from errors when writing to the persistent event log. In a follow-up blog post I'll show an implementation of EventLog that uses Apache BookKeeper (update: I'll also show how to do write-ahead logging, which first writes events to the event log and then, if writing was successful, updates the invoices map; this can be done by queueing up updates). In its current state, the example application has an EventLog implementation that stores events in memory, for testing purposes (see TestEventLog.scala).

The InvoiceService also provides methods for reading invoices. Supporting consistent reads is as easy as
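
  // a sketch; method names are illustrative
  def getInvoice(invoiceId: String): Option[Invoice] = invoicesRef().get(invoiceId)
  def getInvoices: List[Invoice] = invoicesRef().values.toList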


Here invoicesRef() implicitly starts a new transaction, so we don't need to wrap the read operation in an atomic {...} block. There are situations where consistent reads are needed by clients (for example, in a web application during a post/redirect/get cycle where invoices must be immediately available for reading after their creation). In this case, the InvoiceService should be used. In other situations, eventually consistent reads are sufficient. In that case, we wouldn't use the InvoiceService to obtain Invoice objects; we'd rather obtain them from a separate (event-sourced) read model that is asynchronously generated from published events (using CQRS). This will be shown in a follow-up blog post.

Finally, let's look at some options for reducing contention on the invoicesRef (and the transactional event log) in situations with many concurrent writes. We can say that

  • Updates to different Invoice entities are independent, so we don't need to care about ordering of events in this case.

  • Updates to the same Invoice entity are dependent, so the order of logged events must correspond to the order of updates to that entity.
This means that we can use transactional references for each individual Invoice entity. Consequently, updates to different invoice entities do not interfere with each other.
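
For example (a sketch):

  // one transactional reference per Invoice entity
  val invoicesRef = Ref(Map.empty[String, Ref[Invoice]])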


The transactional reference for the whole map is only needed when new invoices are concurrently added to the map (and to ensure consistent reads, of course). We could also use a separate event log for each Ref[Invoice], so that there's no contention on the transactional event log for independent updates.
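
One possible layout (a sketch):

  // every invoice reference gets its own transient, transactional event log
  val invoicesRef = Ref(Map.empty[String, (Ref[Invoice], EventLog)])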


In this case, different transactional event logs would share the same persistent event log backend.

Summary

We started to build an event-sourced application whose state is kept entirely in memory; only state changes are persisted to an event log, in terms of domain events. Application state is defined by immutable domain objects which are accessed through transactional references. A state change means updating the transactional references with new domain objects (the new state value) within an STM transaction. Events that have been generated during domain object updates are written to a transient, transactional event log and transferred to a persistent event log once the transaction commits. It was also shown how the order of logged events can be kept consistent with the order of state changes. This is important when application state must be recovered from the event history (for example, during application start or failover to another node).

Monday, February 28, 2011

Akka Producer Actors: New Features and Best Practices

In a previous post I wrote about new features and best practices for Akka consumer actors. In this post, I'll cover Akka producer actors. For the following examples to compile and run, you'll need the current Akka 1.1-SNAPSHOT.

Again, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.

Basic usage


Akka producer actors can send messages to any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows Akka actors to interact with external systems or other components over a large number of protocols and APIs.

Let's start with a simple producer actor that sends all messages it receives to an external HTTP service and returns the response to the initial sender. For sending messages over HTTP we can use the Camel jetty component which features an asynchronous HTTP client.
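
A minimal producer could look like this (the endpoint URI is only an example):

  import akka.actor.Actor
  import akka.camel.Producer

  class HttpProducer extends Actor with Producer {
    def endpointUri = "jetty:http://localhost:8875/testservice"
  }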


Concrete producer actors inherit a default implementation of Actor.receive from the Producer trait. For simple use cases, only an endpoint URI must be defined. Producer actors also require a started CamelContextManager to work properly. A CamelContextManager is started when an application starts a CamelService, e.g. via CamelServiceManager.startCamelService, or when starting the CamelContextManager directly via
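
  // a sketch of direct CamelContextManager usage
  import akka.camel.CamelContextManager

  CamelContextManager.init
  CamelContextManager.start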


The latter approach is recommended when an application uses only producer actors but no consumer actors. This slightly reduces the overhead when starting actors. After starting the producer actor, clients can interact with the HTTP service via the actor API.
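
For example (a sketch; the message body is chosen arbitrarily):

  import akka.actor.Actor

  val httpProducer = Actor.actorOf[HttpProducer].start
  val response = httpProducer !! "test"  // send and wait for the response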

Here, !! is used for sending the message and waiting for a response. Alternatively, one can also use ! together with an implicit sender reference.
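
Sketched with a client actor as the implicit sender:

  import akka.actor.{Actor, ActorRef}

  class HttpClient(httpProducer: ActorRef) extends Actor {
    def receive = {
      case msg => httpProducer ! msg  // this actor will receive the async reply
    }
  }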


In this case the sender will receive an asynchronous reply from the producer actor, which in turn receives an asynchronous reply from the jetty endpoint. The asynchronous jetty endpoint doesn't block a thread while waiting for a response, and neither does the producer actor. This is important from a scalability perspective, especially for longer-running request-response cycles.

By default, a producer actor initiates an in-out message exchange with its Camel endpoint i.e. it expects a response from it. If a producer actor wants to initiate an in-only message exchange then it must override the oneway method to return true. The following example shows a producer actor that initiates an in-only message exchange with a JMS endpoint.
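
A sketch:

  import akka.actor.Actor
  import akka.camel.Producer

  class JmsProducer extends Actor with Producer {
    def endpointUri = "jms:queue:test"
    override def oneway = true  // initiate in-only message exchanges
  }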


This actor adds any message it receives to the test JMS queue. By default, producer actors that are configured with oneway = true don't reply. This behavior is defined in the Producer.onReceiveAfterProduce method, which is implemented as follows.
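
  // roughly (sketched from memory; see the Producer trait source for
  // the exact code)
  protected def onReceiveAfterProduce: Receive = {
    case msg => if (!oneway) self.reply(msg)
  }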


The onReceiveAfterProduce method has the same signature as Actor.receive and is called with the result of the message exchange with the endpoint (please note that in-only message exchanges with Camel endpoints have a result as well). The result type for successful message exchanges is Message; for failed message exchanges it is Failure (see below).

Concrete producer actors can override this method. For example, the following producer actor overrides onReceiveAfterProduce to reply with a constant "done" message.
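
A sketch:

  class JmsReplyingProducer extends Actor with Producer {
    def endpointUri = "jms:queue:test"
    override def oneway = true

    override protected def onReceiveAfterProduce: Receive = {
      case _ => self.reply("done")
    }
  }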


The result of the message exchange with the JMS endpoint is ignored (case _).

Failures


Message exchanges with a Camel endpoint can fail. In this case, onReceiveAfterProduce is called with a Failure message containing the cause of the failure (a Throwable). Let's extend the HttpProducer usage example to deal with failure responses.
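
For example (a sketch):

  import akka.camel.{Failure, Message}

  httpProducer !! "test" match {
    case Some(m: Message) => println("response: " + m.bodyAs[String])
    case Some(f: Failure) => println("failure: " + f.cause.getMessage)
    case None             => println("no response (timeout)")
  }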


In addition to a failure cause, a Failure message can also contain endpoint-specific headers with failure details, such as the HTTP response code. When using ! instead of !!, together with an implicit sender reference (as shown in the previous section), that sender will then receive the Failure message asynchronously. The JmsReplyingProducer example can also be extended to return more meaningful responses: a "done" message only on success and an error message on failure.
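
For example (a sketch):

  class JmsReplyingProducer extends Actor with Producer {
    def endpointUri = "jms:queue:test"
    override def oneway = true

    override protected def onReceiveAfterProduce: Receive = {
      case m: Message => self.reply("done")
      case f: Failure => self.reply("error: " + f.cause.getMessage)
    }
  }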


Failed message exchanges never cause the producer actor to throw an exception during execution of receive. Should Producer implementations want to throw an exception on failure (for whatever reason), they can do so in onReceiveAfterProduce.
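
For example (a sketch; the class name is made up):

  class FailingOnFailureProducer extends Actor with Producer {
    def endpointUri = "jetty:http://localhost:8875/testservice"

    override protected def onReceiveAfterProduce: Receive = {
      case m: Message => self.reply(m)
      case f: Failure => throw f.cause  // let a supervisor handle it
    }
  }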


In this case failure handling should be done in combination with a supervisor (see below).

Let's look at another example. What if we want a call like httpProducer !! message to throw an exception on failure (instead of returning a Failure message) but to respond with a normal Message on success? In this case, we need to use self.senderFuture inside onReceiveAfterProduce and complete it with an exception.
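
A sketch:

  class HttpProducer extends Actor with Producer {
    def endpointUri = "jetty:http://localhost:8875/testservice"

    override protected def onReceiveAfterProduce: Receive = {
      case m: Message => self.reply(m)
      case f: Failure =>
        // complete the initial sender's future with the failure cause
        // instead of replying with a Failure message
        self.senderFuture.foreach(_.completeWithException(f.cause))
    }
  }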



Forwarding results


Another option to deal with message exchange results inside onReceiveAfterProduce is to forward them to another actor. Forwarding a message also forwards the initial sender reference. This allows the receiving actor to reply to the initial sender.
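
For example (a sketch):

  import akka.actor.{Actor, ActorRef}
  import akka.camel.Producer

  class ForwardingProducer(target: ActorRef) extends Actor with Producer {
    def endpointUri = "jetty:http://localhost:8875/testservice"

    override protected def onReceiveAfterProduce: Receive = {
      case result => target forward result  // target can reply to the initial sender
    }
  }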


With producer actors that forward message exchange results to other actors (including other producer actors), one can build actor-based message processing pipelines that integrate external systems. In combination with consumer actors, this could be extended towards a scalable and distributed enterprise service bus (ESB) based on Akka actors ... but this is a topic for another blog post.

Correlation identifiers


The Producer trait also supports correlation identifiers. This allows clients to correlate request messages with asynchronous response messages. A correlation identifier is a message header that can be set by clients. The following example uses the correlation identifier (or message exchange identifier) 123.
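
A sketch (assuming the header name is Message.MessageExchangeId):

  import akka.camel.Message

  httpProducer ! Message("test", Map(Message.MessageExchangeId -> "123"))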


An asynchronous response (Message or Failure) from httpProducer will contain that correlation identifier as well.

Fault-tolerance


A failed message exchange by default does not cause a producer actor to throw an exception. However, concrete producer actors may decide to throw an exception inside onReceiveAfterProduce (as shown above), or a system-level Camel problem may cause a runtime exception. An application that wants to handle these exceptions should supervise its producer actors.

The following example shows how to implement a producer actor that replies to the initial sender with a Failure message when it is restarted or stopped by a supervisor.
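
A sketch (assuming preRestartProducer receives the failure cause, analogous to preRestart):

  import akka.actor.Actor
  import akka.camel.{Failure, Producer}

  class SupervisedProducer extends Actor with Producer {
    def endpointUri = "jetty:http://localhost:8875/testservice"

    override def preRestartProducer(reason: Throwable) {
      self.reply_?(Failure(reason))  // best-effort reply to the initial sender
    }

    override def postStop() {
      self.reply_?(Failure(new RuntimeException("producer stopped")))
    }
  }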


To handle restart callbacks, producer actors must override the preRestartProducer method instead of preRestart. The preRestart method is implemented by the Producer trait and does additional resource de-allocation work after calling preRestartProducer. More information about replies within preRestart and postStop can be found in my previous blog post about consumer actors.