Tuesday, November 29, 2011

Building an Event-Sourced Web Application - Part 1: Domain Model, Events and State

UPDATE: A successor of the example application described in this blog post is available here. It is based on the eventsourced library, a library for building reliable, scalable and distributed event-sourced applications in Scala.

Over the last few months I worked on a project in which we built an event-sourced healthcare web application from scratch with Scala and Akka. This is the first in a series of blog posts in which I want to share some of my experiences and design decisions.

The application architecture follows an architectural pattern that has been described as memory image or system prevalence: application state is kept in memory rather than written to a database. Only state changes are written to a persistent store in terms of domain events. Snapshots of application state are saved to disk at regular intervals. Application state can be reconstructed by replaying the logged events (either beginning from scratch or from a saved snapshot). We've chosen this architecture because

  • The state of our application easily fits within 2 GB of memory. Should this become a limitation in the future, we can easily partition the application state across several nodes (which can also be used to scale writes, if needed).

  • Very low latency can be achieved for read and write access to application state. To ensure atomicity, consistency and isolation, we use a Software Transactional Memory (STM). Durability is achieved with a persistent event log.

  • The application must be able to answer queries not only based on its current state but also based on the history of state changes. Requirements for such queries are often not known from the beginning and therefore cannot be considered during initial database design. With a persistent event log, one can instead build suitable read models at any later time by replaying the whole event history.

  • Multiple copies of the application state can be created on other nodes by consuming domain events that have been published by a writer (or leader) node. Should the current writer go down, one can easily switch to another node to achieve high availability.

From the memory image approach, we excluded all data that are written only once and are not modified later, such as large medical image files or clinical documents. They are stored directly on a (distributed) file system and only references to them are kept in memory.

Application Overview

The following list briefly summarizes some of the concepts and technologies used to implement the different parts of the application. In this blog post I'll focus on the domain model and the service layer. The other parts will be described in follow-up posts.

  • The domain model is an immutable domain model. One advantage of immutable domain objects is that you can safely share them with other application components, for example, by sending them along with event messages. Immutable domain objects are also needed when using them together with Software Transactional Memory (see also this article).

  • The service layer provides transactional access to the application state. Application state is managed by transactional references. State values are (immutable) domain objects. State changes occur by updating the transactional references with new domain object values. For transaction management, Akka's Software Transactional Memory (STM) is used. The application's approach to state, identity and concurrency mainly follows the concepts described in State and Identity (which is part of the Clojure documentation).

  • The persistence layer comprises a persistent event log and snapshots of the application state. In the production system, we use Apache BookKeeper as a distributed, highly-available and scalable event log. Snapshots are stored on a distributed file system.

  • The web layer provides a RESTful service interface to application resources (domain objects) with support for HTML, XML and JSON as representation formats. We use Jersey as web framework for implementing the RESTful service interface together with Scalate to render the HTML representations of domain objects. For XML and JSON bindings we use JAXB annotations on Scala case classes - seriously :)

  • Read models are used to serve more complex queries that cannot be (efficiently) answered by using the current application state. Read models are also event-sourced. Their structure is optimized to answer complex queries in a very efficient way. The approach to separate write models from read models is called Command Query Responsibility Segregation (CQRS). Read models can be stored in-memory (and reconstructed by replaying events) or persisted to a database. We use Akka agents to maintain in-memory read models.

  • Business process executors are stateful event listeners that implement long-running business processes. In reaction to domain events, they may change application state (via the service layer), coordinate changes across aggregate roots or interact with external services. We use Akka actors to implement business process executors.

  • Versioning of domain objects is used to support conditional updates. An update can proceed only if the current version of a domain object matches a client-supplied version number. We added this form of optimistic concurrency control in addition to that of STM transactions.

  • For distributed coordination, we use Apache ZooKeeper. There's one node in our distributed system that performs state changes (the current writer or leader). Should the current leader go down, another leader is elected using a leader election algorithm implemented on top of ZooKeeper. All nodes in the distributed system can serve (eventually consistent) reads. Strongly consistent reads can only be served by the current leader. We can therefore easily scale eventually consistent reads, which make up the majority of reads in our application.
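
The conditional update rule from the versioning item above could look roughly like the following. This is only a minimal sketch and not the production code: the Entity case class, the conditionalUpdate helper and the error message are hypothetical names chosen for this illustration.

```scala
object VersionCheckSketch {
  // Hypothetical versioned domain object; the field names are assumptions.
  final case class Entity(id: String, version: Long, content: String)

  // Returns an updated copy only if the client-supplied version (if any)
  // matches the current version; otherwise the update is rejected.
  def conditionalUpdate(
      current: Entity,
      expectedVersion: Option[Long],
      newContent: String): Either[String, Entity] =
    expectedVersion match {
      case Some(v) if v != current.version =>
        Left(s"expected version $v doesn't match current version ${current.version}")
      case _ =>
        // Every successful update increments the version number.
        Right(current.copy(version = current.version + 1, content = newContent))
    }
}
```

A client that loaded version 3 and submits its change after another client has already moved the object to version 4 gets a Left back and can reload before trying again.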


Example application

There's also an example project on GitHub that demonstrates how to combine the contents of the following sections into a running application. I'll extend the application as I make progress on this blog post series. The branch referred to by this blog post is part-1.

The example application will cover some elements of our production system but not all of them. To keep things simple, I omitted performance optimizations and decided to use an over-simplified domain model from a domain other than healthcare.

Domain Model

The approach to implementing an immutable domain model was taken from the excellent article series Towards an Immutable Domain Model by Erik Rozendaal. In the following I'll briefly summarize this approach (with some modifications). For a more detailed description, please read through this article series.

The domain model of the example application is defined by the case classes Invoice, InvoiceItem and InvoiceAddress (see Invoice.scala). Invoice is the aggregate root with methods to add an InvoiceItem, set a discount and send an Invoice (to a destination defined by InvoiceAddress).


The methods addItem, setDiscount and sendTo generate the domain events InvoiceItemAdded, InvoiceDiscountSet and InvoiceSent, respectively (see later). These are then handled by the handle method. The handle method creates Invoice copies with updated members.
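
To make the relationship between events and the handle method more concrete, here is a simplified sketch of such a model. The actual definitions are in Invoice.scala; the field names, the BigDecimal amounts and the event payloads below are assumptions made for this illustration.

```scala
object InvoiceModelSketch {
  final case class InvoiceItem(description: String, count: Int, amount: BigDecimal)
  final case class InvoiceAddress(street: String, city: String, country: String)

  // Domain events; the payloads are assumptions.
  sealed trait InvoiceEvent { def invoiceId: String }
  final case class InvoiceItemAdded(invoiceId: String, item: InvoiceItem) extends InvoiceEvent
  final case class InvoiceDiscountSet(invoiceId: String, discount: BigDecimal) extends InvoiceEvent
  final case class InvoiceSent(invoiceId: String, to: InvoiceAddress) extends InvoiceEvent

  final case class Invoice(
      id: String,
      items: List[InvoiceItem] = Nil,
      discount: Option[BigDecimal] = None,
      sentTo: Option[InvoiceAddress] = None) {

    // handle never mutates this instance: it returns a copy with updated members.
    def handle(event: InvoiceEvent): Invoice = event match {
      case InvoiceItemAdded(_, item) => copy(items = items :+ item)
      case InvoiceDiscountSet(_, d)  => copy(discount = Some(d))
      case InvoiceSent(_, to)        => copy(sentTo = Some(to))
    }
  }
}
```

Because handle returns a new Invoice, a reference to the previous value stays valid and can safely be handed to other components.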

Besides generating domain events, the methods addItem, setDiscount and sendTo execute business logic (by checking preconditions, for example) and capture the generated events. Generated events are captured with the Update monad (a state monad) which is the return type of these methods. This is shown for sendTo.


A successful update adds the generated InvoiceSent event and the updated Invoice to the returned Update instance (using the update method from the EventSourced trait). A failed update is reported with a DomainError (using Update.reject). Update results are instances of Validation (either Success or Failure) which can be obtained by calling result() on the monad


where Success contains the updated Invoice and Failure contains the reported error. To get access to the captured events, call the result method with a callback function.


This function is only called for successful updates with the captured events and the updated Invoice. We will see a concrete usage example later. Since Update is a monad, we can also chain domain object updates with a for-comprehension.
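
The following self-contained sketch illustrates the idea of an event-capturing Update. It is not the implementation from the example project: it uses Either from the standard library in place of Validation, a plain String as DomainError, a hypothetical Update.accept helper in place of the update method of the EventSourced trait, and a much smaller Invoice.

```scala
object UpdateSketch {
  type DomainError = String // assumption: the real DomainError is richer

  // Threads the list of captured events through a chain of updates
  // and stops at the first error.
  final case class Update[E, A](run: List[E] => (List[E], Either[DomainError, A])) {
    def flatMap[B](f: A => Update[E, B]): Update[E, B] =
      Update[E, B] { events =>
        run(events) match {
          case (es, Right(a))  => f(a).run(es)
          case (es, Left(err)) => (es, Left(err))
        }
      }

    def map[B](f: A => B): Update[E, B] =
      flatMap(a => Update[E, B](es => (es, Right(f(a)))))

    // Result only.
    def result(): Either[DomainError, A] = run(Nil)._2

    // Result plus callback; the callback runs for successful updates only.
    def result(onSuccess: (List[E], A) => Unit): Either[DomainError, A] = {
      val (es, r) = run(Nil)
      r.foreach(a => onSuccess(es.reverse, a)) // events were prepended
      r
    }
  }

  object Update {
    def accept[E, A](event: E, value: A): Update[E, A] =
      Update[E, A](events => (event :: events, Right(value)))
    def reject[E, A](error: DomainError): Update[E, A] =
      Update[E, A](events => (events, Left(error)))
  }

  sealed trait InvoiceEvent
  final case class InvoiceItemAdded(invoiceId: String, item: String) extends InvoiceEvent
  final case class InvoiceSent(invoiceId: String, address: String) extends InvoiceEvent

  final case class Invoice(id: String, items: List[String] = Nil, sentTo: Option[String] = None) {
    def addItem(item: String): Update[InvoiceEvent, Invoice] =
      if (sentTo.isDefined) Update.reject[InvoiceEvent, Invoice]("cannot modify a sent invoice")
      else Update.accept[InvoiceEvent, Invoice](InvoiceItemAdded(id, item), copy(items = items :+ item))

    def sendTo(address: String): Update[InvoiceEvent, Invoice] =
      if (items.isEmpty) Update.reject[InvoiceEvent, Invoice]("cannot send an empty invoice")
      else Update.accept[InvoiceEvent, Invoice](InvoiceSent(id, address), copy(sentTo = Some(address)))
  }

  // Chained updates: the second step only runs if the first one succeeded.
  val chained: Update[InvoiceEvent, Invoice] =
    for {
      withItem <- Invoice("inv-1").addItem("book")
      sent     <- withItem.sendTo("Main Street 1")
    } yield sent
}
```

Calling chained.result() yields the final Invoice wrapped in a Right, while chained.result((events, invoice) => ...) additionally passes the two captured events, in the order they were generated, to the callback.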


The overall update succeeds only if all individual updates succeed. If one or more individual updates fail, the overall update fails. Finally, we want to be able to reconstruct an Invoice object from the history of events. This can be done with the Invoice.handle(events: List[InvoiceEvent]) method on the Invoice companion object, as shown in the following example.
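
Reconstruction from history amounts to a left fold over the events. The sketch below assumes a simplified Invoice and a hypothetical InvoiceCreated event that starts every history; the real event types and the real signature may differ.

```scala
object ReplaySketch {
  sealed trait InvoiceEvent
  final case class InvoiceCreated(invoiceId: String) extends InvoiceEvent // assumed first event
  final case class InvoiceItemAdded(invoiceId: String, item: String) extends InvoiceEvent
  final case class InvoiceSent(invoiceId: String, address: String) extends InvoiceEvent

  final case class Invoice(id: String, items: List[String] = Nil, sentTo: Option[String] = None) {
    // Applies a single event and returns an updated copy.
    def handle(event: InvoiceEvent): Invoice = event match {
      case InvoiceCreated(_)         => this
      case InvoiceItemAdded(_, item) => copy(items = items :+ item)
      case InvoiceSent(_, address)   => copy(sentTo = Some(address))
    }
  }

  object Invoice {
    // Rebuilds an invoice by folding over its event history, oldest event first.
    def handle(events: List[InvoiceEvent]): Invoice = events match {
      case InvoiceCreated(id) :: rest => rest.foldLeft(Invoice(id))(_ handle _)
      case _ => throw new IllegalArgumentException("history must start with InvoiceCreated")
    }
  }
}
```

Replaying the same history always produces the same Invoice, which is what makes recovery from the event log possible.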


Having these domain model properties in place, we can now use immutable Invoice objects to define application state and use captured events to write them to a (persistent) event log.

Service Layer

Update: The service layer implementation presented here has been completely revised. The changes are described in the Service Layer Enhancements section of a follow-up blog post.

Application state is managed in the application's service layer. Here, InvoiceService maintains a map of Invoice objects where map keys are invoice ids.


To control concurrent access to the invoices map, a transactional reference (akka.stm.Ref) is used. It is part of Akka's STM (Multiverse) but rewriting the example using Scala's STM shouldn't be a big deal. Using a single transactional reference for all Invoice objects is a rather naive approach because it can easily lead to high contention on the invoicesRef, but it keeps the example as simple as possible. (Using a single transactional reference for all Invoice objects is a reasonable approach only if updates to different Invoice objects depend on each other which is usually not the case, or if there aren't many concurrent writes. We will see better approaches, causing less contention, later in this section). An alternative approach for managing state is using actors but I'll leave that for another blog post.

Changes to the application state are made inside an atomic {...} block, as shown for the addInvoiceItem method (see also InvoiceService.scala).
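
As a dependency-free illustration of this update style, the sketch below replaces akka.stm.Ref and the atomic {...} block with a java.util.concurrent.atomic.AtomicReference and a compare-and-set retry loop. That is a deliberate substitution, not the code from InvoiceService.scala: it only covers a single reference, whereas an STM can coordinate several. The simplified Invoice and the String errors are assumptions as well.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object InvoiceServiceSketch {
  final case class Invoice(id: String, items: List[String] = Nil)
  type Invoices = Map[String, Invoice]

  class InvoiceService {
    // Stand-in for the transactional reference that holds the invoices map.
    private val invoicesRef = new AtomicReference[Invoices](Map.empty)

    // Re-executes `block` until its result can be installed without conflict.
    // This is safe because `block` only works on immutable values.
    @tailrec
    private def atomic[A](block: Invoices => Either[String, (Invoices, A)]): Either[String, A] = {
      val before = invoicesRef.get()
      block(before) match {
        case Left(error) => Left(error)
        case Right((after, value)) =>
          if (invoicesRef.compareAndSet(before, after)) Right(value)
          else atomic(block) // a concurrent update won: retry
      }
    }

    def createInvoice(id: String): Either[String, Invoice] = atomic { invoices =>
      if (invoices.contains(id)) Left(s"invoice $id already exists")
      else Right((invoices + (id -> Invoice(id)), Invoice(id)))
    }

    def addInvoiceItem(id: String, item: String): Either[String, Invoice] = atomic { invoices =>
      invoices.get(id) match {
        case None => Left(s"no invoice with id $id")
        case Some(invoice) =>
          val updated = invoice.copy(items = invoice.items :+ item)
          Right((invoices + (id -> updated), updated))
      }
    }

    // Reads see the latest installed map.
    def getInvoice(id: String): Option[Invoice] = invoicesRef.get().get(id)
  }
}
```

Any side effect placed inside such a block would be repeated on every retry, which is exactly the problem that arises when events are written to a persistent log from within the transaction.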


If any two updates to the Invoice map conflict, one update will succeed and the other one will be retried by re-executing the atomic block. Since we are using immutable domain objects, retrying updates is not an issue. What's still missing is to log the captured events (obtained from the Update monad) to a persistent event log. In the following, we require that the order of logged events must correspond to the order of updates to the application state (otherwise we could get problems during a replay of events). We also make the assumption that our persistent event log cannot directly participate in STM transactions (which is the case for Apache BookKeeper, for example). We could try to:

  • Write captured events to the persistent event log within a transaction. Writing to a persistent event log involves IO operations (side-effects) that cannot be rolled back. Should the STM retry the transaction, the atomic block is re-executed and the captured events would be written to the event log again and this is not what we want.

  • Write the captured events after the STM transaction commits with the thread that started the transaction. This would solve the problem of redundantly logged events but then we cannot guarantee any more that the order of logged events still corresponds to the order of updates to the application state. This is because the writes could be done by different threads which introduces a race.

So these two approaches don't work. One possible solution is the following approach:

  1. Write the captured events to a transient, transactional event log inside the atomic block. This ensures that events are not redundantly logged and the order of events corresponds to the order of updates to the application state. The simplest possible transient, transactional event log is a Ref[List[Event]].

  2. Transfer the logged events from the transactional event log to a persistent event log by preserving the order of events. Preserving the order of events can be achieved with a single thread (or actor) that reads from the transactional event log and writes events to the persistent event log. Any time an STM transaction commits we need to trigger this thread (or actor) to do the transfer.

Let's say this functionality is available through an EventLog trait:


where

  • the log method adds events to the transactional eventsRef (must be called within a transaction) and

  • the store* methods transfer the events from the eventsRef to the persistent event log (must be called after the transaction commits). The store method waits for the transfer to complete whereas storeAsync returns immediately.

Having an implementation of EventLog in place, it can be used as follows:


Within the transaction, captured events are added to the transactional event log. After the transaction successfully commits, the events are transferred to the persistent event log. This is done within the deferred block which is executed only once after commit. Here, we don't wait for the events to be persisted (storeAsync). We could also extend the storeAsync and addInvoiceItem method signatures to let clients provide an asynchronous completion callback function in order to be notified when events have been written successfully (or an error occurred during writing). A production-ready implementation of EventLog should also provide strategies to recover from errors writing to the persistent event log. In a follow-up blog post I'll show an implementation of EventLog that uses Apache BookKeeper (update: I'll also show how to do write-ahead logging which first writes events to the event log and then, if writing was successful, updates the invoices map. This can be done by queueing up updates). In its current state the example application has an EventLog implementation that stores events in memory, for testing purposes (see TestEventLog.scala).
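
The two-step logging scheme can be sketched without an STM as follows. This is an illustration under several assumptions, not the EventLog from the example project: state and transient event log live in one AtomicReference so that a single compare-and-set changes both, the "persistent" log is just an in-memory Vector, and LoggedRef, addInvoiceItem and the simplified state type are hypothetical names.

```scala
import java.util.concurrent.{Executors, Future, ThreadFactory}
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object EventLogSketch {
  sealed trait Event
  final case class InvoiceItemAdded(invoiceId: String, item: String) extends Event

  final class LoggedRef[S](initial: S) {
    // State plus transient event log, always replaced together.
    private case class Cell(state: S, pending: Vector[Event])
    private val ref = new AtomicReference(Cell(initial, Vector.empty))

    // A single writer thread preserves the order of transferred events.
    private val writer = Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "event-log-writer"); t.setDaemon(true); t
      }
    })
    @volatile private var stored = Vector.empty[Event] // stand-in for the persistent log

    // Step 1: update state and append captured events in one atomic step.
    @tailrec
    def update(block: S => (S, List[Event])): S = {
      val before = ref.get()
      val (next, events) = block(before.state)
      if (ref.compareAndSet(before, Cell(next, before.pending ++ events))) {
        storeAsync() // runs once, after the successful "commit"
        next
      } else update(block) // retry: nothing was logged for the failed attempt
    }

    // Step 2: transfer pending events to the persistent log.
    def storeAsync(): Future[_] = writer.submit(new Runnable {
      def run(): Unit = stored = stored ++ drain()
    })

    def store(): Unit = storeAsync().get() // waits for the transfer

    @tailrec
    private def drain(): Vector[Event] = {
      val before = ref.get()
      if (ref.compareAndSet(before, Cell(before.state, Vector.empty))) before.pending
      else drain()
    }

    def state: S = ref.get().state
    def storedEvents: Vector[Event] = stored
  }

  def addInvoiceItem(ref: LoggedRef[Map[String, List[String]]], id: String, item: String): Unit =
    ref.update { invoices =>
      (invoices.updated(id, invoices.getOrElse(id, Nil) :+ item), List(InvoiceItemAdded(id, item)))
    }
}
```

Since events enter the pending list in commit order and only one thread moves them on, the stored order matches the order of state changes. As in the write-behind approach described above, a state change is visible before its events are stored.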

The InvoiceService also provides methods for reading invoices. Supporting consistent reads is as easy as


Here invoicesRef() implicitly starts a new transaction, so we don't need to perform the read operation within an atomic {...} block. There are some situations where consistent reads are needed by clients (for example, in a web application during a post/redirect/get cycle where invoices must be immediately available for reading after their creation). In this case, the InvoiceService should be used. In other situations, eventually consistent reads are sufficient. In that case, we wouldn't use the InvoiceService to obtain Invoice objects; we'd rather obtain them from a separate (event-sourced) read model that is asynchronously generated from published events (using CQRS). This will be shown in a follow-up blog post.

Finally, let's look at some options for reducing contention on the invoicesRef (and the transactional event log) in situations with many concurrent writes. We can say that

  • Updates to different Invoice entities are independent, so we don't need to care about ordering of events in this case.

  • Updates to the same Invoice entity are dependent, so the order of logged events must correspond to the order of updates to that entity.

This means that we can use transactional references for each individual Invoice entity. Consequently, updates to different invoice entities do not interfere with each other.


The transactional reference for the whole map is only needed for the case that new invoices are concurrently added to the map (and to ensure consistent reads, of course). We could also use a separate event log for each Ref[Invoice], so that there's no contention on the transactional event log for independent updates.
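
A rough sketch of the reference-per-entity layout follows. As before, AtomicReference and compare-and-set loops stand in for transactional references, so this is an assumption-laden illustration rather than the project's code; in particular, reading several invoices at once is not a consistent snapshot here, which an STM would provide.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object PerInvoiceRefSketch {
  final case class Invoice(id: String, items: List[String] = Nil)

  class InvoiceService {
    // Outer reference: only changes when an invoice is added to the map.
    private val invoicesRef =
      new AtomicReference(Map.empty[String, AtomicReference[Invoice]])

    @tailrec
    final def createInvoice(id: String): Boolean = {
      val before = invoicesRef.get()
      if (before.contains(id)) false
      else if (invoicesRef.compareAndSet(before, before + (id -> new AtomicReference(Invoice(id))))) true
      else createInvoice(id) // another invoice was added concurrently: retry
    }

    // Updates contend only with other updates to the same invoice.
    def addInvoiceItem(id: String, item: String): Option[Invoice] =
      invoicesRef.get().get(id).map { ref =>
        update(ref)(invoice => invoice.copy(items = invoice.items :+ item))
      }

    @tailrec
    private def update(ref: AtomicReference[Invoice])(f: Invoice => Invoice): Invoice = {
      val before = ref.get()
      val after = f(before)
      if (ref.compareAndSet(before, after)) after else update(ref)(f)
    }

    // Not an atomic snapshot across invoices in this stand-in.
    def getInvoices: Map[String, Invoice] =
      invoicesRef.get().map { case (id, ref) => id -> ref.get() }
  }
}
```

With this layout, two clients adding items to different invoices never force each other to retry.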


In this case, different transactional event logs would share the same persistent event log backend.

Summary

We started to build an event-sourced application whose state is kept entirely in memory; only state changes are persisted to an event log in terms of domain events. Application state is defined by immutable domain objects which are accessed through transactional references. A state change means updating the transactional references with new domain objects (new state value) within an STM transaction. Events that have been generated during domain object updates are written to a transient, transactional event log. Transfer to a persistent event log occurs once the transaction commits. It was also shown how the order of logged events can be kept consistent with the order of state changes. This is important when application state must be recovered from the event history (for example, during application start or failover to another node).

15 comments:

  1. Martin,

    thanks for sharing your experience on practicing with 'real world Scala'!

    This is a very nice example which might serve many Scala newcomers (and of course also some more settled ones) as a showcase on how to bring some of the 'new' ideas like CQRS, STM, Actors, ... into life with Scala.

    Enjoyed reading your great post really a lot! Keep up the good work and don't let us wait all too long for the next part ...

    Greetings

    Mario

  2. Martin -

    Nice post. Thanks for sharing your experiences. I have one question on model migration. How do you handle domain model migration ? Say your domain model for Invoice changes - how do you handle this when you replay from history with an earlier version of the model ?

    Cheers.
    - Debasish

  3. @Debasish

    The brief answer is: if only the structure/behavior of domain objects changes but not the stored domain events then only the event handler needs to be re-written and a replay is possible for the complete history. If, in addition, the structure of domain events changes (or those parts of the domain objects which are stored along with the events, such as InvoiceItem for example), we re-write the event handler in a way that it only handles the latest version of domain events and convert older versions to newer ones during replay. The conversion is done by an event converter that can be added to a converter chain. With every domain event schema change we also provide a corresponding converter that is added to the existing chain of converters. I plan to write more about model migration in an upcoming blog post together with a running example.

    Cheers,
    Martin

  4. @Martin

    Thanks for the clarification. One more question .. why did u decide to add conditional updates since u r already using an STM ? Just curious about the usecase that this would serve when the client needs to supply the version number explicitly. I know CouchDB supports this versioning as the form of optimistic concurrency control. But it doesn't have any other transaction support.

    Cheers.
    Debasish

  5. @Debasish

    I'm using conditional updates with version numbers to support transactions that run longer than STM transactions. For example, a user loads an invoice into the browser, manually makes some changes and submits those changes a minute later. The STM transaction is a short-running transaction that executes during submission of the update, while the user transaction is a long-running transaction that takes about a minute (in this example). So you could have overlapping/conflicting user transactions without having conflicting STM transactions. The version number is used to avoid conflicts during (long-running) user transactions.

    One could use blocking STM transactions that can be retried when a certain condition is not met, but they don't seem to be suitable for interactive conflict resolution: a user might want to look at the latest changes that have been concurrently made before submitting a modified update. Should I miss some important feature of the Akka/Multiverse STM here please let me know.

    Furthermore, not all entities in our production system are versioned. In an application where all entities are versioned it would make sense to have a single, version-number-based, optimistic concurrency mechanism as done by CouchDB (where all documents are versioned).

    Cheers,
    Martin

  6. @Martin

    I must be missing something. Looks like in your case the user transaction boundary starts from the point the user loads the invoice. What if you change it to after the user hits the submit button ? Say user1 loads invoice 1 (version 1). Simultaneously user2 also loads invoice 1 (version 1). Both make changes and do a submit. If I have the transaction starting once the users hit submit, then the STM should take care of the updates. That means one of the updates will go through and the other will be rejected since its read record has changed versions because of the update by the other competing transaction.

    Or am I missing anything ?

    Cheers.

  7. @Debasish

    In this case one would have to use Ref.atomicCompareAndSet (see http://multiverse.codehaus.org/javadoc/org/multiverse/api/references/Ref.html#atomicCompareAndSet%28E,%20E%29) and retry the transaction manually (i.e. in the application code) should the method return false. This approach could be useful in cases where each Invoice instance is managed by its own Ref. In other cases where a Ref contains a whole invoices map (as in the example application), one would need to make comparisons against the whole map. This could lead to some unnecessary retries in cases of concurrent user transactions that make updates on different invoice instances (here, the updates are independent of each other).

    Furthermore, according to the API docs, Ref.compareAndSet doesn't care about any running transactions which (I think) could lead to unexpected behavior if the same Ref participates in other STM transactions. I'm also using STM because I don't want to do the retries myself in the application code. For these reasons and the very little implementation effort of the proprietary conditional update mechanism, I prefer the approach taken by the example application. Should I miss something here, please let me know.

    Regarding transaction boundaries, the STM (write-)transaction in any case starts when the user hits the submit button, regardless if this approach or the approach of the example application is taken.

    Cheers,
    Martin

  8. @Martin

    Ok .. so in your use case we have a single Ref managing multiple invoices. In case you managed *all* transactions through the STM, if 2 users update 2 diff invoices even then there will be multiple retries by the STM since we have 1 Ref for all of them. To avoid this, your strategy is to have a separate version field along with each Invoice. You manage this separately and allow updates in User transactions to go through so long the versions are consistent and updates are being made on diff invoice instances. The STM blocking part will be reduced to only the time of submit by the user.

    So is it true that if 2 users try updating the same invoice then u flag an error *before* the STM takes control of the transaction ?

    Cheers.

  9. @Debasish

    Yes, this is roughly how it works in the latest version of the example app: the STM transaction is only executed if the version check succeeds. In the initial version, the version check was made inside the STM transaction but the Ref was only updated if the version check succeeds. The following paragraphs refer to the latest version of the example app (with more details to come in the next part of this blog post series).

    The current implementation is very similar to how Akka Agents are implemented. Updates are done asynchronously by a single writer actor (i.e. there are sequential updates to the Ref) whereas reads go to Ref directly. Having a single Ref writer allows for

    - getting the latest Ref value (e.g. invoice map) first (read-tx)
    - performing the version check and an invoice update (outside tx), and, if both succeed
    - writing the updated map to the Ref in a separate transaction (inside write-tx)

    Since there are no concurrent updates between the read-tx and the write-tx it is safe to run the version check outside the transaction. This behavior is implemented in the UpdateProjection trait which is implemented by InvoiceService. The major difference to Akka Agents is that the update function is executed outside the STM transaction. So far to the relation of version checks (and calling of update functions) to STM transactions.

    Should one want a higher degree of concurrency for invoice updates one could choose an UpdateProjection implementation that manages a single invoice (which is like having a separate agent for each invoice)

    However, the main reason for having chosen this approach was a different one: write-ahead logging. In the initial version of the example app, updates to the Ref have been visible before events have been written to the event log (write-behind). Write-ahead logging was not supported. On the other hand, having a single writer actor one can write events to the event log before the Ref is updated. Should the write to the event log fail, the Ref isn't updated either. Here, it is important that writing to the event log occurs outside of the STM transaction, otherwise, we'd have redundant event log entries should the STM retry the transaction.

  10. @Martin

    Thanks for the explanation. Would be interested to know how this "memory image" architectural pattern scales out in practice. I am sure you will have some figures when you deploy your application in the real world. Of course the latency factor will be drastically reduced. In a distributed environment (when the memory image > single machine memory) what challenges do you foresee ?

    I did a small implementation of CQRS with Akka and functional domain models quite some time back (http://debasishg.blogspot.com/2011/01/cqrs-with-akka-actors-and-functional.html). However my projection / read model was based out of an RDBMS which was used for querying the current state of the application. Implementation wise it's much simpler than the "memory image" pattern. My implementation was actor based and did not use any STM since I don't have to manage the current state in memory.

    Cheers.

  11. @Debasish

    I plan to write about the memory image pattern in a distributed application in a separate blog post (part 3, maybe). There, I also want to provide some real numbers about scaling out reads and writes plus some deployment figures. In the following are some general thoughts about scaling out reads and writes.

    In our distributed setup, we currently have a single master and n slaves. We do scale out reads by directing requests to the master and slaves, writes go to the single master (the current master/leader is known by ZooKeeper). Scaling out reads is rather trivial: events, generated by the master, are sent to slaves (async by default). The slaves re-create application state and read models from the events and serve read requests. This is similar to a distributed cache, except that the cache is not populated on demand but driven by events.

    Scaling out writes can be done by partitioning application state across nodes. In this case, one would have several masters where each master is responsible for a particular partition (metadata about which master is assigned which partition is managed within ZooKeeper). Creating partitions is also necessary when the memory image gets bigger than the memory of a single node. In our domain (healthcare), creation of partitions is rather easy because patient data are managed within medical records that do not contain references to entries in other medical records. So writing to a medical record on one node will (usually) not involve reads or writes to other nodes which makes it easier to scale out writes. I see a bigger challenge for scaling-out writes (and large memory images) when application data have no 'natural' boundary for partitioning (for example, when there are many cross-references etc).

    We also plan to evaluate 'big memory' solutions (Terracotta BigMemory, for example) to deal with the 2GB limit on a single machine. At the moment, I can't say if this will work with our STM approach. Will need to do some research ...

    Cheers,
    Martin

  12. Hi Martin,

    I noticed in your latest event-sourced implementation (Eligosource) that you had removed the idea of an 'Aggregate' completely from your code. Was this just to simplify the example or is it a more thought-through design decision? I am working on a distributed event sourced state management implementation for data subscriptions using ZooKeeper to partition across nodes.

    Look forward to hearing your thoughts.

    -Gary

    Replies
    1. Hi Gary,

      it was a design goal of the new eventsourced library not to make any assumption about design approaches taken by applications such as DDD. Of course, it makes perfect sense to follow a DDD approach and use it together with the library but you don't have to.

      The biggest change in the new example app compared to the old one, however, is that it also uses command-sourcing in addition to event-sourcing. The eventsourced library allows you to do both with the same mechanisms. This simplified the implementation a lot.

      Your project sounds very interesting. Do you plan writing a blog post about it?

      Cheers,
      Martin

  13. Here (http://www.jdon.org) is a Java Event Sourcing and CQRS open source example. I think Java's events are simpler than Scala's.

  14. Hi Martin
    This discussion is really interesting. My thoughts on this are: I would like to make an architecture diagram based on your article.
