A frequently asked question when building event sourced applications is how to interact with external services. This topic is covered to some extent by Martin Fowler's Event Sourcing article in the sections External Queries and External Updates. In this blog post I'll show how to approach external service integration with the Eventsourced library for Akka. If you are new to this library, an overview is given in the user guide sections Overview and First steps.
The example application presented here was inspired by Fowler's LMAX article where he describes how event sourcing differs from an alternative transaction processing approach:
Imagine you are making an order for jelly beans by credit card. A simple retailing system would take your order information, use a credit card validation service to check your credit card number, and then confirm your order - all within a single operation. The thread processing your order would block while waiting for the credit card to be checked, but that block wouldn't be very long for the user, and the server can always run another thread on the processor while it's waiting.
In the LMAX architecture, you would split this operation into two. The first operation would capture the order information and finish by outputting an event (credit card validation requested) to the credit card company. The Business Logic Processor would then carry on processing events for other customers until it received a credit-card-validated event in its input event stream. On processing that event it would carry out the confirmation tasks for that order.
Although Fowler mentions the LMAX architecture, we don't use the Disruptor here for implementation. Its role is taken by an Akka dispatcher in the following example. Nevertheless, the described architecture and message flow remain the same.
The two components in the high-level architecture are:
- OrderProcessor. An event sourced actor that maintains received orders and their validation state in memory. The OrderProcessor writes any received event message to an event log (journal) so that its in-memory state can be recovered by replaying these events, e.g. after a crash or during normal application start. This actor corresponds to the Business Logic Processor in Fowler's example.
- CreditCardValidator. A plain remote, stateless actor that validates credit card information of submitted orders on receiving CreditCardValidationRequested events. Depending on the validation outcome it replies with CreditCardValidated or CreditCardValidationFailed event messages to the OrderProcessor.
The example application must meet the following requirements and conditions:
- The OrderProcessor and the CreditCardValidator must communicate remotely so that they can be deployed separately. The CreditCardValidator is an external service from the OrderProcessor's perspective.
- The example application must be able to recover from JVM crashes and remote communication errors. OrderProcessor state must be recoverable from logged event messages and running credit card validations must be automatically resumed after crashes. To overcome temporary network problems and remote actor downtimes, remote communication must be re-tried. Long-lasting errors must be escalated.
- Event message replay during recovery must not redundantly emit validation requests to the CreditCardValidator and validation responses must be recorded in the event log (to solve the external queries problem). This will recover processor state in a deterministic way, making repeated recoveries independent from otherwise potentially different validation responses over time for the same validation request (a credit card may expire, for example).
- Message processing must be idempotent. This requirement is a consequence of the at-least-once message delivery guarantee supported by Eventsourced.
The full example application code that meets these requirements is part of the Eventsourced project and can be executed with sbt.
The CreditCardValidator can be started with:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.CreditCardValidator
The application that runs the OrderProcessor and sends OrderSubmitted events can be started with:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.OrderProcessor
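The example application defines an oversimplified domain class Order together with a set of domain events, among them OrderSubmitted, OrderStored, CreditCardValidationRequested, CreditCardValidated and CreditCardValidationFailed.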
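To make the following sections easier to follow, here is a minimal sketch of what such a domain model could look like. Only the class and event names come from this post. The field layouts, the Validation type and the use of -1 for a not-yet-assigned order id are assumptions made for illustration and may differ from the full example code.

```scala
// Validation status of an order's credit card.
sealed trait Validation
object Validation {
  case object Pending extends Validation
  case object Success extends Validation
  case object Failure extends Validation
}

// Oversimplified domain class. Field names are assumptions;
// id = -1 is used here to mark an order that has not been stored yet.
final case class Order(
  id: Int,
  details: String,
  creditCardNumber: String,
  creditCardValidation: Validation = Validation.Pending)

// Domain events named in this post (payloads are assumptions).
final case class OrderSubmitted(order: Order)
final case class OrderStored(order: Order)
final case class CreditCardValidationRequested(order: Order)
final case class CreditCardValidated(orderId: Int)
final case class CreditCardValidationFailed(orderId: Int)
```

Modelling events as immutable case classes keeps them easy to serialize to a journal and to pattern-match on in an actor's receive method.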
Whenever the OrderProcessor receives a domain event it appends that event to the event log (journal) before processing it. To add event logging behavior to an actor it must be modified with the stackable Eventsourced trait during construction.
Eventsourced actors only write messages of type Message to the event log (together with the contained event). Messages of other types can be received by an Eventsourced actor as well but aren't logged. The Receiver trait allows the OrderProcessor's receive method to pattern-match against received events directly (instead of Message). It is not required for implementing an event sourced actor but can help to make implementations simpler.
On receiving an OrderSubmitted event, the OrderProcessor extracts the contained order object from the event, updates the order with an order id and stores it in the orders map. The orders map represents the current state of the OrderProcessor (which can be recovered by replaying logged event messages).
After updating the orders map, the OrderProcessor replies to the sender of an OrderSubmitted event with an OrderStored event. This event is a business-level acknowledgement that the received OrderSubmitted event has been successfully written to the event log. Finally, the OrderProcessor emits a CreditCardValidationRequested event message to the CreditCardValidator via a reliable request-reply channel (see below). The emitted message is derived from the current event message which can be accessed via the message method of the Receiver trait. Alternatively, the OrderProcessor could also have used an emitter for sending the event (see also channel usage hints).
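The three steps described above (store the order, acknowledge with OrderStored, emit CreditCardValidationRequested) can be sketched as a pure function, independent of Akka and of the Eventsourced API. This is only an illustration: the Handled result type, the SubmitHandling object, the trimmed-down Order class and the id scheme (map size plus one) are all hypothetical.

```scala
// Trimmed-down stand-ins for the domain classes (assumed field layouts).
final case class Order(id: Int, details: String)
final case class OrderSubmitted(order: Order)
final case class OrderStored(order: Order)
final case class CreditCardValidationRequested(order: Order)

// Hypothetical result of handling one OrderSubmitted event:
// the new state, the reply to the sender and the event to emit.
final case class Handled(
  orders: Map[Int, Order],
  reply: OrderStored,
  emitted: CreditCardValidationRequested)

object SubmitHandling {
  def onOrderSubmitted(orders: Map[Int, Order], event: OrderSubmitted): Handled = {
    val id = orders.size + 1                 // hypothetical id scheme
    val stored = event.order.copy(id = id)   // update the order with its id
    Handled(
      orders + (id -> stored),               // new processor state
      OrderStored(stored),                   // business-level acknowledgement
      CreditCardValidationRequested(stored)) // goes to the validator via the channel
  }
}
```

In the actor, the first element would replace the orders map, the second would be sent to the sender, and the third would be wrapped into the current event message and handed to the channel.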
A reliable request-reply channel is a pattern on top of a reliable channel with the following properties. It:
- persists request Messages for failure recovery and preserves message order.
- extracts requests from received Messages before sending them to a destination.
- wraps replies from a destination into a Message before sending them back to the request sender.
- sends a special DestinationNotResponding reply to the request sender if the destination doesn't reply within a configurable timeout.
- sends a special DestinationFailure reply to the request sender if the destination responds with Status.Failure.
- guarantees at-least-once delivery of requests to the destination.
- guarantees at-least-once delivery of replies to the request sender.
- requires a positive receipt confirmation for a reply to mark a request-reply interaction as successfully completed.
- redelivers requests, and subsequently replies, on missing or negative receipt confirmations.
- sends a DeliveryStopped event to the actor system's event stream if the maximum number of delivery attempts has been reached (according to the channel's redelivery policy).
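The redelivery behaviour in the last few properties can be illustrated with a small in-memory model. It is not the library's implementation: it ignores persistence, timeouts and message ordering, and the names DeliveryResult, Completed, Stopped and RedeliveryModel are hypothetical. It also assumes that the redelivery limit counts redeliveries after the initial attempt.

```scala
import scala.annotation.tailrec

// Outcome of a bounded redelivery loop (hypothetical names, not the library's API).
sealed trait DeliveryResult
final case class Completed(attempts: Int) extends DeliveryResult
final case class Stopped(attempts: Int) extends DeliveryResult // stands in for DeliveryStopped

object RedeliveryModel {
  // Delivers a request until `attempt` returns true (positive receipt
  // confirmation) or the initial delivery plus `maxRedelivery` redeliveries
  // have all been confirmed negatively. `attempt` gets the 1-based attempt number.
  def deliver(maxRedelivery: Int)(attempt: Int => Boolean): DeliveryResult = {
    @tailrec
    def loop(n: Int): DeliveryResult =
      if (attempt(n)) Completed(n)
      else if (n > maxRedelivery) Stopped(n)
      else loop(n + 1)
    loop(1)
  }
}
```

For example, RedeliveryModel.deliver(3)(n => n == 2) completes on the second attempt, while a destination that never confirms yields Stopped after the last permitted redelivery.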
A reliable request-reply channel offers all the properties we need to reliably communicate with the remote CreditCardValidator. The channel is created as a child actor of the OrderProcessor when the OrderProcessor receives a SetCreditCardValidator message.
The channel is created with the channelOf method of the actor system's EventsourcingExtension and configured with a ReliableRequestReplyChannelProps object. Configuration data are the channel destination (validator), a redelivery policy and a destination reply timeout. When sending validation requests via the created validationRequestChannel, the OrderProcessor must be prepared for receiving CreditCardValidated, CreditCardValidationFailed, DestinationNotResponding or DestinationFailure replies. These replies are sent to the OrderProcessor inside a Message and are therefore written to the event log. Consequently, OrderProcessor recoveries in the future will replay past reply messages instead of obtaining them again from the validator, which ensures deterministic state recovery. Furthermore, the validationRequestChannel will ignore validation requests it receives during a replay, except those whose corresponding replies have not been positively confirmed yet. Replies are processed by the OrderProcessor as follows:
- A CreditCardValidated reply updates the creditCardValidation status of the corresponding order to Success and stores the updated order in the orders map. Further actions, such as notifying others that an order has been accepted, are omitted here but are part of the full example code. Then, the receipt of the reply is positively confirmed (confirm(true)) so that the channel doesn't redeliver the corresponding validation request.
- A CreditCardValidationFailed reply updates the creditCardValidation status of the corresponding order to Failure and stores the updated order in the orders map. Again, further actions are omitted here and the receipt of the reply is positively confirmed.
Because the validationRequestChannel delivers messages at-least-once, we need to detect duplicates in order to make reply processing idempotent. Here, we simply require that the order object to be updated must have a Pending creditCardValidation status before changing state (and notifying others). If the order's status is not Pending, the order has already been updated by a previous reply and the current reply is a duplicate. In this case, the methods onValidationSuccess and onValidationFailure don't have any effect (orders.get(orderId).filter(_.creditCardValidation == Pending) is None). The receipt of the duplicate is still positively confirmed. More general guidelines on how to detect duplicates are outlined in the Eventsourced documentation.
- A DestinationNotResponding reply is always confirmed negatively (confirm(false)) so that the channel will redeliver the validation request to the CreditCardValidator. This may help to overcome temporary network problems, for example, but doesn't handle the case where the maximum number of redeliveries has been reached (see below).
- A DestinationFailure reply will be negatively confirmed by default unless it has been delivered more than twice. This may help to overcome temporary CreditCardValidator failures, i.e. cases where a Status.Failure is returned by the validator.
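The reply handling rules above, including the Pending check that makes processing idempotent, can be sketched without any actor machinery. The sketch below is a simplified model, not the example application's code: Order is trimmed to the fields needed here, the Reply hierarchy and the ReplyHandling object are hypothetical, and deliveryCount is an assumed 1-based counter of how often a reply has been delivered.

```scala
sealed trait Validation
object Validation {
  case object Pending extends Validation
  case object Success extends Validation
  case object Failure extends Validation
}

// Trimmed-down order: only the fields needed for reply handling.
final case class Order(id: Int, creditCardValidation: Validation = Validation.Pending)

// Simplified stand-ins for the replies named in this post.
sealed trait Reply
final case class CreditCardValidated(orderId: Int) extends Reply
final case class CreditCardValidationFailed(orderId: Int) extends Reply
case object DestinationNotResponding extends Reply
case object DestinationFailure extends Reply

object ReplyHandling {
  import Validation._

  // Applies `status` only if the order is still Pending.
  // Duplicate (or unknown) replies leave the map unchanged.
  def update(orders: Map[Int, Order], orderId: Int, status: Validation): Map[Int, Order] =
    orders.get(orderId).filter(_.creditCardValidation == Pending) match {
      case Some(order) => orders + (orderId -> order.copy(creditCardValidation = status))
      case None        => orders
    }

  // Returns the new state and the receipt confirmation
  // (true = positive, false = negative, which triggers a redelivery).
  def handle(orders: Map[Int, Order], reply: Reply, deliveryCount: Int): (Map[Int, Order], Boolean) =
    reply match {
      case CreditCardValidated(id)        => (update(orders, id, Success), true)
      case CreditCardValidationFailed(id) => (update(orders, id, Failure), true)
      case DestinationNotResponding       => (orders, false)
      case DestinationFailure             => (orders, deliveryCount > 2)
    }
}
```

Note that a duplicate validation reply is still confirmed positively even though it changes nothing. Otherwise the channel would keep redelivering a request whose outcome is already recorded.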
Should the CreditCardValidator be unavailable for a longer time and the validationRequestChannel reach the maximum number of redeliveries, it stops message delivery and publishes a DeliveryStopped event to the actor system's event stream. The channel still continues to accept new event messages and persists them so that the OrderProcessor can continue receiving OrderSubmitted events, but the interaction with the CreditCardValidator is suspended. It is now up to the application to re-activate message delivery.
Subscribing to DeliveryStopped events allows an application to escalate a persisting network problem or CreditCardValidator outage by alerting a system administrator or switching to another credit card validation service, for example. In our case, a simple re-activation of the validationRequestChannel is scheduled.
The OrderProcessor subscribes itself to the actor system's event stream. On receiving a DeliveryStopped event it schedules a re-activation of the validationRequestChannel by sending it a Deliver message.
This finally meets all the requirements stated above but there's a lot more to say about external service integration. Examples are external updates or usage of channels that don't preserve message order for optimizing concurrency and throughput. I also didn't cover processor-specific, non-blocking recovery as implemented by the example application. This is enough food for another blog post.