Monday, February 28, 2011

Akka Producer Actors: New Features and Best Practices

In a previous post I wrote about new features and best practices for Akka consumer actors. In this post, I'll cover Akka producer actors. For the following examples to compile and run, you'll need the current Akka 1.1-SNAPSHOT.

Again, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to them, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.

Basic usage


Akka producer actors can send messages to any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows Akka actors to interact with external systems or other components over a large number of protocols and APIs.

Let's start with a simple producer actor that sends all messages it receives to an external HTTP service and returns the response to the initial sender. For sending messages over HTTP we can use the Camel jetty component which features an asynchronous HTTP client.
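A minimal sketch of such a producer actor, assuming the Akka 1.1 akka-camel API. The class name HttpProducer matches the name used later in this post; the service URL is a placeholder.

```scala
import akka.actor.Actor
import akka.camel.Producer

// Sends every message it receives to the HTTP service and replies
// to the initial sender with the response.
// The URL is a placeholder for illustration only.
class HttpProducer extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example"
}
```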


Concrete producer actors inherit a default implementation of Actor.receive from the Producer trait. For simple use cases, only an endpoint URI must be defined. Producer actors also require a started CamelContextManager to work properly. A CamelContextManager is started when an application starts a CamelService, e.g. via CamelServiceManager.startCamelService, or when the application starts the CamelContextManager directly.
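Starting the CamelContextManager directly might look like the following fragment, which is meant to be placed in application startup code. It assumes the init and start methods of the Akka 1.1 CamelContextManager object.

```scala
import akka.camel.CamelContextManager

// Assumed Akka 1.1 API: initialize the managed CamelContext, then start it.
CamelContextManager.init
CamelContextManager.start
```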


The latter approach is recommended when an application uses only producer actors but no consumer actors. This slightly reduces the overhead when starting actors. After starting the producer actor, clients can interact with the HTTP service via the actor API.
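A sketch of such a client interaction, assuming a producer actor class named HttpProducer as described above and an already started CamelContextManager:

```scala
import akka.actor.Actor.actorOf

// Create and start the producer actor.
val httpProducer = actorOf[HttpProducer].start

// !! blocks the caller until a reply arrives or the actor's timeout expires.
val response: Option[Any] = httpProducer !! "test"
```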

Here, !! is used for sending the message and waiting for a response. Alternatively, one can also use ! together with an implicit sender reference.
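One way to obtain an implicit sender reference is to send from inside another actor. The HttpClient class below is a hypothetical client used only for illustration; it assumes that the producer replies with akka.camel.Message or akka.camel.Failure objects.

```scala
import akka.actor.{Actor, ActorRef}
import akka.camel.{Failure, Message}

// Hypothetical client actor. Inside an actor, ! passes the actor's own
// reference as the implicit sender, so replies arrive in receive.
class HttpClient(httpProducer: ActorRef) extends Actor {
  def receive = {
    case request: String   => httpProducer ! request
    case response: Message => println("response: " + response.body)
    case failure: Failure  => println("failure: " + failure.cause)
  }
}
```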


In this case the sender will receive an asynchronous reply from the producer actor. Before that, the producer actor itself receives an asynchronous reply from the jetty endpoint. The asynchronous jetty endpoint doesn't block a thread waiting for a response, and neither does the producer actor. This is important from a scalability perspective, especially for longer-running request-response cycles.

By default, a producer actor initiates an in-out message exchange with its Camel endpoint i.e. it expects a response from it. If a producer actor wants to initiate an in-only message exchange then it must override the oneway method to return true. The following example shows a producer actor that initiates an in-only message exchange with a JMS endpoint.
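A sketch of such a one-way producer. The class name JmsProducer is chosen for illustration, and the example assumes that a Camel jms component has been configured for the application.

```scala
import akka.actor.Actor
import akka.camel.Producer

// In-only producer: messages are added to the JMS queue named "test"
// and no response is expected from the endpoint.
class JmsProducer extends Actor with Producer {
  def endpointUri = "jms:queue:test"
  override def oneway = true
}
```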


This actor adds any message it receives to the test JMS queue. By default, producer actors that are configured with oneway = true don't reply. This behavior is defined in the Producer.receiveAfterProduce method, whose default implementation replies with the result of the message exchange only when oneway is false.
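The default behavior can be sketched as follows. This is a reconstruction from the description in this post, not a verbatim copy of the Akka source, and it only compiles as a member of the Producer trait or of an actor that mixes it in.

```scala
// Sketch of the default: reply with the exchange result unless the
// producer is configured as one-way.
protected def receiveAfterProduce: Receive = {
  case msg => if (!oneway) self.reply(msg)
}
```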


The receiveAfterProduce method has the same signature as Actor.receive and is called with the result of the message exchange with the endpoint (please note that in-only message exchanges with Camel endpoints have a result as well). The result type for successful message exchanges is Message; for failed message exchanges it is Failure (see below).

Concrete producer actors can override this method. For example, the following producer actor overrides receiveAfterProduce to reply with a constant "done" message.
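A sketch of the JmsReplyingProducer mentioned later in this post, assuming the Scala hook is named receiveAfterProduce and has the Receive type:

```scala
import akka.actor.Actor
import akka.camel.Producer

class JmsReplyingProducer extends Actor with Producer {
  def endpointUri = "jms:queue:test"
  override def oneway = true

  // Ignore the exchange result and always reply with a constant.
  override def receiveAfterProduce: Receive = {
    case _ => self.reply("done")
  }
}
```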


The result of the message exchange with the JMS endpoint is ignored (case _).

Failures


Message exchanges with a Camel endpoint can fail. In this case, receiveAfterProduce is called with a Failure message containing the cause of the failure (a Throwable). Let's extend the HttpProducer usage example to deal with failure responses.
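A sketch of a client that distinguishes successful and failed exchanges, assuming that httpProducer is a started HttpProducer reference and that Message and Failure are the akka.camel case classes with body/headers and cause/headers fields:

```scala
import akka.camel.{Failure, Message}

httpProducer !! "test" match {
  case Some(Message(body, headers))  => println("response: " + body)
  case Some(Failure(cause, headers)) => println("failure: " + cause)
  case other                         => println("no usable reply: " + other)
}
```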


In addition to a failure cause, a Failure message can also contain endpoint-specific headers with failure details, such as the HTTP response code. When using ! instead of !!, together with an implicit sender reference (as shown in the previous section), that sender will then receive the Failure message asynchronously. The JmsReplyingProducer example can also be extended to return more meaningful responses: a "done" message only on success and an error message on failure.
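A possible extension of JmsReplyingProducer along these lines. The format of the error message is an arbitrary choice for illustration.

```scala
import akka.actor.Actor
import akka.camel.{Failure, Message, Producer}

class JmsReplyingProducer extends Actor with Producer {
  def endpointUri = "jms:queue:test"
  override def oneway = true

  override def receiveAfterProduce: Receive = {
    case _: Message       => self.reply("done")
    case failure: Failure => self.reply("error: " + failure.cause.getMessage)
  }
}
```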


Failed message exchanges never cause the producer actor to throw an exception during execution of receive. Should Producer implementations want to throw an exception on failure (for whatever reason), they can do so in receiveAfterProduce.
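A sketch of a producer that rethrows the failure cause. The class name and the URL are placeholders.

```scala
import akka.actor.Actor
import akka.camel.{Failure, Producer}

class ThrowingHttpProducer extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example"

  override def receiveAfterProduce: Receive = {
    // Rethrow the cause so that a supervisor can handle it.
    case failure: Failure => throw failure.cause
    // Successful results are returned to the initial sender as usual.
    case msg              => self.reply(msg)
  }
}
```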


In this case failure handling should be done in combination with a supervisor (see below).

Let's look at another example. What if we want to throw an exception on failure (instead of returning a Failure message) but to respond with a normal Message on success? In this case, we need to use self.senderFuture inside receiveAfterProduce and complete it with an exception.
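A sketch of this approach, assuming that self.senderFuture is an Option holding a completable future with a completeWithException method, as in Akka 1.x. The class name and the URL are placeholders.

```scala
import akka.actor.Actor
import akka.camel.{Failure, Message, Producer}

class FutureFailingHttpProducer extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example"

  override def receiveAfterProduce: Receive = {
    case msg: Message     => self.reply(msg)
    // Only senders that used !! or !!! have a future; for senders
    // that used ! the Option is empty and nothing happens here.
    case failure: Failure =>
      self.senderFuture.foreach(_.completeWithException(failure.cause))
  }
}
```

With this producer, a client that sends with !! should see the exception thrown at the call site instead of receiving a Failure message.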



Forwarding results


Another option for dealing with message exchange results inside receiveAfterProduce is to forward them to another actor. Forwarding a message also forwards the initial sender reference. This allows the receiving actor to reply to the initial sender.
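A sketch of a forwarding producer. The class name, the target parameter and the URL are chosen for illustration.

```scala
import akka.actor.{Actor, ActorRef}
import akka.camel.Producer

class ForwardingHttpProducer(target: ActorRef) extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example"

  // Forward Message and Failure results alike; the target actor
  // can then reply to the initial sender.
  override def receiveAfterProduce: Receive = {
    case msg => target forward msg
  }
}
```

Such an actor could be created with actorOf(new ForwardingHttpProducer(target)).start, where target is any started actor reference.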


With producer actors that forward message exchange results to other actors (including other producer actors), one can build actor-based message processing pipelines that integrate external systems. In combination with consumer actors, this could be extended towards a scalable and distributed enterprise service bus (ESB) based on Akka actors ... but this is a topic for another blog post.

Correlation identifiers


The Producer trait also supports correlation identifiers. This allows clients to correlate request messages with asynchronous response messages. A correlation identifier is a message header that can be set by clients. The following example uses the correlation identifier (or message exchange identifier) 123.
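A sketch of a request that carries a correlation identifier. It assumes that the header name is available as the Message.MessageExchangeId constant and that httpProducer is a started producer reference. To receive the asynchronous response, the send should happen inside an actor or with another implicit sender reference in scope.

```scala
import akka.camel.Message

// The header value 123 is passed as a string here.
httpProducer ! Message("test", Map(Message.MessageExchangeId -> "123"))
```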


An asynchronous response (Message or Failure) from httpProducer will contain that correlation identifier as well.

Fault-tolerance


By default, a failed message exchange does not cause a producer actor to throw an exception. However, concrete producer actors may decide to throw an exception inside receiveAfterProduce, for example, or there can be a system-level Camel problem that causes a runtime exception. An application that wants to handle these exceptions should supervise its producer actors.

The following example shows how to implement a producer actor that replies to the initial sender with a Failure message when it is restarted or stopped by a supervisor.
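A sketch of such a producer, assuming the Akka 1.x reply_? method (which replies only if a sender is available) and a supervisor that is configured elsewhere in the application. The class name, the URL and the exception message are placeholders.

```scala
import akka.actor.Actor
import akka.camel.{Failure, Producer}

class SupervisedHttpProducer extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example"

  // Throw on failure so that the supervisor gets involved.
  override def receiveAfterProduce: Receive = {
    case failure: Failure => throw failure.cause
    case msg              => self.reply(msg)
  }

  // Called before a restart: tell the initial sender what went wrong.
  override def preRestartProducer(reason: Throwable) {
    self.reply_?(Failure(reason))
  }

  // Called when the actor is stopped: reply, then let the Producer
  // trait release its resources.
  override def postStop {
    self.reply_?(Failure(new Exception("producer stopped")))
    super.postStop
  }
}
```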


To handle restart callbacks, producer actors must override the preRestartProducer method instead of preRestart. The preRestart method is implemented by the Producer trait and does additional resource de-allocation work after calling preRestartProducer. More information about replies within preRestart and postStop can be found in my previous blog post about consumer actors.

7 comments:

  1. Could you give a java example of your akka-camel consumer and producer please if possible.

    Regards

  2. @Rajan. Java examples around consumer and producer actors can be found in the akka-camel documentation. A Java example of how to reply in failure cases can be found here.

  3. Martin, will the examples remain consistent for AKKA 2 ? If not, do you plan to revisit these examples?

    Thanks, great work, great help.

    1. No, there will be differences in 2.x. Once akka-camel 2.x is released, the examples should be updated.

      Cheers,
      Martin

  4. Martin:

    1. Great job integrating two powerful frameworks available today, Camel and Akka. I look forward to version 2.x.

    2. In line with the given example, where a message is consumed from a JMS queue/topic by an Akka actor, can you shed more light on best practices for implementing further steps? For example, a typical application that consumes a message from JMS will do the following (but is not limited to it): parsing, business logic, persistence to an RDBMS and forwarding the message to some more systems.

    I am particularly interested in the persistence and publishing part. If not JTA, how do we manage persistence to an RDBMS and publishing via JMS? Suppose a DBPersistenceActor does the persistence part and sends a message to a JMSProducerActor. How can we handle exceptional situations? Do you get what I am looking for? Any suggestions will help.

    Regards,
    Surya.

    1. What precisely are you trying to achieve and what specific problems do you have? Could you share some code (e.g. on github) so that we can discuss this based on a concrete example?
