Again, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.
Basic usage
Akka producer actors can send messages to any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows Akka actors to interact with external systems or other components over a large number of protocols and APIs.
Let's start with a simple producer actor that sends all messages it receives to an external HTTP service and returns the response to the initial sender. For sending messages over HTTP we can use the Camel jetty component which features an asynchronous HTTP client.
Concrete producer actors inherit a default implementation of Actor.receive from the Producer trait. For simple use cases, only an endpoint URI must be defined. Producer actors also require a started CamelContextManager to work properly. A CamelContextManager is started when an application starts a CamelService, e.g. via CamelServiceManager.startCamelService, or when the application starts the CamelContextManager directly. The latter approach is recommended when an application uses only producer actors but no consumer actors, because it slightly reduces the overhead when starting actors. After starting the producer actor, clients can interact with the HTTP service via the actor API.
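A minimal sketch of such a producer actor and its usage could look as follows. It assumes the Akka 1.x API (the akka.actor and akka.camel packages) and the Camel jetty component on the classpath. The endpoint URI is a placeholder, and the HttpProducerExample object only exists for illustration.

```scala
import akka.actor.Actor
import akka.actor.Actor.actorOf
import akka.camel.{CamelContextManager, Producer}

// Producer actor: only the endpoint URI is defined, receive is inherited.
// The URI is a placeholder for this sketch, not a real service.
class HttpProducer extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example"
}

// Hypothetical driver object, assuming the Akka 1.x API.
object HttpProducerExample {
  def main(args: Array[String]): Unit = {
    // Start the CamelContextManager directly (no CamelService),
    // which is enough when only producer actors are used.
    CamelContextManager.init
    CamelContextManager.start

    val producer = actorOf[HttpProducer].start

    // !! sends the message and waits for the reply (an Option in Akka 1.x).
    val response = producer !! "test"
    println(response)

    producer.stop
    CamelContextManager.stop
  }
}
```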
With !!, a client sends a message and waits for a response. Alternatively, one can also use ! together with an implicit sender reference. In this case the sender will receive an asynchronous reply from the producer actor. Before that, the producer actor itself receives an asynchronous reply from the jetty endpoint. The asynchronous jetty endpoint doesn't block a thread waiting for a response and the producer actor doesn't do that either. This is important from a scalability perspective, especially for longer-running request-response cycles.
By default, a producer actor initiates an in-out message exchange with its Camel endpoint, i.e. it expects a response from it. If a producer actor wants to initiate an in-only message exchange then it must override the oneway method to return true. A producer actor that does so with a JMS endpoint for the test queue, for example, adds any message it receives to the test JMS queue. By default, producer actors that are configured with oneway = true don't reply. This behavior is defined in the Producer.receiveAfterProduce method, whose default implementation replies with the result of the message exchange only if oneway is false.
The receiveAfterProduce method has the same signature as Actor.receive and is called with the result of the message exchange with the endpoint (please note that in-only message exchanges with Camel endpoints have a result as well). The result type for successful message exchanges is Message, for failed message exchanges it is Failure (see below). Concrete producer actors can override this method. For example, a JmsReplyingProducer can override onReceiveAfterProduce to reply with a constant "done" message and ignore the result of the message exchange with the JMS endpoint (case _).
Failures
Message exchanges with a Camel endpoint can fail. In this case, onReceiveAfterProduce is called with a Failure message containing the cause of the failure (a Throwable). Clients of the HttpProducer from the usage example should therefore be prepared to receive a Failure instead of a Message as response. In addition to a failure cause, a Failure message can also contain endpoint-specific headers with failure details such as the HTTP response code. When using ! instead of !!, together with an implicit sender reference (as shown in the previous section), that sender will then receive the Failure message asynchronously. The JmsReplyingProducer example can also be extended to return more meaningful responses: a "done" message only on success and an error message on failure.
Failed message exchanges never cause the producer actor to throw an exception during execution of receive. Should Producer implementations want to throw an exception on failure (for whatever reason) they can do so in onReceiveAfterProduce. In this case failure handling should be done in combination with a supervisor (see below).
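The failure handling described in this section can be sketched as follows, again assuming the Akka 1.x API and a configured Camel JMS component. I use the hook name receiveAfterProduce here, as in Producer.receiveAfterProduce. The reply texts, the FailureHandling object and its describe method are made up for illustration.

```scala
import akka.actor.{Actor, ActorRef}
import akka.camel.{Failure, Message, Producer}

// In-only JMS producer that tells the sender how the exchange went.
// Assumption: hook name and signature as in Producer.receiveAfterProduce.
class JmsReplyingProducer extends Actor with Producer {
  def endpointUri = "jms:queue:test"
  override def oneway = true

  override def receiveAfterProduce: Receive = {
    case m: Message => self.reply("done")
    case f: Failure => self.reply("error: " + f.cause.getMessage)
  }
}

// Hypothetical client-side helper for blocking requests.
object FailureHandling {
  // Sends msg with !! and turns the optional reply into a description.
  def describe(producer: ActorRef, msg: Any): String =
    (producer !! msg) match {
      case Some(Message(body, headers))  => "success: " + body
      case Some(Failure(cause, headers)) => "failure: " + cause.getMessage
      case other                         => "unexpected or missing reply: " + other
    }
}
```

The headers extracted in the Failure case are where endpoint-specific details such as an HTTP response code would be found; which header names are present depends on the Camel component.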
Let's look at another example. What if we want a request to throw an exception on failure (instead of returning a Failure message) but to respond with a normal Message on success? In this case, we need to use self.senderFuture inside onReceiveAfterProduce and complete it with an exception.
Forwarding results
Another option to deal with message exchange results inside onReceiveAfterProduce is to forward them to another actor. Forwarding a message also forwards the initial sender reference. This allows the receiving actor to reply to the initial sender. With producer actors that forward message exchange results to other actors (incl. other producer actors) one can build actor-based message processing pipelines that integrate external systems. In combination with consumer actors, this could be extended towards a scalable and distributed enterprise service bus (ESB) based on Akka actors ... but this is a topic for another blog post.
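A forwarding producer could be sketched like this, assuming the Akka 1.x API. The class name ForwardingHttpProducer, the target parameter and the endpoint URI are placeholders, and the hook is spelled receiveAfterProduce as in Producer.receiveAfterProduce.

```scala
import akka.actor.{Actor, ActorRef}
import akka.camel.Producer

// Forwards every message exchange result (Message or Failure) to target
// instead of replying. forward keeps the initial sender reference,
// so target can reply to the initial sender directly.
class ForwardingHttpProducer(target: ActorRef) extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example" // placeholder URI

  override def receiveAfterProduce: Receive = {
    case result => target forward result
  }
}
```

Such a producer would be created with something like actorOf(new ForwardingHttpProducer(target)).start, where target may itself be another producer actor. Chaining producers this way is how a pipeline is formed.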
Correlation identifiers
The Producer trait also supports correlation identifiers. This allows clients to correlate request messages with asynchronous response messages. A correlation identifier is a message header that can be set by clients. For example, a client can send a request with the correlation identifier (or message exchange identifier) 123. An asynchronous response (Message or Failure) from httpProducer will contain that correlation identifier as well.
Fault-tolerance
A failed message exchange by default does not cause a producer actor to throw an exception. However, concrete producer actors may decide to throw an exception inside onReceiveAfterProduce, for example, or there can be a system-level Camel problem that causes a runtime exception. An application that wants to handle these exceptions should supervise its producer actors. A producer actor can, for example, reply to the initial sender with a Failure message when it is restarted or stopped by a supervisor. To handle restart callbacks, producer actors must override the preRestartProducer method instead of preRestart. The preRestart method is implemented by the Producer trait and does additional resource de-allocation work after calling preRestartProducer. More information about replies within preRestart and postStop can be found in my previous blog post about consumer actors.
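A producer actor with such callbacks might be sketched as follows. This assumes the Akka 1.x API, in particular that reply_? is available for replies that must not fail when there is no sender. The class name, the endpoint URI and the exception messages are placeholders, and linking the actor to a supervisor is not shown.

```scala
import akka.actor.Actor
import akka.camel.{Failure, Producer}

// Replies with a Failure to the sender of the current message
// when the actor is restarted or stopped by its supervisor.
class SupervisedHttpProducer extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8080/example" // placeholder URI

  // Override preRestartProducer, not preRestart: the Producer trait
  // calls this hook first and then releases its own resources.
  override def preRestartProducer(reason: Throwable) {
    self.reply_?(Failure(new Exception("producer restarted", reason)))
  }

  override def postStop() {
    self.reply_?(Failure(new Exception("producer stopped")))
  }
}
```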
Could you give a Java example of your akka-camel consumer and producer, please, if possible?
Regards
@Rajan. Java examples around consumer and producer actors can be found in the akka-camel documentation. A Java example of how to reply in failure cases can be found here.
Martin, will the examples remain consistent for Akka 2? If not, do you plan to revisit these examples?
ReplyDeleteThanks, great work, great help.
No, there will be differences in 2.x. Once akka-camel 2.x is released, the examples should be updated.
Cheers,
Martin
Martin:
1. Great job integrating two powerful frameworks available today, Camel and Akka. Looking forward to version 2.x.
2. In line with the given example, where an Akka actor consumes a message from a JMS queue/topic, can you shed more light on best practices for implementing the further steps? For example, a typical application that consumes a message from JMS will do the following, among other things: parsing, business logic, persistence to an RDBMS and forwarding the message to further systems.
I am particularly interested in the persistence and publishing part. If not JTA, how do we manage persistence to the RDBMS together with publishing via JMS? If a DBPersistenceActor does the persistence part and then sends a message to a JMSProducerActor, how can we handle exceptional situations? Do you get what I am looking for? Any suggestions will be a help.
Regards,
Surya.
What precisely are you trying to achieve and what specific problems do you have? Could you share some code (e.g. on github) so that we can discuss this based on a concrete example?