Martin Krasser's Blog
Monday, December 16, 2013
Introduction to Akka Persistence
Akka Persistence is a new module in Akka 2.3. At the time of writing this post, it is available as a milestone release (2.3-M2). Akka Persistence adds actor state persistence and at-least-once message delivery semantics to Akka. It is inspired by and the successor of the eventsourced project. They share many high-level concepts but differ completely on the API and implementation level.
To persist an actor's state, only changes to that actor's state are written to a journal, not current state directly. These changes are appended as immutable facts; nothing is ever mutated, which allows for very high transaction rates and efficient replication. Actor state can be recovered by replaying stored changes and projecting them again. This not only allows state recovery after an actor has been restarted by a supervisor but also after JVM or node crashes, for example. State changes are defined in terms of messages an actor receives (or generates).
Persistence of messages also forms the basis for supporting at-least-once message delivery semantics. This requires retries to counter transport losses, which means keeping state at the sending end and having an acknowledgement mechanism at the receiving end (see Message Delivery Guarantees in Akka). Akka Persistence supports that for point-to-point communications. Reliable point-to-point communications are an important part of highly scalable applications (see also Pat Helland's position paper Life beyond Distributed Transactions).
The following gives a high-level overview of the current features in Akka Persistence. Links to more detailed documentation are included.
Processors
Processors are persistent actors. They internally communicate with a journal to persist messages they receive or generate. They may also request message replay from a journal to recover internal state in failure cases. Processors may persist messages either
- before an actor's behavior is executed (command sourcing) or
- while an actor's behavior is executed (event sourcing)
Event sourced processors do not persist commands. Instead they allow application code to derive events from a command and atomically persist these events. After persistence, they are applied to current state. During recovery, events are replayed and only the state-changing behavior of an event sourced processor is executed again. Other side effects that have been executed during normal operation are not performed again.
Processors automatically recover themselves. Akka Persistence guarantees that new messages sent to a processor never interleave with replayed messages. New messages are internally buffered until recovery completes; hence, an application may send messages to a processor immediately after creating it.
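To make this concrete, here is a minimal sketch of an event sourced processor, assuming the 2.3-M2 API (trait and callback names changed in later Akka releases, so treat the names as milestone-specific):

```scala
import akka.actor.{ActorSystem, Props}
import akka.persistence.EventsourcedProcessor

case class CounterChanged(delta: Int) // event

class Counter extends EventsourcedProcessor {
  var count = 0 // current state, recovered by replaying events

  def updateState(evt: CounterChanged): Unit = count += evt.delta

  // replayed events: only the state-changing behavior runs again
  def receiveReplay: Receive = {
    case evt: CounterChanged => updateState(evt)
  }

  // commands: derive an event and persist it before applying it to current state
  def receiveCommand: Receive = {
    case "increment" => persist(CounterChanged(1)) { evt =>
      updateState(evt)
      // side effects here (e.g. replying to the sender) are not repeated on recovery
    }
  }
}
```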
Snapshots
The recovery time of a processor increases with the number of messages that have been written by that processor. To reduce recovery time, applications may take snapshots of processor state which can be used as starting points for message replay. Usage of snapshots is optional and only needed for optimization.

Channels
Channels are actors that provide at-least-once message delivery semantics between a sending processor and a receiver that acknowledges the receipt of messages on the application level. They also ensure that successfully acknowledged messages are not delivered again to receivers during processor recovery (i.e. replay of messages). Applications that want reliable message delivery without application-defined sending processors should use persistent channels. A persistent channel is like a normal channel that additionally persists messages before sending them to a receiver.

Journals
Journals (and snapshot stores) are pluggable in Akka Persistence. The default journal plugin is backed by LevelDB which writes messages to the local filesystem. A replicated journal is planned but not yet part of the distribution. Replicated journals allow stateful actors to be migrated in a cluster, for example. For testing purposes, a remotely shared LevelDB journal can be used instead of a replicated journal to experiment with stateful actor migration. Application code doesn't need to change when switching to a replicated journal later.
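Journal and snapshot-store plugins are selected and configured in the application's configuration. A sketch of what this looks like for the default LevelDB journal (the keys below are assumptions based on the plugin mechanism described above, not verified against the 2.3-M2 reference configuration):

```hocon
# directory of the default LevelDB journal
akka.persistence.journal.leveldb.dir = "target/journal"

# directory of the local snapshot store
akka.persistence.snapshot-store.local.dir = "target/snapshots"
```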
Wednesday, March 20, 2013
Eventsourced for Akka - A high-level technical overview
Eventsourced is an Akka extension that adds scalable actor state persistence and at-least-once message delivery guarantees to Akka. With Eventsourced, stateful actors:
- persist received messages by appending them to a log (journal)
- project received messages to derive current state
- usually hold current state in memory (memory image)
- recover current (or past) state by replaying received messages (during normal application start or after crashes)
- never persist current state directly (except optional state snapshots for recovery time optimization)
Logged messages represent intended changes to an actor's state. Logging changes instead of updating current state is one of the core concepts of event sourcing. Eventsourced can be used to implement event sourcing concepts but it is not limited to that. More details about Eventsourced and its relation to event sourcing can be found here.
Eventsourced can also be used to make message exchanges between actors reliable so that they can be resumed after crashes, for example. For that purpose, channels with at-least-once message delivery guarantees are provided. Channels also prevent output messages sent by persistent actors from being redundantly delivered during replays, which is relevant for message exchanges between these actors and other services.
Building blocks
The core building blocks provided by Eventsourced are processors, channels and journals. These are managed by an Akka extension, the EventsourcingExtension.

Processor
A processor is a stateful actor that logs (persists) messages it receives. A stateful actor is turned into a processor by modifying it with the stackable Eventsourced trait during construction. A processor can be used like any other actor.

Messages wrapped inside Message are logged by a processor; unwrapped messages are not logged. Logging behavior is implemented by the Eventsourced trait, so a processor's receive method doesn't need to care about it. Acknowledging a successful write to a sender can be done by sending a reply. A processor can also hot-swap its behavior while still keeping its logging functionality.
Processors are registered at an EventsourcingExtension. This extension provides methods to recover processor state by replaying logged messages. Processors can be registered and recovered at any time during an application run.
Eventsourced doesn't impose any restrictions on how processors maintain state. A processor can use vars, mutable data structures or STM references, for example.
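Putting the pieces together, a minimal usage sketch based on the user guide (exact package and method signatures may differ between Eventsourced versions):

```scala
import java.io.File
import akka.actor._
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.LeveldbJournalProps

object Example extends App {
  val system = ActorSystem("example")
  // journal actor backed by LevelDB
  val journal = Journal(LeveldbJournalProps(new File("target/journal")))
  val extension = EventsourcingExtension(system, journal)

  class Processor extends Actor { this: Receiver =>
    var counter = 0 // state maintained with a plain var
    def receive = {
      case event =>
        counter += 1
        println("received event #%d: %s" format (counter, event))
    }
  }

  // register processor with id 1 and recover state by replaying logged messages
  val processor = extension.processorOf(
    Props(new Processor with Receiver with Eventsourced { val id = 1 }))
  extension.recover()

  processor ! Message("example event") // wrapped in Message => logged
  processor ! "another event"          // not wrapped       => not logged
}
```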
Channel
Channels are used by processors for sending messages to other actors (channel destinations) and receiving replies from them. Channels
- require their destinations to confirm the receipt of messages for providing at-least-once delivery guarantees (explicit ack-retry protocol). Receipt confirmations are written to a log.
- prevent redundant delivery of messages to destinations during processor recovery (replay of messages). Replayed messages with matching receipt confirmations are dropped by the corresponding channels.
A processor may also send messages directly to another actor without using a channel. In this case that actor will redundantly receive messages during processor recovery.
Eventsourced provides three different channel types (more are planned).
- Default channel
- Does not store received messages.
- Re-delivers unconfirmed messages only during recovery of the sending processor.
- Order of messages as sent by a processor is not preserved in failure cases.
- Reliable channel
- Stores received messages.
- Re-delivers unconfirmed messages based on a configurable re-delivery policy.
- Order of messages as sent by a processor is preserved, even in failure cases.
- Often used to deal with unreliable remote destinations.
- Reliable request-reply channel
- Same as reliable channel but additionally guarantees at-least-once delivery of replies.
- Order of replies not guaranteed to correspond to the order of sent request messages.
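For illustration, creating the first two channel types via the extension might look like this (a sketch, assuming an EventsourcingExtension instance named `extension` as created in the Processor section; the props class names follow the library's naming scheme but are not verified against a specific version):

```scala
import akka.actor._
import org.eligosource.eventsourced.core._

// destination that confirms the receipt of each message
val destination = system.actorOf(Props(new Actor {
  def receive = {
    case m: Message =>
      // ... process the event ...
      m.confirm() // positive receipt confirmation
  }
}))

val defaultChannel  = extension.channelOf(DefaultChannelProps(1, destination))
val reliableChannel = extension.channelOf(ReliableChannelProps(2, destination))
```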
Journal
A journal is an actor that is used by processors and channels to log messages and receipt confirmations. The quality of service (availability, scalability, ...) provided by a journal depends on the storage technology used. The Journals section in the user guide gives an overview of existing journal implementations and their development status.
Thursday, January 31, 2013
Event sourcing and external service integration
A frequently asked question when building event sourced applications is how to interact with external services. This topic is covered to some extent by Martin Fowler's Event Sourcing article in the sections External Queries and External Updates. In this blog post I'll show how to approach external service integration with the Eventsourced library for Akka. If you are new to this library, an overview is given in the user guide sections Overview and First steps.
The example application presented here was inspired by Fowler's LMAX article where he describes how event sourcing differs from an alternative transaction processing approach:
Imagine you are making an order for jelly beans by credit card. A simple retailing system would take your order information, use a credit card validation service to check your credit card number, and then confirm your order - all within a single operation. The thread processing your order would block while waiting for the credit card to be checked, but that block wouldn't be very long for the user, and the server can always run another thread on the processor while it's waiting.

Although Fowler mentions the LMAX architecture, we don't use the Disruptor here for implementation. Its role is taken by an Akka dispatcher in the following example. Nevertheless, the described architecture and message flow remain the same:
In the LMAX architecture, you would split this operation into two. The first operation would capture the order information and finish by outputting an event (credit card validation requested) to the credit card company. The Business Logic Processor would then carry on processing events for other customers until it received a credit-card-validated event in its input event stream. On processing that event it would carry out the confirmation tasks for that order.
The two components in the high-level architecture are:
- OrderProcessor. An event sourced actor that maintains received orders and their validation state in memory. The OrderProcessor writes any received event message to an event log (journal) so that its in-memory state can be recovered by replaying these events, e.g. after a crash or during normal application start. This actor corresponds to the Business Logic Processor in Fowler's example.
- CreditCardValidator. A plain remote, stateless actor that validates credit card information of submitted orders on receiving CreditCardValidationRequested events. Depending on the validation outcome it replies with CreditCardValidated or CreditCardValidationFailed event messages to the OrderProcessor.
- The OrderProcessor and the CreditCardValidator must communicate remotely so that they can be deployed separately. The CreditCardValidator is an external service from the OrderProcessor's perspective.
- The example application must be able to recover from JVM crashes and remote communication errors. OrderProcessor state must be recoverable from logged event messages and running credit card validations must be automatically resumed after crashes. To overcome temporary network problems and remote actor downtimes, remote communication must be re-tried. Long-lasting errors must be escalated.
- Event message replay during recovery must not redundantly emit validation requests to the CreditCardValidator and validation responses must be recorded in the event log (to solve the external queries problem). This will recover processor state in a deterministic way, making repeated recoveries independent from otherwise potentially different validation responses over time for the same validation request (a credit card may expire, for example).
- Message processing must be idempotent. This requirement is a consequence of the at-least-once message delivery guarantee supported by Eventsourced.
The CreditCardValidator can be started with:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.CreditCardValidator
The application that runs the OrderProcessor and sends OrderSubmitted events can be started with:
> project eventsourced-examples
> run-main org.eligosource.eventsourced.example.OrderProcessor
The example application defines an oversimplified domain class Order together with a set of domain events.
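The class and event definitions themselves are not part of this extract. Based on the names used throughout the post, they might look like this (a sketch; field names and defaults are assumptions):

```scala
sealed trait Validation
case object Pending extends Validation
case object Success extends Validation
case object Failure extends Validation

case class Order(id: Int = -1,
                 details: String,
                 creditCardNumber: String,
                 creditCardValidation: Validation = Pending)

// domain events
case class OrderSubmitted(order: Order)
case class OrderStored(order: Order)
case class CreditCardValidationRequested(order: Order)
case class CreditCardValidated(orderId: Int)
case class CreditCardValidationFailed(orderId: Int)
```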
Whenever the OrderProcessor receives a domain event it appends that event to the event log (journal) before processing it. To add event logging behavior to an actor it must be modified with the stackable Eventsourced trait during construction.
Eventsourced actors only write messages of type Message to the event log (together with the contained event). Messages of other types can be received by an Eventsourced actor as well but aren't logged. The Receiver trait allows the OrderProcessor's receive method to pattern-match against received events directly (instead of Message). It is not required for implementing an event sourced actor but can help to make implementations simpler.
On receiving an OrderSubmitted event, the OrderProcessor extracts the contained order object from the event, updates the order with an order id and stores it in the orders map. The orders map represents the current state of the OrderProcessor (which can be recovered by replaying logged event messages).
After updating the orders map, the OrderProcessor replies to the sender of an OrderSubmitted event with an OrderStored event. This event is a business-level acknowledgement that the received OrderSubmitted event has been successfully written to the event log. Finally, the OrderProcessor emits a CreditCardValidationRequested event message to the CreditCardValidator via a reliable request-reply channel (see below). The emitted message is derived from the current event message which can be accessed via the message method of the Receiver trait. Alternatively, the OrderProcessor could also have used an emitter for sending the event (see also channel usage hints).
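The behavior described above can be sketched as follows (simplified; the constructor parameter and the way the channel reference is obtained are assumptions):

```scala
import akka.actor._
import org.eligosource.eventsourced.core._

// simplified sketch of the OrderProcessor behavior described above
class OrderProcessor(validationRequestChannel: ActorRef) extends Actor { this: Receiver =>
  var orders = Map.empty[Int, Order] // processor state

  def receive = {
    case OrderSubmitted(order) =>
      val id  = orders.size
      val upd = order.copy(id = id)
      orders = orders + (id -> upd)
      // business-level ack: the event has been written to the event log
      sender ! OrderStored(upd)
      // derive the validation request from the current event message
      validationRequestChannel ! message.copy(CreditCardValidationRequested(upd))
  }
}
```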
A reliable request-reply channel is a pattern on top of a reliable channel with the following properties. It
- persists request Messages for failure recovery and preserves message order.
- extracts requests from received Messages before sending them to a destination.
- wraps replies from a destination into a Message before sending them back to the request sender.
- sends a special DestinationNotResponding reply to the request sender if the destination doesn't reply within a configurable timeout.
- sends a special DestinationFailure reply to the request sender if the destination responds with Status.Failure.
- guarantees at-least-once delivery of requests to the destination.
- guarantees at-least-once delivery of replies to the request sender.
- requires a positive receipt confirmation for a reply to mark a request-reply interaction as successfully completed.
- redelivers requests, and subsequently replies, on missing or negative receipt confirmations.
- sends a DeliveryStopped event to the actor system's event stream if the maximum number of delivery attempts has been reached (according to the channel's redelivery policy).
The example application uses such a channel for communicating with the CreditCardValidator. The channel is created as a child actor of the OrderProcessor when the OrderProcessor receives a SetCreditCardValidator message.
The channel is created with the channelOf method of the actor system's EventsourcingExtension and configured with a ReliableRequestReplyChannelProps object. Configuration data are the channel destination (validator), a redelivery policy and a destination reply timeout. When sending validation requests via the created validationRequestChannel, the OrderProcessor must be prepared for receiving CreditCardValidated, CreditCardValidationFailed, DestinationNotResponding or DestinationFailure replies. These replies are sent to the OrderProcessor inside a Message and are therefore written to the event log. Consequently, OrderProcessor recoveries in the future will replay past reply messages instead of obtaining them again from the validator, which ensures deterministic state recovery. Furthermore, the validationRequestChannel will ignore validation requests it receives during a replay, except those whose corresponding replies have not been positively confirmed yet. The following snippet shows how replies are processed by the OrderProcessor.
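The channel-creation snippet is elided from this extract. Based on the description, it might look roughly like this (a sketch; the builder-style configuration methods are assumptions, only the props class and channelOf are named in the text):

```scala
import scala.concurrent.duration._
import org.eligosource.eventsourced.core._

val validationRequestChannel = extension.channelOf(
  ReliableRequestReplyChannelProps(1, validator) // channel id and destination
    .withRedeliveryMax(3)                        // redelivery policy
    .withReplyTimeout(5.seconds))                // destination reply timeout
```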
- A CreditCardValidated reply updates the creditCardValidation status of the corresponding order to Success and stores the updated order in the orders map. Further actions, such as notifying others that an order has been accepted, are omitted here but are part of the full example code. Then, the receipt of the reply is positively confirmed (confirm(true)) so that the channel doesn't redeliver the corresponding validation request.
- A CreditCardValidationFailed reply updates the creditCardValidation status of the corresponding order to Failure and stores the updated order in the orders map. Again, further actions are omitted here and the receipt of the reply is positively confirmed.
Since the validationRequestChannel delivers messages at-least-once, we need to detect duplicates in order to make reply processing idempotent. Here, we simply require that the order object to be updated must have a Pending creditCardValidation status before changing state (and notifying others). If the order's status is not Pending, the order has already been updated by a previous reply and the current reply is a duplicate. In this case, the methods onValidationSuccess and onValidationFailure don't have any effect (orders.get(orderId).filter(_.creditCardValidation == Pending) is None). The receipt of the duplicate is still positively confirmed. More general guidelines how to detect duplicates are outlined here.
- A DestinationNotResponding reply is always confirmed negatively (confirm(false)) so that the channel is going to redeliver the validation request to the CreditCardValidator. This may help to overcome temporary network problems, for example, but doesn't handle the case where the maximum number of redeliveries has been reached (see below).
- A DestinationFailure reply will be negatively confirmed by default unless it has been delivered more than twice. This may help to overcome temporary CreditCardValidator failures i.e. cases where a Status.Failure is returned by the validator.
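The duplicate check described above can be sketched as follows (the helper methods are named as in the text; their exact signatures are assumptions):

```scala
// idempotent reply processing: only a Pending order may change state,
// so a duplicate reply has no effect
def onValidationSuccess(orderId: Int): Unit =
  orders.get(orderId).filter(_.creditCardValidation == Pending) foreach { order =>
    orders = orders + (orderId -> order.copy(creditCardValidation = Success))
    // notify others that the order has been accepted ...
  }

def onValidationFailure(orderId: Int): Unit =
  orders.get(orderId).filter(_.creditCardValidation == Pending) foreach { order =>
    orders = orders + (orderId -> order.copy(creditCardValidation = Failure))
  }
```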
Should the CreditCardValidator be unavailable for a longer time and the validationRequestChannel reach the maximum number of redeliveries, it will stop message delivery and publish a DeliveryStopped event to the actor system's event stream. The channel still continues to accept new event messages and persists them so that the OrderProcessor can continue receiving OrderSubmitted events, but the interaction with the CreditCardValidator is suspended. It is now up to the application to re-activate message delivery.
Subscribing to DeliveryStopped events allows an application to escalate a persisting network problem or CreditCardValidator outage by alerting a system administrator or switching to another credit card validation service, for example. In our case, a simple re-activation of the validationRequestChannel is scheduled.
The OrderProcessor subscribes itself to the actor system's event stream. On receiving a DeliveryStopped event it schedules a re-activation of the validationRequestChannel by sending it a Deliver message.
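A sketch of the subscription and the scheduled re-activation (the scheduler call is from the standard Akka API; the shape of the DeliveryStopped payload and the delay are assumptions):

```scala
import scala.concurrent.duration._

// subscribe the OrderProcessor to DeliveryStopped events
context.system.eventStream.subscribe(self, classOf[DeliveryStopped])

def receiveDeliveryStopped: Receive = {
  case DeliveryStopped(channelId) =>
    // re-activate message delivery of the channel after a delay
    context.system.scheduler.scheduleOnce(
      5.seconds, validationRequestChannel, Deliver)(context.dispatcher)
}
```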
This finally meets all the requirements stated above but there's a lot more to say about external service integration. Examples are external updates or usage of channels that don't preserve message order for optimizing concurrency and throughput. I also didn't cover processor-specific, non-blocking recovery as implemented by the example application. This is enough food for another blog post.
Thursday, February 23, 2012
Using JAXB for XML and JSON APIs in Scala Web Applications
In the past, I have already mentioned several times the implementation of RESTful XML and JSON APIs in Scala web applications using JAXB, without going into details. In this blog post I want to shed more light on this approach together with some links to more advanced examples. A JAXB-based approach to web APIs can be useful if you want to support both XML and JSON representations but only want to maintain a single binding definition for both representations. I should also say that I'm still investigating this approach, so see the following as rather experimental.
First of all, JAXB is a Java standard for binding XML schemas to Java classes. It allows you to convert Java objects to XML documents, and vice versa, based on JAXB annotations on the corresponding Java classes. JAXB doesn't cover JSON but there are libraries that allow you to convert Java objects to JSON (and vice versa) based on the very same JAXB annotations that are used for defining XML bindings. One such library is Jersey's JSON library (jersey-json) which internally uses the Jackson library.
As you'll see in the following, JAXB can also be used together with immutable domain or resource models based on Scala case classes. There's no need to pollute them with getters and setters or Java collections from the java.util package. Necessary conversions from Scala collections or other type constructors (such as Option, for example) to Java types supported by JAXB can be defined externally to the annotated model (and reused). At the end of this blog post, I'll also show some examples how to develop JAXB-based XML and JSON APIs with the Play Framework.

Model
In the following, I'll use a model that consists of the single case class Person(fullname: String, username: Option[String], age: Int). To define Person as a root element in the XML schema, the following JAXB annotations should be added. @XmlRootElement makes Person a root element in the XML schema and @XmlAccessorType(XmlAccessType.FIELD) instructs JAXB to access fields directly instead of using getters and setters. But before we can use the Person class with JAXB a few additional things need to be done.
- A no-arg constructor or a static no-arg factory method must be provided, otherwise JAXB doesn't know how to create Person instances. In our example we'll use a no-arg constructor.
- A person's fullname should be mandatory in the corresponding XML schema. This can be achieved by placing an @XmlElement(required=true) annotation on the field corresponding to the fullname parameter.
- A person's username should be an optional String in the corresponding XML schema i.e. the username element of the complex Person type should have an XML attribute minOccurs="0". Furthermore, it should be avoided that scala.Option appears as a complex type in the XML schema. This can be achieved by providing a type adapter from Option[String] to String via the JAXB @XmlJavaTypeAdapter annotation.
We can implement the above requirements by defining and annotating the Person class as follows:
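The annotated class itself is not included in this extract. Based on the dissection that follows, a reconstruction might look like this (a sketch; the names OptionAdapter, StringOptionAdapter, xmlElement and xmlTypeAdapter are taken from the surrounding text):

```scala
import javax.xml.bind.annotation._
import javax.xml.bind.annotation.adapters.{XmlAdapter, XmlJavaTypeAdapter}
import scala.annotation.meta.field

// copy the JAXB annotations to the fields generated by the Scala compiler
object Annotations {
  type xmlElement     = XmlElement @field
  type xmlTypeAdapter = XmlJavaTypeAdapter @field
}

import Annotations._

// generic adapter between Option[A] and A; `nones` lists values mapped to None
class OptionAdapter[A](nones: A*) extends XmlAdapter[A, Option[A]] {
  def marshal(v: Option[A]): A = v.getOrElse(nones(0))
  def unmarshal(v: A): Option[A] = if (nones contains v) None else Some(v)
}

class StringOptionAdapter extends OptionAdapter[String](null, "")

@XmlRootElement(name = "person")
@XmlAccessorType(XmlAccessType.FIELD)
case class Person(
  @xmlElement(required = true) fullname: String,
  @xmlTypeAdapter(classOf[StringOptionAdapter]) username: Option[String],
  age: Int) {
  // no-arg constructor needed by JAXB only
  private def this() = this("", None, 0)
}
```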
Let's dissect the above code a bit:
- The no-arg constructor on the Person class is only needed by JAXB and should therefore be declared private so that it cannot be accessed elsewhere in the application code (unless you're using reflection like JAXB does).
- Placing JAXB annotations on fields of a case class is a bit tricky. When writing a case class, usually only case class parameters are defined but not fields directly. The Scala compiler then generates the corresponding fields in the resulting .class file. Annotations that are placed on case class parameters are not copied to their corresponding fields, by default. To instruct the Scala compiler to copy these annotations, the Scala @field annotation must be used in addition. This is done in the custom annotation type definitions xmlElement and xmlTypeAdapter. They can be used in the same way as their dependent annotation types XmlElement and XmlJavaTypeAdapter, respectively. Placing the custom @xmlElement annotation on the fullname parameter will cause the Scala compiler to copy the dependent @XmlElement annotation (a JAXB annotation) to the generated fullname field where it can finally be processed by JAXB.
- To convert between Option[String] (on the Scala side) and String (used by JAXB on the XML schema side) we implement a JAXB type adapter (interface XmlAdapter). The above example defines a generic OptionAdapter (that can also be reused elsewhere) and a concrete StringOptionAdapter used for the optional username parameter. Please note that annotating the username parameter with @xmlTypeAdapter(classOf[OptionAdapter[String]]) is not sufficient because JAXB will not be able to infer String as the target type (JAXB uses reflection) and will use Object instead (resulting in an XML anyType in the corresponding XML schema). Type adapters can also be used to convert between Scala and Java collection types. Since JAXB can only handle Java collection types you'll need to use type adapters should you want to use Scala collection types in your case classes (and you really should). You can find an example here.
We're now ready to use the Person class to generate an XML schema and to convert Person objects to and from XML or JSON. Please note that the following code examples require JAXB version 2.2.4u2 or higher, otherwise the OptionAdapter won't work properly. The reason is JAXB issue 415. Either use JDK 7u4 or higher which already includes this version or install the required JAXB version manually. The following will write an XML schema, generated from the Person class, to stdout:

The result is:
Marshalling a Person object to XML can be done with a JAXB marshaller, which prints:

Unmarshalling creates a Person object from XML. We have implemented StringOptionAdapter in a way that an empty <username/> element or <username></username> in personXml1 would also yield None on the Scala side.

Creating JSON from Person objects can be done with the JSONJAXBContext from Jersey's JSON library, which prints the following to stdout:
Unmarshalling can be done with the context.createJSONUnmarshaller.unmarshalFromJSON method. The JSONConfiguration object provides a number of configuration options that determine how JSON is rendered and parsed. Refer to the official documentation for details.

Play and JAXB
This section shows some examples of how to develop JAXB-based XML and JSON APIs with the Play Framework 2.0. It is based on JAXB-specific body parsers and type class instances defined in the trait JaxbSupport, which is part of the event-sourcing example application (Play-specific work is currently done on the play branch). You can reuse this trait in other applications as is; there are no dependencies to the rest of the project (update: except to SysError). To enable JAXB-based XML and JSON processing for a Play web application, add JaxbSupport to a controller object as follows:

An implicit JSONJAXBContext must be in scope for both XML and JSON processing. For XML processing alone, it is sufficient to have an implicit JAXBContext.

XML and JSON Parsing
JaxbSupport provides Play-specific body parsers that convert an XML or JSON request body to instances of JAXB-annotated classes. The following action uses a JAXB body parser that expects an XML body and tries to convert it to a Person instance (using a JAXB unmarshaller). If the unmarshalling fails or the request Content-Type is other than text/xml or application/xml, a 400 status code (bad request) is returned. Converting a JSON body to a Person instance can be done with the jaxb.parse.json body parser.

If the body parser should be chosen at runtime depending on the Content-Type header, use the dynamic jaxb.parse body parser. The following action is able to process both XML and JSON bodies and convert them to a Person instance.

JaxbSupport also implements the following related body parsers:
- jaxb.parse.xml(maxLength: Int) and jaxb.parse.json(maxLength: Int)
- jaxb.parse(maxLength: Int)
- jaxb.parse.tolerantXml and jaxb.parse.tolerantJson
- jaxb.parse.tolerantXml(maxLength: Int) and jaxb.parse.tolerantJson(maxLength: Int)
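The runtime choice that the dynamic jaxb.parse parser makes can be illustrated with a small framework-free sketch (chooseParser and its return values are illustrative, not the JaxbSupport API):

```scala
// Framework-free sketch of dispatching on the Content-Type header, as the
// dynamic jaxb.parse body parser does: XML media types and JSON get a parser,
// anything else is rejected with a 400 (bad request).
def chooseParser(contentType: Option[String]): Either[Int, String] =
  contentType.map(_.takeWhile(_ != ';').trim.toLowerCase) match {
    case Some("text/xml") | Some("application/xml") => Right("xml")
    case Some("application/json")                   => Right("json")
    case _                                          => Left(400)
  }
```

Stripping everything after ';' discards charset parameters such as "application/json; charset=utf-8" before matching.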
XML and JSON Rendering
For rendering XML and JSON, JaxbSupport provides the wrapper classes JaxbXml, JaxbJson and Jaxb. The following action renders an XML response from a Person object (using a JAXB marshaller), whereas JaxbJson renders a JSON response from a Person object. If you want to do content negotiation based on the Accept request header, use the Jaxb wrapper.

Jaxb requires an implicit request in scope for obtaining the Accept request header. If the Accept MIME type is application/xml or text/xml, an XML representation is returned; if it is application/json, a JSON representation is returned. Further JaxbSupport application examples can be found here.
Monday, February 28, 2011
Akka Producer Actors: New Features and Best Practices
In a previous post I wrote about new features and best practices for Akka consumer actors. In this post, I'll cover Akka producer actors. For the following examples to compile and run, you'll need the current Akka 1.1-SNAPSHOT.
Again, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.
Basic usage
Akka producer actors can send messages to any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows Akka actors to interact with external systems or other components over a large number of protocols and APIs.
Let's start with a simple producer actor that sends all messages it receives to an external HTTP service and returns the response to the initial sender. For sending messages over HTTP we can use the Camel jetty component which features an asynchronous HTTP client.
Concrete producer actors inherit a default implementation of Actor.receive from the Producer trait. For simple use cases, only an endpoint URI must be defined. Producer actors also require a started CamelContextManager to work properly. A CamelContextManager is started when an application starts a CamelService, e.g. via CamelServiceManager.startCamelService, or when starting the CamelContextManager directly. The latter approach is recommended when an application uses only producer actors but no consumer actors. This slightly reduces the overhead when starting actors. After starting the producer actor, clients can interact with the HTTP service via the actor API.
Here, !! is used for sending the message and waiting for a response. Alternatively, one can also use ! together with an implicit sender reference. In this case the sender will receive an asynchronous reply from the producer actor. Before that, the producer actor itself receives an asynchronous reply from the jetty endpoint. The asynchronous jetty endpoint doesn't block a thread waiting for a response, and neither does the producer actor. This is important from a scalability perspective, especially for longer-running request-response cycles.

By default, a producer actor initiates an in-out message exchange with its Camel endpoint, i.e. it expects a response from it. If a producer actor wants to initiate an in-only message exchange, it must override the oneway method to return true. The following example shows a producer actor that initiates an in-only message exchange with a JMS endpoint. This actor adds any message it receives to the test JMS queue. By default, producer actors that are configured with oneway = true don't reply. This behavior is defined in the Producer.receiveAfterProduce method, which is implemented as follows.

The receiveAfterProduce method has the same signature as Actor.receive and is called with the result of the message exchange with the endpoint (please note that in-only message exchanges with Camel endpoints have a result as well). The result type for successful message exchanges is Message; for failed message exchanges it is Failure (see below). Concrete producer actors can override this method. For example, the following producer actor overrides onReceiveAfterProduce to reply with a constant "done" message. The result of the message exchange with the JMS endpoint is ignored (case _).

Failures
Message exchanges with a Camel endpoint can fail. In this case, onReceiveAfterProduce is called with a Failure message containing the cause of the failure (a Throwable). Let's extend the HttpProducer usage example to deal with failure responses. In addition to a failure cause, a Failure message can also contain endpoint-specific headers with failure details, such as the HTTP response code. When using ! instead of !!, together with an implicit sender reference (as shown in the previous section), that sender will receive the Failure message asynchronously. The JmsReplyingProducer example can also be extended to return more meaningful responses: a "done" message only on success and an error message on failure.

Failed message exchanges never cause the producer actor to throw an exception during execution of receive. Should Producer implementations want to throw an exception on failure (for whatever reason), they can do so in onReceiveAfterProduce. In this case failure handling should be done in combination with a supervisor (see below).
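The success/failure handling described above can be modeled without akka-camel. In this sketch, Message and Failure are simplified stand-ins for the akka-camel types, and onReceiveAfterProduce reproduces the "done on success, error message on failure" behavior of the extended JmsReplyingProducer:

```scala
// Simplified model of post-produce result handling (stand-in types, not akka-camel).
sealed trait ExchangeResult
case class Message(body: Any) extends ExchangeResult
case class Failure(cause: Throwable) extends ExchangeResult

// Reply with "done" on success and an error text on failure,
// as the extended JmsReplyingProducer described above does.
def onReceiveAfterProduce(result: ExchangeResult): String = result match {
  case Message(_)     => "done"
  case Failure(cause) => "error: " + cause.getMessage
}
```

The point of the hook is that the actor, not the framework, decides what the initial sender sees for each outcome.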
Let's look at another example. What if we want HttpProducer to throw an exception on failure (instead of returning a Failure message) but to respond with a normal Message on success? In this case, we need to use self.senderFuture inside onReceiveAfterProduce and complete it with an exception.

Forwarding results

Another option for dealing with message exchange results inside onReceiveAfterProduce is to forward them to another actor. Forwarding a message also forwards the initial sender reference, which allows the receiving actor to reply to the initial sender. With producer actors that forward message exchange results to other actors (incl. other producer actors), one can build actor-based message processing pipelines that integrate external systems. In combination with consumer actors, this could be extended towards a scalable and distributed enterprise service bus (ESB) based on Akka actors ... but this is a topic for another blog post.
Correlation identifiers
The Producer trait also supports correlation identifiers. This allows clients to correlate request messages with asynchronous response messages. A correlation identifier is a message header that can be set by clients. The following example uses the correlation identifier (or message exchange identifier) 123. An asynchronous response (Message or Failure) from httpProducer will contain that correlation identifier as well.

Fault-tolerance
A failed message exchange by default does not cause a producer actor to throw an exception. However, concrete producer actors may decide to throw an exception inside onReceiveAfterProduce, for example, or there can be a system-level Camel problem that causes a runtime exception. An application that wants to handle these exceptions should supervise its producer actors. The following example shows how to implement a producer actor that replies to the initial sender with a Failure message when it is restarted or stopped by a supervisor.

To handle restart callbacks, producer actors must override the preRestartProducer method instead of preRestart. The preRestart method is implemented by the Producer trait and does additional resource de-allocation work after calling preRestartProducer. More information about replies within preRestart and postStop can be found in my previous blog post about consumer actors.
Thursday, February 17, 2011
Akka Consumer Actors: New Features and Best Practices
In this blog post I want to give some guidance how to implement consumer actors with the akka-camel module. Besides basic usage scenarios, I will also explain how to make consumer actors fault-tolerant, redeliver messages on failure, deal with bounded mailboxes etc. The code examples shown below require the current Akka 1.1-SNAPSHOT to compile and run.
In the following, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.
Basic usage
Akka consumer actors can receive messages from any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows clients to interact with Akka actors over a large number of protocols and APIs.

Camel endpoints either initiate in-only (one-way) message exchanges with consumer actors or in-out (two-way) message exchanges. Replies from consumer actors are mandatory for in-out message exchanges but optional for in-only message exchanges. For replying to a Camel endpoint, the consumer actor uses the very same interface as for replying to any other sender (e.g. to another actor). Examples are self.reply or self.reply_?.

Let's start by defining a simple consumer actor that accepts messages via TCP on port 6200 and replies to the TCP client (TCP support is given by Camel's mina component).
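The request-reply behavior of such a TCP consumer can be sketched with plain sockets. This is a framework-free illustration: in akka-camel the same thing is achieved declaratively, via an endpoint URI along the lines of "mina:tcp://localhost:6200?textline=true", with the actor's receive method producing the reply.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}

// A one-shot server stands in for the consumer endpoint: it accepts one
// line from a TCP client and replies to that client, as the consumer
// actor in the example does.
val server = new ServerSocket(6200)
val serverThread = new Thread(new Runnable {
  def run(): Unit = {
    val socket = server.accept()
    val in  = new BufferedReader(new InputStreamReader(socket.getInputStream))
    val out = new PrintWriter(socket.getOutputStream, true)
    out.println("received: " + in.readLine())  // reply to the tcp client
    socket.close()
  }
})
serverThread.start()

// The client sends a line and reads the reply.
val client = new Socket("localhost", 6200)
val writer = new PrintWriter(client.getOutputStream, true)
val reader = new BufferedReader(new InputStreamReader(client.getInputStream))
writer.println("hello")
val reply = reader.readLine()
client.close(); serverThread.join(); server.close()
```

What akka-camel adds on top of this is exactly what the sketch lacks: endpoint lifecycle management, asynchronous dispatch, and the supervision discussed below.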
For consumer actors to work, applications need to start a CamelService, which is managed by the CamelServiceManager.

When starting a consumer actor, the endpoint defined for that actor will be activated asynchronously by the CamelService. If your application wants to wait for consumer endpoints to be finally activated, you can do so with the awaitEndpointActivation method (which is especially useful for testing).

For sending a test message to the consumer actor, the above code uses a Camel ProducerTemplate, which can be obtained from the CamelContextManager.

If Camel endpoints, such as the file endpoint, create in-only message exchanges, then consumer actors need not reply, by default. The message exchange is completed once the input message has been added to the consumer actor's mailbox.
When placing a file into the data/input directory, the Camel file endpoint will pick up that file and send its content as a message to the consumer actor. Once the message is in the actor's mailbox, the file endpoint will delete the corresponding file (see delete=true in the endpoint URI).

If you want to let the consumer actor decide when the file should be deleted, you'll need to turn auto-acknowledgements off as shown in the following example (autoack = false). In this case the consumer actor must reply with a special Ack message when message processing is done. This asynchronous reply finally causes the file endpoint to delete the consumed file.

Turning auto-acknowledgements on and off is only relevant for in-only message exchanges because, for in-out message exchanges, consumer actors need to reply in any case with an (application-specific) message. Consumer actors may also reply with a Failure message to indicate a processing failure. Failure replies can be made for both in-only and in-out message exchanges. A Failure reply can be done inside the receive method, but there are better ways, as shown in the next sections.

Fault-tolerance and message redelivery
Message processing inside receive may throw exceptions, which usually requires a failure response to Camel (i.e. to the consumer endpoint). This is done with a Failure message that contains the failure reason (an instance of Throwable). Instead of catching and handling the exception inside receive, consumer actors should be part of supervisor hierarchies and send failure responses from within restart callback methods. Here's an example of a fault-tolerant file consumer.

The above file consumer overrides the preRestart and postStop callback methods to send reply messages to Camel. A reply within preRestart and postStop is possible after receive has thrown an exception (a new feature since Akka 1.1). When receive returns normally, it is expected that any necessary reply has already been done within receive.
- If the lifecycle of the SupervisedFileConsumer is configured to be PERMANENT, a supervisor will restart the consumer upon failure with a call to preRestart. Within preRestart a Failure reply is sent, which causes the file endpoint to redeliver the content of the consumed file so that the consumer actor can try to process it again. Should the processing succeed in a second attempt, an Ack is sent within receive. A reply within preRestart must be a safe reply via self.reply_? because an unsafe self.reply will throw an exception when the consumer is restarted without having failed. This can be the case in the context of all-for-one restart strategies.
- If the lifecycle of the SupervisedFileConsumer is configured to be TEMPORARY, a supervisor will shut down the consumer upon failure with a call to postStop. Within postStop an Ack is sent, which causes the file endpoint to delete the file. One can, of course, choose to reply with a Failure here, so that files that couldn't be processed successfully are kept in the input directory. A reply within postStop must be a safe reply via self.reply_? because an unsafe self.reply will throw an exception when the consumer has been stopped by the application (and not by a supervisor) after successful execution of receive.
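The reply protocol in these two cases can be modeled without the framework. Ack and Failure below are simplified stand-ins for the akka-camel message types, and processFile is an illustrative processing step, not part of the example application:

```scala
// Simplified model of the acknowledgement protocol: Ack lets the endpoint
// complete the exchange (e.g. delete the file), Failure triggers redelivery.
case object Ack
case class Failure(cause: Throwable)

// Illustrative processing: succeed on non-empty content, fail otherwise.
def processFile(content: String): Any =
  try {
    require(content.nonEmpty, "empty file")
    Ack
  } catch {
    case e: Exception => Failure(e)
  }
```

The supervised version moves the Failure/Ack decision out of receive and into the preRestart/postStop callbacks, but the contract with the endpoint stays the same.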
Another frequently discussed consumer actor example is a fault-tolerant JMS consumer. A JMS consumer actor should acknowledge a message receipt upon successful message processing and trigger a message redelivery on failure. This is exactly the same pattern as described for the SupervisedFileConsumer above. You just need to change the file endpoint URI to a jms or activemq endpoint URI and you're done (of course, you additionally need to configure the JMS connection with a redelivery policy and, optionally, use transacted queues; an explanation of how to do this would exceed the scope of this blog post).

Simplifications and tradeoffs with blocking=true
In all the examples so far, the internally created Camel routes use the ! (bang) operator to send the input message to the consumer actor. This means that the Camel route does not block a thread waiting for a response; an asynchronous reply will cause the Camel route to resume processing. That's also the reason why any exception thrown by receive isn't reported back to Camel directly but must be reported explicitly with a Failure response.

If you want exceptions thrown by receive to be reported back to Camel directly (i.e. without sending Failure responses), you'll need to set blocking = true for the consumer actor. This causes the Camel route to send the input message with the !! (bangbang) operator and to wait for a response. However, this will block a thread until the consumer sends a response or throws an exception within receive. The advantage of this approach is that error handling is strongly simplified, but scalability will likely decrease.

Here's an example of a consumer actor that uses the simplified approach to error handling. Any exception thrown by receive will still cause the file endpoint to redeliver the message, but a thread will be blocked by Camel during the execution of receive.
No supervisor is needed here. It depends on the non-functional requirements of your application whether to go for this simple but blocking approach or to use a more scalable, non-blocking approach in combination with a supervisor.
Bounded mailboxes and error handling with custom Camel routes
For consumer actors that require a significant amount of time to process a single message, it can make sense to install a bounded mailbox. A bounded mailbox throws an exception if its capacity is reached and the Camel route tries to add additional messages to the mailbox. Here's an example of a file consumer actor that uses a bounded mailbox with a capacity of 5. Processing is artificially delayed by 1 second using a Thread.sleep.

When, let's say, 10 files are put into the data/input directory, they will be picked up by the file endpoint and added to the actor's mailbox. The capacity of the mailbox will be reached soon because the file endpoint can send messages much faster than the consumer actor can process them. Exceptions thrown by the mailbox are directly reported to the Camel route, which causes the file endpoint to redeliver messages until they can be added to the mailbox. The same applies to JMS and other endpoints that support redelivery.

When dealing with endpoints that do not support redelivery, one needs to customize the Camel route to the consumer actor with a special error handler that does the redelivery. This is shown for a consumer actor that consumes messages from a direct endpoint.
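The overflow behavior of such a bounded mailbox can be sketched with a plain bounded queue (a stand-in for the actor's mailbox, not the Akka implementation):

```scala
import java.util.concurrent.ArrayBlockingQueue

// A bounded queue with capacity 5 stands in for the bounded mailbox.
// add throws IllegalStateException when the capacity is reached, which is
// the kind of failure the Camel route sees and answers with redelivery.
val mailbox = new ArrayBlockingQueue[String](5)

def enqueue(msg: String): Boolean =
  try { mailbox.add(msg); true }
  catch { case _: IllegalStateException => false }

// Ten "files" arrive faster than they are processed: only five fit.
val accepted = (1 to 10).count(i => enqueue("file-" + i))
```

In the real setup the rejected messages are not lost: the endpoint (or the custom error handler below) retries them until the slow consumer has drained its mailbox.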
Here we use onRouteDefinition to define how the Camel route should be customized during its creation. In this example, an error handler is defined that attempts a maximum of 3 redeliveries with a delay of 1000 ms. For details, refer to the intercepting route construction section in the akka-camel documentation. When using a producer template to send messages to this endpoint, some of them will be added to the mailbox on the first attempt, some of them after a second attempt triggered by the error handler.

The examples presented in this post cover many of the consumer-actor-related questions and topics that have been asked and discussed on the akka-user mailing list. In another post I plan to cover best practices for implementing Akka producer actors.