Tuesday, November 29, 2011

Building an Event-Sourced Web Application - Part 1: Domain Model, Events and State

UPDATE: A successor of the example application described in this blog post is available here. It is based on the eventsourced library, a library for building reliable, scalable and distributed event-sourced applications in Scala.

Over the last few months I have been working on a project in which we built an event-sourced healthcare web application from scratch with Scala and Akka. This is the first of a series of blog posts in which I want to share some of my experiences and design decisions.

The application architecture follows an architectural pattern that has been described as memory image or system prevalence: application state is kept in memory rather than written to a database. Only state changes are written to a persistent store in terms of domain events. Snapshots of application state are saved to disk at regular intervals. Application state can be reconstructed by replaying the logged events (either beginning from scratch or from a saved snapshot). We've chosen this architecture because

  • The state of our application easily fits within 2 GB of memory. Should this become a limitation in the future, we can easily partition the application state across several nodes (which can also be used to scale writes, if needed).

  • Very low latency can be achieved for read and write access to application state. To ensure atomicity, consistency and isolation, we use a Software Transactional Memory (STM). Durability is achieved with a persistent event log.

  • The application must be able to answer queries not only based on its current state but also based on the history of state changes. Requirements for such queries are often not known from the beginning and can therefore not be considered during initial database design. Using a persistent event log instead, one can build suitable read models any time later by replaying the whole event history.

  • Multiple copies of the application state can be created on other nodes by consuming domain events that have been published by a writer (or leader) node. Should the current writer go down, one can easily switch to another node to achieve high-availability.

From the memory image approach, we excluded all data that are written only once and are not modified later, such as large medical image files or clinical documents. They are stored directly on a (distributed) file system and only references to them are kept in memory.

Application Overview

The following list briefly summarizes some of the concepts and technologies used to implement the different parts of the application. In this blog post I'll focus on the domain model and the service layer. The other parts will be described in follow-up posts.

  • The domain model is an immutable domain model. One advantage of immutable domain objects is that you can safely share them with other application components, for example, by sending them along with event messages. Immutable domain objects are also needed when using them together with Software Transactional Memory (see also this article).

  • The service layer provides transactional access to the application state. Application state is managed by transactional references. State values are (immutable) domain objects. State changes occur by updating the transactional references with new domain object values. For transaction management, Akka's Software Transactional Memory (STM) is used. The application's approach to state, identity and concurrency mainly follows the concepts described in State and Identity (which is part of the Clojure documentation).

  • The persistence layer comprises a persistent event log and snapshots of the application state. In the production system, we use Apache BookKeeper as a distributed, highly available and scalable event log. Snapshots are stored on a distributed file system.

  • The web layer provides a RESTful service interface to application resources (domain objects) with support for HTML, XML and JSON as representation formats. We use Jersey as web framework for implementing the RESTful service interface together with Scalate to render the HTML representations of domain objects. For XML and JSON bindings we use JAXB annotations on Scala case classes - seriously :)

  • Read models are used to serve more complex queries that cannot be (efficiently) answered by using the current application state. Read models are also event-sourced. Their structure is optimized to answer complex queries in a very efficient way. The approach to separate write models from read models is called Command Query Responsibility Segregation (CQRS). Read models can be stored in-memory (and reconstructed by replaying events) or persisted to a database. We use Akka agents to maintain in-memory read models.

  • Business process executors are stateful event listeners that implement long-running business processes. In reaction to domain events, they may change application state (via the service layer), coordinate changes across aggregate roots or interact with external services. We use Akka actors to implement business process executors.

  • Versioning of domain objects is used to support conditional updates. An update can proceed only if the current version of a domain object matches a client-supplied version number. We added this form of optimistic concurrency control in addition to that of STM transactions.

  • For distributed coordination, we use Apache Zookeeper. There's one node in our distributed system that performs state changes (the current writer or leader). Should the current leader go down, another leader is elected using a leader election algorithm implemented on top of Zookeeper. All nodes in the distributed system can serve (eventually consistent) reads. Strongly consistent reads can only be served by the current leader. We can therefore easily scale eventually consistent reads, which make up the majority of reads in our application.


Example Application

There's also an example project on GitHub that demonstrates how to combine the contents of the following sections into a running application. I'll extend the application as I make progress on this blog post series. The branch referred to by this blog post is part-1.

The example application will cover some elements of our production system but not all of them. To keep things simple, I omitted performance optimizations and decided to use an over-simplified domain model from a domain other than healthcare.

Domain Model

The approach to implementing an immutable domain model was taken from the excellent article series Towards an Immutable Domain Model by Erik Rozendaal. In the following I'll briefly summarize this approach (with some modifications). For a more detailed description, please read through that article series.

The domain model of the example application is defined by the case classes Invoice, InvoiceItem and InvoiceAddress (see Invoice.scala). Invoice is the aggregate root with methods to add an InvoiceItem, set a discount and send an Invoice (to a destination defined by InvoiceAddress).

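The original listing isn't reproduced here, so the following is a hedged sketch of what Invoice.scala might contain, reconstructed from the description above (field names such as items and discount are assumptions):

```scala
// Sketch only - the real Invoice.scala in the example project may differ.
sealed trait InvoiceEvent
case class InvoiceItemAdded(invoiceId: String, item: InvoiceItem) extends InvoiceEvent
case class InvoiceDiscountSet(invoiceId: String, discount: BigDecimal) extends InvoiceEvent
case class InvoiceSent(invoiceId: String, to: InvoiceAddress) extends InvoiceEvent

case class InvoiceItem(description: String, count: Int, amount: BigDecimal)
case class InvoiceAddress(name: String, street: String, city: String, country: String)

case class Invoice(
    id: String,
    version: Long = -1,
    items: List[InvoiceItem] = Nil,
    discount: BigDecimal = 0,
    sentTo: Option[InvoiceAddress] = None) {

  // creates an updated Invoice copy for each handled event
  def handle(event: InvoiceEvent): Invoice = event match {
    case InvoiceItemAdded(_, item) => copy(version = version + 1, items = item :: items)
    case InvoiceDiscountSet(_, d)  => copy(version = version + 1, discount = d)
    case InvoiceSent(_, to)        => copy(version = version + 1, sentTo = Some(to))
  }
}
```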

The methods addItem, setDiscount and sendTo generate the domain events InvoiceItemAdded, InvoiceDiscountSet and InvoiceSent, respectively (see later). These are then handled by the handle method. The handle method creates Invoice copies with updated members.

The methods addItem, setDiscount and sendTo not only execute business logic (by checking preconditions, for example) but also capture the generated events. Generated events are captured with the Update monad (a state monad), which is the return type of these methods. This is shown for sendTo.

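Here's a sketch of how sendTo could look, assuming the update and Update.reject helpers described below (not the original listing):

```scala
// Sketch; the exact Update and DomainError signatures in the
// example project may differ.
def sendTo(to: InvoiceAddress): Update[Invoice] =
  if (items.isEmpty) Update.reject(DomainError("cannot send empty invoice"))
  else update(InvoiceSent(id, to)) // captures the event and applies handle
```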

A successful update adds the generated InvoiceSent event and the updated Invoice to the returned Update instance (using the update method from the EventSourced trait). A failed update is reported with a DomainError (using Update.reject). Update results are instances of Validation (either Success or Failure) which can be obtained by calling result() on the monad.

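For example (a sketch, assuming Scalaz-style Success and Failure cases):

```scala
invoice.sendTo(address).result() match {
  case Success(updated) => // proceed with the updated Invoice
  case Failure(error)   => // handle the reported DomainError
}
```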

Success contains the updated Invoice and Failure contains the reported error. To get access to the captured events, call the result method with a callback function.

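A plausible shape of that callback variant (the parameter order is an assumption):

```scala
// The callback is invoked only if the update was successful.
invoice.sendTo(address).result { (events, updated) =>
  // events: the captured InvoiceEvents, updated: the new Invoice
  // e.g. write the events to an event log here
}
```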

This function is only called for successful updates with the captured events and the updated Invoice. We will see a concrete usage example later. Since Update is a monad, we can also chain domain object updates with a for-comprehension.

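For example (sketch):

```scala
val update: Update[Invoice] = for {
  i1 <- invoice.addItem(InvoiceItem("item-1", 1, 50))
  i2 <- i1.setDiscount(10)
  i3 <- i2.sendTo(address)
} yield i3
// succeeds only if addItem, setDiscount and sendTo all succeed
```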

The overall update succeeds only if all individual updates succeed; if one or more individual updates fail, the overall update fails. Finally, we want to be able to reconstruct an Invoice object from the history of events. This can be done with the Invoice.handle(events: List[InvoiceEvent]) method on the Invoice companion object, as shown in the following example.

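A sketch of such a replay (assuming the companion object folds the events over an initial Invoice instance):

```scala
val history: List[InvoiceEvent] = List(
  InvoiceItemAdded("invoice-1", InvoiceItem("item-1", 1, 50)),
  InvoiceDiscountSet("invoice-1", 10))

// reconstructs the current state of the invoice from its event history
val invoice: Invoice = Invoice.handle(history)
```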

Having these domain model properties in place, we can now use immutable Invoice objects to define application state and use captured events to write them to a (persistent) event log.

Service Layer

Update: The service layer implementation presented here has been completely revised. The changes are described in the Service Layer Enhancements section of a follow-up blog post.

Application state is managed in the application's service layer. Here, InvoiceService maintains a map of Invoice objects where map keys are invoice ids.

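The original listing is not shown; a minimal sketch of the service skeleton could look like this:

```scala
import akka.stm._

// Sketch of InvoiceService.scala: one transactional reference
// holding the map of all invoices, keyed by invoice id.
class InvoiceService(initial: Map[String, Invoice] = Map.empty) {
  private val invoicesRef = Ref(initial)
}
```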

To control concurrent access to the invoices map, a transactional reference (akka.stm.Ref) is used. It is part of Akka's STM (Multiverse), but rewriting the example using Scala's STM shouldn't be a big deal. Using a single transactional reference for all Invoice objects is a rather naive approach: it can easily lead to high contention on the invoicesRef, and it is only reasonable if updates to different Invoice objects depend on each other (which is usually not the case) or if there aren't many concurrent writes. It does, however, keep the example as simple as possible; we will see better approaches, causing less contention, later in this section. An alternative approach for managing state is using actors, but I'll leave that for another blog post.

Changes to the application state are made inside an atomic {...} block, as shown for the addInvoiceItem method (see also InvoiceService.scala).

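A hedged sketch of addInvoiceItem (error handling for a missing invoice id is simplified here):

```scala
def addInvoiceItem(invoiceId: String, item: InvoiceItem) = atomic {
  val invoices = invoicesRef()
  val invoice  = invoices(invoiceId) // simplified lookup, no error handling
  invoice.addItem(item).result { (events, updated) =>
    // update application state within the same transaction
    invoicesRef set invoices + (invoiceId -> updated)
    // the captured events still need to be logged (discussed below)
  }
}
```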

If any two updates to the Invoice map conflict, one will succeed and the other will be retried by re-executing the atomic block. Since we are using immutable domain objects, retrying updates is not an issue. What's still missing is logging the captured events (obtained from the Update monad) to a persistent event log. In the following, we require that the order of logged events corresponds to the order of updates to the application state (otherwise we could get problems during a replay of events). We also assume that our persistent event log cannot directly participate in STM transactions (which is the case for Apache BookKeeper, for example). We could try to:

  • Write captured events to the persistent event log within a transaction. Writing to a persistent event log involves IO operations (side effects) that cannot be rolled back. Should the STM retry the transaction, the atomic block is re-executed and the captured events would be written to the event log again, which is not what we want.

  • Write the captured events after the STM transaction commits, using the thread that started the transaction. This would solve the problem of redundantly logged events, but then we can no longer guarantee that the order of logged events corresponds to the order of updates to the application state. This is because the writes could be done by different threads, which introduces a race.

So these two approaches don't work. One possible solution is the following approach:

  1. Write the captured events to a transient, transactional event log inside the atomic block. This ensures that events are not redundantly logged and the order of events corresponds to the order of updates to the application state. The simplest possible transient, transactional event log is a Ref[List[Event]].

  2. Transfer the logged events from the transactional event log to a persistent event log by preserving the order of events. Preserving the order of events can be achieved with a single thread (or actor) that reads from the transactional event log and writes events to the persistent event log. Any time an STM transaction commits we need to trigger this thread (or actor) to do the transfer.

Let's say this functionality is available through an EventLog trait:

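(A sketch of the trait, reconstructed from the two bullet points that follow; only the event type is assumed.)

```scala
import akka.stm._

trait EventLog {
  // transient, transactional event log
  val eventsRef = Ref(List.empty[InvoiceEvent])

  // must be called within a transaction
  def log(events: List[InvoiceEvent]) {
    eventsRef alter (events ::: _)
  }

  // must be called after the transaction has committed
  def store(): Unit      // waits for the transfer to complete
  def storeAsync(): Unit // returns immediately
}
```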

where

  • the log method adds events to the transactional eventsRef (must be called within a transaction) and

  • the store* methods transfer the events from the eventsRef to the persistent event log (must be called after the transaction commits). The store method waits for the transfer to complete whereas storeAsync returns immediately.

Having an implementation of EventLog in place, it can be used as follows:

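A sketch of addInvoiceItem extended with event logging (deferred is part of Akka's STM and runs after a successful commit):

```scala
def addInvoiceItem(invoiceId: String, item: InvoiceItem) = atomic {
  val invoices = invoicesRef()
  val invoice  = invoices(invoiceId) // simplified lookup
  invoice.addItem(item).result { (events, updated) =>
    invoicesRef set invoices + (invoiceId -> updated)
    eventLog.log(events)               // transient, transactional event log
    deferred { eventLog.storeAsync() } // transfer once, after commit
  }
}
```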

Within the transaction, captured events are added to the transactional event log. After the transaction successfully commits, the events are transferred to the persistent event log. This is done within the deferred block, which is executed only once after commit. Here, we don't wait for the events to be persisted (storeAsync).

We could also extend the storeAsync and addInvoiceItem method signatures to let clients provide an asynchronous completion callback function, in order to be notified when events have been written successfully (or an error occurred during writing). A production-ready implementation of EventLog should also provide strategies for recovering from errors when writing to the persistent event log. In a follow-up blog post I'll show an implementation of EventLog that uses Apache BookKeeper (update: I'll also show how to do write-ahead logging, which first writes events to the event log and then, if writing was successful, updates the invoices map. This can be done by queueing up updates). In its current state the example application has an EventLog implementation that stores events in memory, for testing purposes (see TestEventLog.scala).

The InvoiceService also provides methods for reading invoices. Supporting consistent reads is as easy as

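(A plausible version of these read methods; not the original listing.)

```scala
// invoicesRef() implicitly starts a transaction if none is running
def getInvoice(invoiceId: String): Option[Invoice] = invoicesRef().get(invoiceId)
def getInvoices: Iterable[Invoice] = invoicesRef().values
```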

Here invoicesRef() implicitly starts a new transaction, so we don't need to wrap the read operation in an atomic {...} block. There are some situations where clients need consistent reads (for example, in a web application during a post/redirect/get cycle where invoices must be immediately available for reading after their creation). In this case, the InvoiceService should be used. In other situations, eventually consistent reads are sufficient. In that case, we wouldn't use the InvoiceService to obtain Invoice objects; we'd rather obtain them from a separate (event-sourced) read model that is asynchronously generated from published events (using CQRS). This will be shown in a follow-up blog post.

Finally, let's look at some options for reducing contention on the invoicesRef (and the transactional event log) in situations with many concurrent writes. We can say that

  • Updates to different Invoice entities are independent, so we don't need to care about ordering of events in this case.

  • Updates to the same Invoice entity are dependent, so the order of logged events must correspond to the order of updates to that entity.

This means that we can use a transactional reference for each individual Invoice entity. Consequently, updates to different invoice entities do not interfere with each other, as sketched below.

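A sketch of that per-entity approach (lookup error handling omitted):

```scala
// one Ref per invoice; the outer map is itself managed by a Ref
private val invoicesRef = Ref(Map.empty[String, Ref[Invoice]])

def addInvoiceItem(invoiceId: String, item: InvoiceItem) = atomic {
  val invoiceRef = invoicesRef()(invoiceId) // simplified lookup
  invoiceRef().addItem(item).result { (events, updated) =>
    invoiceRef set updated // doesn't conflict with updates to other invoices
    eventLog.log(events)
    deferred { eventLog.storeAsync() }
  }
}
```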

The transactional reference for the whole map is only needed when new invoices are concurrently added to the map (and to ensure consistent reads, of course). We could also use a separate event log for each Ref[Invoice], so that there's no contention on the transactional event log for independent updates.

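For example (sketch; pairing each invoice Ref with its own transient event log):

```scala
// one transient, transactional event log per invoice entity; all of
// them transfer their events to the same persistent event log backend
private val invoicesRef = Ref(Map.empty[String, (Ref[Invoice], EventLog)])
```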

In this case, different transactional event logs would share the same persistent event log backend.

Summary

We started to build an event-sourced application whose state is kept entirely in memory; only state changes are persisted to an event log, in terms of domain events. Application state is defined by immutable domain objects which are accessed through transactional references. A state change means updating the transactional references with new domain objects (new state values) within an STM transaction. Events that have been generated during domain object updates are written to a transient, transactional event log; they are transferred to a persistent event log once the transaction commits. It was also shown how the order of logged events can be kept consistent with the order of state changes. This is important when application state must be recovered from the event history (for example, during application start or failover to another node).

Monday, February 28, 2011

Akka Producer Actors: New Features and Best Practices

In a previous post I wrote about new features and best practices for Akka consumer actors. In this post, I'll cover Akka producer actors. For the following examples to compile and run, you'll need the current Akka 1.1-SNAPSHOT.

Again, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.

Basic usage


Akka producer actors can send messages to any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows Akka actors to interact with external systems or other components over a large number of protocols and APIs.

Let's start with a simple producer actor that sends all messages it receives to an external HTTP service and returns the response to the initial sender. For sending messages over HTTP we can use the Camel jetty component which features an asynchronous HTTP client.

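A minimal producer for this scenario might look as follows (the endpoint URI is an assumption):

```scala
import akka.actor.Actor
import akka.camel.Producer

class HttpProducer extends Actor with Producer {
  def endpointUri = "jetty://http://localhost:8877/test"
}
```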

Concrete producer actors inherit a default implementation of Actor.receive from the Producer trait. For simple use cases, only an endpoint URI must be defined. Producer actors also require a started CamelContextManager to work properly. A CamelContextManager is started when an application starts a CamelService, e.g. via CamelServiceManager.startCamelService, or when starting the CamelContextManager directly via

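(for example)

```scala
import akka.camel.CamelContextManager

CamelContextManager.init()
CamelContextManager.start()
```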

The latter approach is recommended when an application uses only producer actors but no consumer actors. This slightly reduces the overhead when starting actors. After starting the producer actor, clients can interact with the HTTP service via the actor API.
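
For example (sketch; timeout handling shown for completeness):

```scala
import akka.actor.Actor.actorOf
import akka.camel.Message

val httpProducer = actorOf[HttpProducer].start

// send a message and wait for the response
(httpProducer !! Message("test")) match {
  case Some(response: Message) => println("response: " + response.bodyAs[String])
  case None                    => println("timeout")
}
```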

Here, !! is used for sending the message and waiting for a response. Alternatively, one can also use ! together with an implicit sender reference.

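For example, from within another actor (sketch):

```scala
import akka.actor.Actor
import akka.camel.Message

class HttpClient extends Actor {
  val httpProducer = Actor.actorOf[HttpProducer].start

  def receive = {
    case body: String      => httpProducer ! Message(body) // implicit sender: self
    case response: Message => println("response: " + response.bodyAs[String])
  }
}
```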

In this case the sender will receive an asynchronous reply from the producer actor. Before that, the producer actor itself receives an asynchronous reply from the jetty endpoint. The asynchronous jetty endpoint doesn't block a thread while waiting for a response, and neither does the producer actor. This is important from a scalability perspective, especially for longer-running request-response cycles.

By default, a producer actor initiates an in-out message exchange with its Camel endpoint i.e. it expects a response from it. If a producer actor wants to initiate an in-only message exchange then it must override the oneway method to return true. The following example shows a producer actor that initiates an in-only message exchange with a JMS endpoint.

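A sketch of such a producer:

```scala
import akka.actor.Actor
import akka.camel.Producer

class JmsProducer extends Actor with Producer {
  def endpointUri = "jms:queue:test"

  // initiate in-only message exchanges, don't expect a response
  override def oneway = true
}
```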

This actor adds any message it receives to the test JMS queue. By default, producer actors that are configured with oneway = true don't reply. This behavior is defined in the Producer.onReceiveAfterProduce method which is implemented as follows.

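Roughly like this (paraphrased sketch, not the actual library source):

```scala
// reply with the exchange result unless this is a oneway producer
protected def onReceiveAfterProduce: Receive = {
  case result => if (!oneway) self.reply(result)
}
```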

The onReceiveAfterProduce method has the same signature as Actor.receive and is called with the result of the message exchange with the endpoint (please note that in-only message exchanges with Camel endpoints have a result as well). The result type for successful message exchanges is Message; for failed message exchanges it is Failure (see below).

Concrete producer actors can override this method. For example, the following producer actor overrides onReceiveAfterProduce to reply with a constant "done" message.

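For example (sketch):

```scala
import akka.actor.Actor
import akka.camel.Producer

class JmsReplyingProducer extends Actor with Producer {
  def endpointUri = "jms:queue:test"
  override def oneway = true

  override protected def onReceiveAfterProduce: Receive = {
    case _ => self.reply("done") // ignore the exchange result
  }
}
```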

The result of the message exchange with the JMS endpoint is ignored (case _).

Failures


Message exchanges with a Camel endpoint can fail. In this case, onReceiveAfterProduce is called with a Failure message containing the cause of the failure (a Throwable). Let's extend the HttpProducer usage example to deal with failure responses.

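For example (sketch):

```scala
import akka.camel.{Failure, Message}

(httpProducer !! Message("test")) match {
  case Some(response: Message) => println("response: " + response.bodyAs[String])
  case Some(failure: Failure)  => println("failure: " + failure.cause.getMessage)
  case None                    => println("timeout")
}
```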

In addition to a failure cause, a Failure message can also contain endpoint-specific headers with failure details such as the HTTP response code, for example. When using ! instead of !!, together with an implicit sender reference (as shown in the previous section), that sender will then receive the Failure message asynchronously. The JmsReplyingProducer example can also be extended to return more meaningful responses: a "done" message only on success and an error message on failure.

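A sketch of that extension (inside JmsReplyingProducer):

```scala
override protected def onReceiveAfterProduce: Receive = {
  case m: Message => self.reply("done")
  case f: Failure => self.reply("error: " + f.cause.getMessage)
}
```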

Failed message exchanges never cause the producer actor to throw an exception during execution of receive. Should Producer implementations want to throw an exception on failure (for whatever reason) they can do so in onReceiveAfterProduce.

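For example (sketch, inside a producer actor):

```scala
override protected def onReceiveAfterProduce: Receive = {
  case m: Message => self.reply(m)
  case f: Failure => throw f.cause // escalate to a supervisor
}
```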

In this case failure handling should be done in combination with a supervisor (see below).

Let's look at another example. What if we want a producer like HttpProducer to throw an exception on failure (instead of returning a Failure message) but to respond with a normal Message on success? In this case, we need to use self.senderFuture inside onReceiveAfterProduce and complete it with an exception.

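A sketch of this technique (assuming Akka 1.1's CompletableFuture API on the sender future):

```scala
override protected def onReceiveAfterProduce: Receive = {
  case m: Message => self.reply(m)
  case f: Failure =>
    // complete the initial sender's future with an exception
    self.senderFuture.foreach(_.completeWithException(f.cause))
}
```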


Forwarding results


Another option to deal with message exchange results inside onReceiveAfterProduce is to forward them to another actor. Forwarding a message also forwards the initial sender reference. This allows the receiving actor to reply to the initial sender.

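For example (sketch; target is an arbitrary ActorRef):

```scala
import akka.actor.{Actor, ActorRef}
import akka.camel.Producer

class ForwardingHttpProducer(target: ActorRef) extends Actor with Producer {
  def endpointUri = "jetty://http://localhost:8877/test"

  override protected def onReceiveAfterProduce: Receive = {
    case result => target forward result // preserves the initial sender
  }
}
```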

With producer actors that forward message exchange results to other actors (incl. other producer actors) one can build actor-based message processing pipelines that integrate external systems. In combination with consumer actors, this could be extended towards a scalable and distributed enterprise service bus (ESB) based on Akka actors ... but this is a topic for another blog post.

Correlation identifiers


The Producer trait also supports correlation identifiers. This allows clients to correlate request messages with asynchronous response messages. A correlation identifier is a message header that can be set by clients. The following example uses the correlation identifier (or message exchange identifier) 123.

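For example (sketch, assuming the Message.MessageExchangeId header key from akka-camel):

```scala
import akka.camel.Message

// set a correlation identifier via the MessageExchangeId header
httpProducer ! Message("test", Map(Message.MessageExchangeId -> "123"))
```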

An asynchronous response (Message or Failure) from httpProducer will contain that correlation identifier as well.

Fault-tolerance


A failed message exchange by default does not cause a producer actor to throw an exception. However, concrete producer actors may decide to throw an exception inside onReceiveAfterProduce, for example, or there can be a system-level Camel problem that causes a runtime exception. An application that wants to handle these exceptions should supervise its producer actors.

The following example shows how to implement a producer actor that replies to the initial sender with a Failure message when it is restarted or stopped by a supervisor.

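A sketch of such a producer (the supervisor wiring itself is omitted):

```scala
import akka.actor.Actor
import akka.camel.{Failure, Producer}
import akka.config.Supervision.Permanent

class SupervisedHttpProducer extends Actor with Producer {
  self.lifeCycle = Permanent

  def endpointUri = "jetty://http://localhost:8877/test"

  // called by the Producer trait before this actor is restarted
  override protected def preRestartProducer(reason: Throwable) {
    self.reply_?(Failure(reason))
  }

  // called when this actor is stopped
  override def postStop() {
    self.reply_?(Failure(new Exception("producer stopped")))
  }
}
```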

To handle restart callbacks, producer actors must override the preRestartProducer method instead of preRestart. The preRestart method is implemented by the Producer trait and does additional resource de-allocation work after calling preRestartProducer. More information about replies within preRestart and postStop can be found in my previous blog post about consumer actors.

Thursday, February 17, 2011

Akka Consumer Actors: New Features and Best Practices

In this blog post I want to give some guidance on how to implement consumer actors with the akka-camel module. Besides basic usage scenarios, I will also explain how to make consumer actors fault-tolerant, redeliver messages on failure, deal with bounded mailboxes etc. The code examples shown below require the current Akka 1.1-SNAPSHOT to compile and run.

In the following, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.

Basic usage

Akka consumer actors can receive messages from any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows clients to interact with Akka actors over a large number of protocols and APIs.

Camel endpoints either initiate in-only (one-way) message exchanges with consumer actors or in-out (two-way) message exchanges. Replies from consumer actors are mandatory for in-out message exchanges but optional for in-only message exchanges. For replying to a Camel endpoint, the consumer actor uses the very same interface as for replying to any other sender (e.g. to another actor). Examples are self.reply or self.reply_?.

Let's start by defining a simple consumer actor that accepts messages via tcp on port 6200 and replies to the tcp client (tcp support is given by Camel's mina component).

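A sketch of such a consumer (URI options follow the Camel mina component):

```scala
import akka.actor.Actor
import akka.camel.{Consumer, Message}

class TcpConsumer extends Actor with Consumer {
  def endpointUri = "mina:tcp://localhost:6200?textline=true"

  def receive = {
    case msg: Message => self.reply("received: " + msg.bodyAs[String])
  }
}
```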

For consumer actors to work, applications need to start a CamelService which is managed by the CamelServiceManager.

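For example:

```scala
import akka.camel.CamelServiceManager._

// starts a CamelService (and with it the CamelContextManager)
startCamelService
```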

When starting a consumer actor, the endpoint defined for that actor will be activated asynchronously by the CamelService. If your application wants to wait for consumer endpoints to be activated, it can do so with the awaitEndpointActivation method (which is especially useful for testing).

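A sketch combining endpoint activation and a test message (the ProducerTemplate accessor name mandatoryTemplate is an Akka 1.1 assumption):

```scala
import akka.actor.Actor.actorOf
import akka.camel.CamelContextManager
import akka.camel.CamelServiceManager._

startCamelService

// wait for one endpoint to be activated while starting the consumer
mandatoryService.awaitEndpointActivation(1) {
  actorOf[TcpConsumer].start
}

// send a test message with a Camel ProducerTemplate
val template = CamelContextManager.mandatoryTemplate
println(template.requestBody("mina:tcp://localhost:6200?textline=true", "hello"))
```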

For sending a test message to the consumer actor, the above code uses a Camel ProducerTemplate which can be obtained from the CamelContextManager.

If Camel endpoints, such as the file endpoint, create in-only message exchanges then consumer actors need not reply, by default. The message exchange is completed once the input message has been added to the consumer actor's mailbox.

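For example (sketch):

```scala
import akka.actor.Actor
import akka.camel.{Consumer, Message}

class FileConsumer extends Actor with Consumer {
  def endpointUri = "file:data/input?delete=true"

  // in-only exchange: no reply needed
  def receive = {
    case msg: Message => println("received: " + msg.bodyAs[String])
  }
}
```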

When placing a file into the data/input directory, the Camel file endpoint will pick up that file and send its content as message to the consumer actor. Once the message is in the actor's mailbox, the file endpoint will delete the corresponding file (see delete=true in the endpoint URI).

If you want to let the consumer actor decide when the file should be deleted, then you'll need to turn auto-acknowledgements off as shown in the following example (autoack = false). In this case the consumer actor must reply with a special Ack message when message processing is done. This asynchronous reply finally causes the file endpoint to delete the consumed file.

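A sketch with auto-acknowledgements turned off:

```scala
import akka.actor.Actor
import akka.camel.{Ack, Consumer, Message}

class AcknowledgingFileConsumer extends Actor with Consumer {
  override def autoack = false

  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case msg: Message =>
      // ... process the file content ...
      self.reply(Ack) // the file endpoint may now delete the file
  }
}
```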

Turning auto-acknowledgements on and off is only relevant for in-only message exchanges because, for in-out message exchanges, consumer actors need to reply in any case with an (application-specific) message. Consumer actors may also reply with a Failure message to indicate a processing failure. Failure replies can be made for both in-only and in-out message exchanges. A Failure reply can be made inside the receive method, but there are better ways, as shown in the next sections.

Fault-tolerance and message redelivery

Message processing inside receive may throw exceptions which usually requires a failure response to Camel (i.e. to the consumer endpoint). This is done with a Failure message that contains the failure reason (an instance of Throwable). Instead of catching and handling the exception inside receive, consumer actors should be part of supervisor hierarchies and send failure responses from within restart callback methods. Here's an example of a fault-tolerant file consumer.

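A sketch of such a consumer (the supervisor configuration itself is omitted):

```scala
import akka.actor.Actor
import akka.camel.{Ack, Consumer, Failure, Message}
import akka.config.Supervision.Permanent

class SupervisedFileConsumer extends Actor with Consumer {
  self.lifeCycle = Permanent // or Temporary, see below

  override def autoack = false

  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case msg: Message =>
      // process the file content; may throw an exception
      self.reply(Ack)
  }

  // restart: trigger a redelivery of the file content
  override def preRestart(reason: Throwable) {
    self.reply_?(Failure(reason))
  }

  // shutdown: acknowledge, so that the file endpoint deletes the file
  override def postStop() {
    self.reply_?(Ack)
  }
}
```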

The above file consumer overrides the preRestart and postStop callback methods to send reply messages to Camel. A reply within preRestart and postStop is possible after receive has thrown an exception (new feature since Akka 1.1). When receive returns normally it is expected that any necessary reply has already been done within receive.
  • If the lifecycle of the SupervisedFileConsumer is configured to be PERMANENT, a supervisor will restart the consumer upon failure with a call to preRestart. Within preRestart a Failure reply is sent, which causes the file endpoint to redeliver the content of the consumed file so that the consumer actor can try to process it again. Should the processing succeed on a second attempt, an Ack is sent within receive. A reply within preRestart must be a safe reply via self.reply_? because an unsafe self.reply will throw an exception when the consumer is restarted without having failed. This can be the case in the context of all-for-one restart strategies.
  • If the lifecycle of the SupervisedFileConsumer is configured to be TEMPORARY, a supervisor will shut down the consumer upon failure with a call to postStop. Within postStop an Ack is sent which causes the file endpoint to delete the file. One can, of course, choose to reply with a Failure here, so that files that couldn't be processed successfully are kept in the input directory. A reply within postStop must be a safe reply via self.reply_? because an unsafe self.reply will throw an exception when the consumer has been stopped by the application (and not by a supervisor) after successful execution of receive.

Another frequently discussed consumer actor example is a fault-tolerant JMS consumer. A JMS consumer actor should acknowledge a message receipt upon successful message processing and trigger a message redelivery on failure. This is exactly the same pattern as described for the SupervisedFileConsumer above. You just need to change the file endpoint URI to a jms or activemq endpoint URI and you're done (of course, you additionally need to configure the JMS connection with a redelivery policy and, optionally, use transacted queues; an explanation of how to do this would exceed the scope of this blog post).

Simplifications and tradeoffs with blocking=true

In all the examples so far, the internally created Camel routes use the ! (bang) operator to send the input message to the consumer actor. This means that the Camel route does not block a thread waiting for a response; an asynchronous reply from the consumer actor causes the Camel route to resume processing. That's also the reason why any exception thrown by receive isn't reported back to Camel directly but must be communicated explicitly with a Failure response.

If you want exceptions thrown by receive to be reported back to Camel directly (i.e. without sending Failure responses) then you'll need to set blocking = true for the consumer actor. This causes the Camel route to send the input message with the !! (bangbang) operator and to wait for a response. However, this will block a thread until the consumer sends a response or throws an exception within receive. The advantage of this approach is that error handling is strongly simplified, but scalability will likely decrease.

Here's an example of a consumer actor that uses the simplified approach to error handling. Any exception thrown by receive will still cause the file endpoint to redeliver the message but a thread will be blocked by Camel during the execution of receive.

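For example (sketch):

```scala
import akka.actor.Actor
import akka.camel.{Consumer, Message}

class BlockingFileConsumer extends Actor with Consumer {
  override def blocking = true

  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case msg: Message =>
      // any exception thrown here is reported back to Camel directly
      process(msg.bodyAs[String])
  }

  def process(content: String) { /* ... */ }
}
```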

No supervisor is needed here. It depends on the non-functional requirements of your application whether to go for this simple but blocking approach or to use a more scalable, non-blocking approach in combination with a supervisor.

Bounded mailboxes and error handling with custom Camel routes

For consumer actors that require a significant amount of time for processing a single message, it can make sense to install a bounded mailbox. A bounded mailbox throws an exception if its capacity is reached and the Camel route tries to add additional messages to the mailbox. Here's an example of a file consumer actor that uses a bounded mailbox with a capacity of 5. Processing is artificially delayed by 1 second using a Thread.sleep.

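A sketch of such a consumer; the exact dispatcher/mailbox configuration API differs between Akka 1.x versions, so treat the dispatcher line as an assumption:

```scala
import akka.actor.Actor
import akka.camel.{Consumer, Message}
import akka.dispatch.{BoundedMailbox, Dispatchers}

class SlowFileConsumer extends Actor with Consumer {
  // bounded mailbox with capacity 5 (configuration API is an assumption)
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(
    "slow-file-consumer", throughput = 1,
    mailboxType = BoundedMailbox(capacity = 5)).build

  def endpointUri = "file:data/input?delete=true"

  def receive = {
    case msg: Message =>
      Thread.sleep(1000) // artificial 1-second processing delay
      println("processed: " + msg.bodyAs[String])
  }
}
```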

When, let's say, 10 files are put into the data/input directory, they will be picked up by the file endpoint and added to the actor's mailbox. The capacity of the mailbox will be reached soon because the file endpoint can send messages much faster than the consumer actor can process them. Exceptions thrown by the mailbox are directly reported to the Camel route, which causes the file endpoint to redeliver messages until they can be added to the mailbox. The same applies to JMS and other endpoints that support redelivery.

When dealing with endpoints that do not support redelivery, one needs to customize the Camel route to the consumer actor with a special error handler that does the redelivery. This is shown for a consumer actor that consumes messages from a direct endpoint.

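A sketch of such a consumer (the onRouteDefinition signature is an assumption based on the akka-camel documentation):

```scala
import akka.actor.Actor
import akka.camel.{Consumer, Message}
import org.apache.camel.model.RouteDefinition

class DirectConsumer extends Actor with Consumer {
  def endpointUri = "direct:processor"

  // customize the route: up to 3 redeliveries with a 1000 ms delay
  override def onRouteDefinition = (rd: RouteDefinition) =>
    rd.onException(classOf[Exception]).maximumRedeliveries(3).redeliveryDelay(1000).end

  def receive = {
    case msg: Message => process(msg.bodyAs[String])
  }

  def process(content: String) { /* ... */ }
}
```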

Here we use onRouteDefinition to define how the Camel route should be customized during its creation. In this example, an error handler is defined that attempts a maximum of 3 redeliveries with a delay of 1000 ms. For details refer to the intercepting route construction section in the akka-camel documentation. When using a producer template to send messages to this endpoint (as sketched below), some of them will be added to the mailbox on the first attempt, some of them after a second attempt triggered by the error handler.

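For example (sketch):

```scala
import akka.camel.CamelContextManager

val template = CamelContextManager.mandatoryTemplate
for (i <- 1 to 10) template.sendBody("direct:processor", "message-" + i)
```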

The examples presented in this post cover many of the consumer-actor-related questions and topics that have been asked and discussed on the akka-user mailing list. In another post I plan to cover best practices for implementing Akka producer actors.