Friday, January 20, 2012

Building an Event-Sourced Web Application - Part 2: Projections, Persistence, Consumers and Web Interface

UPDATE: A successor of the example application described in this blog post is available here. It is based on the eventsourced library, a library for building reliable, scalable and distributed event-sourced applications in Scala.

A few weeks ago I started a blog post series to summarize my experiences in building an event-sourced web application using Scala and Akka. The series is based on an example application (see branch part-1). In part 1, I gave an overview of the application architecture and presented some details of the immutable domain model and the service layer. Since then, the example application has been extended (see branch part-2) with a number of new features and enhancements. Here's an overview:

Enhancements are:
  • The STM-based state management was completely revised and generalized into the traits UpdateProjection and EventProjection. An UpdateProjection is similar to an Akka Agent: it applies state transition functions asynchronously and can participate in STM transactions. The major difference is that an UpdateProjection is specialized for domain object updates and can log captured events to a persistent event log. By default, update events are logged before state changes become visible via STM references (an important difference from the service layer of part 1). UpdateProjection implementors (such as the example application's InvoiceService) are domain event producers. EventProjection implementors, on the other hand, are domain event consumers. They internally use plain Akka Agents to manage state and derive new state values from received domain events (using an application-defined event projection function). Typical EventProjection implementors are components that manage read models or coordinate business processes in event-driven architectures.
  • The domain model was enhanced by introducing the domain classes DraftInvoice, SentInvoice and PaidInvoice to represent the states an invoice can have. Valid state transitions are defined by the methods on these domain classes and can therefore be checked by the compiler. This approach is explained in more detail here, although the implementation used in our example application differs slightly. In part 1, we only had a single Invoice class, and valid state transitions had to be checked at runtime.
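Here is a minimal Scala sketch of how typed invoice states let the compiler check transitions. The class members, the InvoiceItem type and the use of Either[String, _] instead of the project's DomainValidation are assumptions for illustration. This is not the example project's code.

```scala
// Hypothetical, simplified invoice classes (not the example project's code).
// Each invoice state is its own type and only offers the transitions that are legal in that state.
final case class InvoiceItem(description: String, count: Int, amount: BigDecimal)

sealed trait Invoice {
  def id: String
  def items: List[InvoiceItem]
  def total: BigDecimal = items.map(i => i.amount * BigDecimal(i.count)).sum
}

final case class DraftInvoice(id: String, items: List[InvoiceItem] = Nil) extends Invoice {
  // Only drafts can be edited or sent.
  def addItem(item: InvoiceItem): DraftInvoice = copy(items = items :+ item)
  def sendTo(address: String): SentInvoice = SentInvoice(id, items, address)
}

final case class SentInvoice(id: String, items: List[InvoiceItem], address: String) extends Invoice {
  // Only sent invoices can be paid, and only with the exact total (a runtime check on the amount).
  def pay(amount: BigDecimal): Either[String, PaidInvoice] =
    if (amount == total) Right(PaidInvoice(id, items, address))
    else Left(s"expected payment of $total but got $amount")
}

final case class PaidInvoice(id: String, items: List[InvoiceItem], address: String) extends Invoice

// Illegal transitions are rejected by the compiler, for example:
//   DraftInvoice("inv-1").pay(BigDecimal(10))  // does not compile: drafts have no pay method
//   PaidInvoice("inv-1", Nil, "x").addItem(...) // does not compile: paid invoices have no addItem
```

Only the payment amount is still checked at runtime. Whether a transition exists at all is decided by the type of the invoice.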

New features include:
  • A revised EventLog trait together with two implementations: JournalioEventLog is based on Journal.IO and BookkeeperEventLog on Apache BookKeeper. An EventLog supports synchronous and asynchronous appending of events as well as iterating over stored events, either from the beginning or from an application-defined position in the event history. Event log entries are also assigned sequence numbers so that event consumers can detect gaps in event streams or re-order (resequence) them, if needed.
  • Event consumers. One example is InvoiceStatistics, an EventProjection that derives invoice update statistics from domain events. Here, a separate read model is used (following the CQRS pattern) to serve invoice statistics queries. Another example is InvoiceReplicator, an EventProjection that simply reconstructs the invoice map (as maintained by the InvoiceService) from invoice events at a different location. It can be used to replicate application state across different nodes and to serve (eventually consistent) reads. The replicated state could also be used by a snapshot service to take snapshots of application state. InvoiceReplicator needs to receive events in the correct order and is therefore configured to resequence the received event stream. A third example is the PaymentProcess, which coordinates the activities of InvoiceService and PaymentService. Instead of having these services send commands to each other, the PaymentProcess sends commands to (i.e. calls methods on) these services in reaction to domain events. This event-driven approach to implementing business processes not only decouples the services from each other but also lets other components extend (or monitor) the business process by subscribing and reacting to the relevant domain events. The PaymentProcess is currently stateless. Processes that need to maintain state should implement EventProjection and recover the process state from the event history during application start (or failover).
  • A RESTful web interface for invoices and invoice statistics with support for HTML, XML and JSON representations, which can be negotiated with the HTTP Accept header. The web layer is based on the Jersey web framework (the JAX-RS reference implementation). HTML representations are rendered with the Scalate template engine. The mapping between XML and JSON representations and immutable domain classes is based on JAXB annotations. Any JAX-RS implementation must support a JAXB-based XML provider, but Jersey additionally comes with a JAXB-based JSON provider, so the same metadata (JAXB annotations) can be used to generate both XML and JSON representations. Following a few simple rules, it is possible to JAXB-annotate Scala case classes without polluting them with Java collection types or getters and setters. One major drawback of the current JAX-RS specification is that it doesn't support asynchronous responses yet. This will change with JAX-RS 2.0, and then we can make full use of the asynchronous InvoiceService responses.
  • A communication Channel for connecting domain event producers to consumers. The example application provides a SimpleChannel implementation for local communication. Alternative implementations, for example, could connect to a distributed event bus to communicate events across components of a distributed application. 
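Resequencing, as needed by InvoiceReplicator, can be done by buffering events that arrive early and releasing them in sequence-number order. The following Scala sketch assumes consecutive sequence numbers starting at 1 and single-threaded use. The names Message, Resequencer and InvoiceTotals are hypothetical. This is not the example project's implementation.

```scala
import scala.collection.mutable

// Hypothetical envelope: the event log assigns consecutive sequence numbers.
final case class Message[+E](seqNr: Long, event: E)

// Delivers events strictly in sequence-number order and buffers early arrivals.
// Not thread-safe: it assumes a single caller at a time.
final class Resequencer[E](firstSeqNr: Long)(deliver: E => Unit) {
  private var expected = firstSeqNr
  private val pending = mutable.Map.empty[Long, E]

  def receive(msg: Message[E]): Unit = {
    if (msg.seqNr >= expected) pending.update(msg.seqNr, msg.event) // drop already-delivered duplicates
    while (pending.contains(expected)) { // flush the contiguous run starting at `expected`
      deliver(pending.remove(expected).get)
      expected += 1
    }
  }

  def nextExpected: Long = expected
  // A gap exists if later events are buffered while `expected` has not arrived yet.
  def gap: Option[Long] = if (pending.nonEmpty) Some(expected) else None
}

// Hypothetical invoice events and a replica-style read model.
sealed trait InvoiceEvent
final case class InvoiceCreated(invoiceId: String) extends InvoiceEvent
final case class InvoiceItemAdded(invoiceId: String, amount: BigDecimal) extends InvoiceEvent

final class InvoiceTotals {
  @volatile private var state = Map.empty[String, BigDecimal]
  def currentState: Map[String, BigDecimal] = state

  // Event projection function: the next state is derived from the current state and an event.
  // An item for an unknown invoice is dropped, so delivery order matters.
  private def project(s: Map[String, BigDecimal], e: InvoiceEvent): Map[String, BigDecimal] = e match {
    case InvoiceCreated(id)        => s.updated(id, BigDecimal(0))
    case InvoiceItemAdded(id, amt) => s.get(id).fold(s)(total => s.updated(id, total + amt))
  }

  val input = new Resequencer[InvoiceEvent](1L)(e => state = project(state, e))
}
```

Without the resequencer, an InvoiceItemAdded that overtakes its InvoiceCreated would be lost. With it, the item waits in the buffer until the gap is filled.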

Running the example application

The example application can be started with

sbt run-main dev.example.eventsourcing.server.Webserver

Two classes relevant for starting the application are:
  • Appserver: configures the event log, services, read models and processes and connects them via an event channel. It also recovers application state from the event history.
  • Webserver: configures the web service layer and starts an embedded web server (Jetty).
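Recovering application state from the event history amounts to replaying logged events through a projection function. The following is a minimal Scala sketch. It assumes an in-memory log with the hypothetical names InMemoryEventLog, LogEntry and Recovery, not the JournalioEventLog or BookkeeperEventLog API.

```scala
// Hypothetical in-memory event log: each appended event gets the next sequence number,
// and the stored history can be iterated from any position.
final case class LogEntry[E](seqNr: Long, event: E)

final class InMemoryEventLog[E] {
  private var entries = Vector.empty[LogEntry[E]]

  def append(event: E): LogEntry[E] = synchronized {
    val entry = LogEntry(entries.size.toLong + 1L, event)
    entries :+= entry
    entry
  }

  // Iterates over an immutable snapshot of the history, starting at `fromSeqNr`.
  def iterator(fromSeqNr: Long = 1L): Iterator[LogEntry[E]] = synchronized {
    entries.iterator.dropWhile(_.seqNr < fromSeqNr)
  }
}

object Recovery {
  // Rebuilds state by replaying logged events through a projection function, either from
  // the beginning or from a later sequence number (e.g. on top of a loaded snapshot).
  def recover[S, E](log: InMemoryEventLog[E], initial: S, fromSeqNr: Long = 1L)(project: (S, E) => S): S =
    log.iterator(fromSeqNr).map(_.event).foldLeft(initial)(project)
}
```

Replaying from a later sequence number lets recovery start from a snapshot instead of from the beginning of the history.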
To experiment with the BookKeeper-based event log, replace JournalioEventLog with BookkeeperEventLog in Appserver. Then start a local ZooKeeper server and a test BookKeeper instance with

sbt run-main dev.example.eventsourcing.server.Zookeeper
sbt run-main dev.example.eventsourcing.server.Bookkeeper

Examples of how to interact with the RESTful web interface can be found here.

Service Layer Enhancements

In the service layer implementation from part 1, we've seen how to keep the order of logged events consistent with the order of updates to the application state. We used a transient event log that could participate in STM transactions. After the transaction committed, the events from the transient event log were transferred to a persistent event log. A drawback of this approach is that updates can be lost if the application crashes after an STM reference has been updated but before the changes have been written to the persistent event log. Clients may then have already seen application state that cannot be recovered from the event log. While some applications may tolerate this, others may require that any visible application state be fully recoverable from the event log. Therefore, an alternative approach must be chosen.

We need a way to write events, captured during domain object updates, to a persistent event log before the STM reference is updated. But writing to the persistent event log must be done outside an STM transaction for reasons explained in part 1. Updates must also be based on the current (i.e. latest) application state. We therefore need to
  1. Get the current state value from a transactional reference (STM transaction 1)
  2. Update domain object(s) obtained from the current state (no STM transaction)
  3. Write the captured update event(s) to a persistent event log (no STM transaction)
  4. Compute a new state value from the domain object update and write it to the transactional reference (STM transaction 2)
Since steps 1-4 are not a single transaction, we must prevent their concurrent execution. This can be achieved, for example, with a single actor that is the only writer to the transactional reference. This is very similar to how Akka Agents work internally. The main difference in our case is that the computation of a new state value occurs outside an STM transaction, whereas Akka Agents apply update functions within STM transactions. This difference allows us to have side effects, such as writing to a persistent event log. The example application provides the above functionality in the UpdateProjection trait:
  • Instances of UpdateProjection manage (part of) application state with a transactional ref of type Ref[S] where S is the state value type. Clients concurrently read application state via currentState. Sequential writes to the transactional ref are done exclusively by the updater actor (more on write concurrency below).
  • UpdateProjection implementors change application state with the transacted method. The update parameter is a function that computes a domain object Update[Event, B] from the current state of type S, where B is a domain object type. The transacted method returns the update result (either Success[B] or Failure[DomainError]) as a future value.
  • The update function and the underlying Future implementation object (promise) are sent to the updater actor with an ApplyUpdate message. The updater then reads the current state and applies the update function to it. If the update succeeds, it writes the captured events to an EventLog and projects the update result onto the current state. The projection is done with the project function, which creates a new state value from the current state and the update result. Finally, the new state value is set on the transactional ref and the promise is completed with the update result.
  • Furthermore, the transacted method can participate in STM transactions. If there's an enclosing transaction, the updater actor will only be triggered if the enclosing transaction successfully commits. If there's no enclosing transaction the updater actor will always be triggered.
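The updater logic described above can be sketched in plain Scala. This is not the project's UpdateProjection. An AtomicReference stands in for the STM Ref[S], and a single-threaded executor stands in for the updater actor. Updates are plain functions that return either an error or the captured events plus a result. Participation in enclosing STM transactions is left out.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical minimal event log (not the project's EventLog trait).
trait EventLog[E] { def appendSync(events: List[E]): Unit }

final class BufferEventLog[E] extends EventLog[E] {
  private var entries = Vector.empty[E]
  def appendSync(events: List[E]): Unit = synchronized { entries ++= events }
  def stored: Vector[E] = synchronized { entries }
}

abstract class UpdateProjection[S, E, B](initialState: S, eventLog: EventLog[E]) {
  private val ref = new AtomicReference[S](initialState) // stand-in for the STM Ref[S]
  // Stand-in for the updater actor: one (daemon) thread, so writes are strictly sequential.
  private val updater = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r, "updater"); t.setDaemon(true); t }
  })

  // Concurrent reads only see states whose events have already been logged.
  def currentState: S = ref.get

  // Derives a new state value from the current state and an update result.
  protected def project(state: S, result: B): S

  def transacted(update: S => Either[String, (List[E], B)]): Future[Either[String, B]] = {
    val promise = Promise[Either[String, B]]()
    updater.execute(new Runnable {
      def run(): Unit = {
        promise.complete(Try {
          val state = ref.get                            // 1. read the current state
          update(state).map { case (events, result) =>   // 2. apply the update (no transaction)
            eventLog.appendSync(events)                  // 3. log the events first ...
            ref.set(project(state, result))              // 4. ... then publish the new state
            result
          }
        })
        ()
      }
    })
    promise.future
  }
}

// Hypothetical usage: a counter whose increments are logged as events.
final case class Incremented(by: Int)

final class Counter(log: EventLog[Incremented]) extends UpdateProjection[Int, Incremented, Int](0, log) {
  protected def project(state: Int, result: Int): Int = result
  def increment(by: Int): Future[Either[String, Int]] =
    transacted { s =>
      if (by > 0) Right((List(Incremented(by)), s + by)) else Left(s"invalid increment: $by")
    }
}
```

All writes run on one thread, so steps 1 to 4 never interleave. A failed update neither logs events nor changes the state.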
The example application uses UpdateProjection to implement the stateful InvoiceService. The state is of type Map[String, Invoice] i.e. a single map containing draft, sent and paid invoices. Here's a simplified version of InvoiceService:
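The following is a self-contained Scala approximation, not the example project's code. The domain classes are hypothetical, and Either[String, _] replaces DomainValidation. The event log is modeled as a plain function. The transacted method serializes writers with a lock instead of an updater actor and returns an already completed Future.

```scala
import scala.concurrent.Future

// Hypothetical, simplified domain model: updates return the captured events plus the new object.
sealed trait InvoiceEvent
final case class InvoiceItemAdded(invoiceId: String, amount: BigDecimal) extends InvoiceEvent
final case class InvoiceSent(invoiceId: String, to: String) extends InvoiceEvent

sealed trait Invoice { def id: String }

final case class DraftInvoice(id: String, items: List[BigDecimal] = Nil) extends Invoice {
  def addItem(amount: BigDecimal): Either[String, (List[InvoiceEvent], DraftInvoice)] =
    if (amount <= BigDecimal(0)) Left(s"invalid amount: $amount")
    else Right((List(InvoiceItemAdded(id, amount)), copy(items = items :+ amount)))

  def sendTo(to: String): Either[String, (List[InvoiceEvent], SentInvoice)] =
    if (items.isEmpty) Left(s"cannot send empty invoice $id")
    else Right((List(InvoiceSent(id, to)), SentInvoice(id, items, to)))
}

final case class SentInvoice(id: String, items: List[BigDecimal], to: String) extends Invoice

final class InvoiceService(initialState: Map[String, Invoice], eventLog: List[InvoiceEvent] => Unit) {
  @volatile private var state: Map[String, Invoice] = initialState
  def currentState: Map[String, Invoice] = state

  // Simplified transacted: this instance's lock serializes writers, and captured events
  // are logged before the new state becomes visible to readers.
  private def transacted[B <: Invoice](update: Map[String, Invoice] => Either[String, (List[InvoiceEvent], B)]): Future[Either[String, B]] =
    Future.successful(synchronized {
      update(state).map { case (events, updated) =>
        eventLog(events)
        state = project(state, updated)
        updated
      }
    })

  // Adds the updated invoice to the map, replacing an older version if present.
  private def project(s: Map[String, Invoice], updated: Invoice): Map[String, Invoice] =
    s + (updated.id -> updated)

  private def updateInvoice[B <: Invoice](invoiceId: String)(f: Invoice => Either[String, (List[InvoiceEvent], B)]): Future[Either[String, B]] =
    transacted[B](s => s.get(invoiceId).toRight(s"no invoice $invoiceId").flatMap(f))

  private def updateDraftInvoice[B <: Invoice](invoiceId: String)(f: DraftInvoice => Either[String, (List[InvoiceEvent], B)]): Future[Either[String, B]] =
    updateInvoice[B](invoiceId) {
      case draft: DraftInvoice => f(draft)
      case other               => Left(s"invoice ${other.id} is not a draft")
    }

  def addInvoiceItem(invoiceId: String, amount: BigDecimal): Future[Either[String, DraftInvoice]] =
    updateDraftInvoice[DraftInvoice](invoiceId)(_.addItem(amount))

  def sendInvoiceTo(invoiceId: String, to: String): Future[Either[String, SentInvoice]] =
    updateDraftInvoice[SentInvoice](invoiceId)(_.sendTo(to))
}
```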

The updateInvoice method uses the transacted method of the UpdateProjection trait. It tries to get the invoice with the given invoiceId from the current state and applies the supplied update function f to it. The updateInvoice method is used by updateDraftInvoice for updating draft invoices in the invoice map. In turn, updateDraftInvoice is used by the service methods addInvoiceItem and sendInvoiceTo. Adding an item to an existing draft invoice yields a future value of an updated draft invoice (return type Future[DomainValidation[DraftInvoice]]). Sending an existing draft invoice, on the other hand, causes a state transition of that invoice to a sent invoice (return type Future[DomainValidation[SentInvoice]]). In other words, the service methods make use of the newly introduced domain object types. The InvoiceService must also implement the abstract members project, initialState and eventLog (declared by Projection and UpdateProjection).
  • The project implementation projects an updated invoice onto the current state by simply adding it to the map of invoices (replacing an old invoice if present).
  • An initialState (empty map by default) and an EventLog instance are provided during construction of an InvoiceService.
The above InvoiceService implementation supports concurrent reads but only sequential writes to the whole invoice map. This may be sufficient for many applications. If a higher degree of write concurrency is needed, one could have a separate UpdateProjection instance per invoice object (comparable to having a separate Akka Agent for each invoice object). This allows both concurrent reads and concurrent writes to the invoices of an application. The following snippet shows the general idea (not part of the example project).
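The following self-contained Scala sketch illustrates this idea. It uses hypothetical names and a synchronous API for brevity. A per-invoice lock stands in for an updater actor, and an AtomicReference stands in for the STM ref.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.concurrent.TrieMap

// Hypothetical minimal invoice model and event type.
final case class Invoice(id: String, items: List[BigDecimal] = Nil)
final case class ItemAdded(invoiceId: String, amount: BigDecimal)

// One single-writer holder per invoice. Writers of the same invoice are serialized by this
// instance's lock, while writers of different invoices never contend with each other.
final class PersistentInvoice(initial: Invoice, appendToLog: ItemAdded => Unit) {
  private val ref = new AtomicReference[Invoice](initial)
  def current: Invoice = ref.get

  def transacted(update: Invoice => Either[String, (ItemAdded, Invoice)]): Either[String, Invoice] =
    synchronized {
      update(ref.get).map { case (event, updated) =>
        appendToLog(event) // persist the event first ...
        ref.set(updated)   // ... then publish; the projection simply returns `updated`
        updated
      }
    }
}

// The service only maintains the id -> PersistentInvoice map. appendToLog must be
// thread-safe, because different invoices may now log events concurrently.
final class InvoiceService(appendToLog: ItemAdded => Unit) {
  private val invoices = TrieMap.empty[String, PersistentInvoice]

  def createInvoice(id: String): Unit = {
    invoices.putIfAbsent(id, new PersistentInvoice(Invoice(id), appendToLog))
    ()
  }

  def invoice(id: String): Option[Invoice] = invoices.get(id).map(_.current)

  def addInvoiceItem(id: String, amount: BigDecimal): Either[String, Invoice] =
    invoices.get(id) match {
      case None => Left(s"no invoice $id")
      case Some(p) =>
        p.transacted { inv =>
          if (amount <= BigDecimal(0)) Left(s"invalid amount: $amount")
          else Right((ItemAdded(id, amount), inv.copy(items = inv.items :+ amount)))
        }
    }
}
```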

Here, an InvoiceService maintains a map of PersistentInvoice instances where a PersistentInvoice is an UpdateProjection that contains a reference to a single (draft, sent or paid) invoice. Consequently, updates to different invoices can now be made concurrently. The projection function degenerates to a function that simply returns the updated invoice.

Event Logs


CQRS and Consistency


Business Processes


Web Interface

TODO (see also this blog post)