Thursday, February 17, 2011

Akka Consumer Actors: New Features and Best Practices

In this blog post I want to give some guidance how to implement consumer actors with the akka-camel module. Besides basic usage scenarios, I will also explain how to make consumer actors fault-tolerant, redeliver messages on failure, deal with bounded mailboxes etc. The code examples shown below require the current Akka 1.1-SNAPSHOT to compile and run.

In the following, I assume that you already have a basic familiarity with Akka, Apache Camel and the akka-camel integration module. If you are new to it, you may want to read the Akka and Camel chapter (free pdf) of the Camel in Action book or the Introduction section of the official akka-camel documentation first.

Basic usage

Akka consumer actors can receive messages from any Camel endpoint, provided that the corresponding Camel component is on the classpath. This allows clients to interact with Akka actors over a large number of protocols and APIs.

Camel endpoints either initiate in-only (one-way) message exchanges with consumer actors or in-out (two-way) message exchanges. Replies from consumer actors are mandatory for in-out message exchanges but optional for in-only message exchanges. For replying to a Camel endpoint, the consumer actor uses the very same interface as for replying to any other sender (e.g. to another actor). Examples are self.reply or self.reply_?.

Let's start by defining a simple consumer actor that accepts messages via tcp on port 6200 and replies to the tcp client (tcp support is given by Camel's mina component).

For consumer actors to work, applications need to start a CamelService which is managed by the CamelServiceManager.

When starting a consumer actor, the endpoint defined for that actor will be activated asynchronously by the CamelService. If your application wants to wait for consumer endpoints to be finally activated you can do so with the awaitEndpointActivation method (which is especially useful for testing).

For sending a test message to the consumer actor, the above code uses a Camel ProducerTemplate which can be obtained from the CamelContextManager.

If Camel endpoints, such as the file endpoint, create in-only message exchanges then consumer actors need not reply, by default. The message exchange is completed once the input message has been added to the consumer actor's mailbox.

When placing a file into the data/input directory, the Camel file endpoint will pick up that file and send its content as message to the consumer actor. Once the message is in the actor's mailbox, the file endpoint will delete the corresponding file (see delete=true in the endpoint URI).

If you want to let the consumer actor decide when the file should be deleted, then you'll need to turn auto-acknowledgements off as shown in the following example (autoack = false). In this case the consumer actor must reply with a special Ack message when message processing is done. This asynchronous reply finally causes the file endpoint to delete the consumed file.

Turning auto-acknowledgements on and off is only relevant for in-only message exchanges because, for in-out message exchanges, consumer actors need to reply in any case with an (application-specific) message. Consumer actors may also reply with a Failure message to indicate a processing failure. Failure replies can be made for both in-only and in-out message exchanges. A Failure reply can be done inside receive method but there are better ways as shown in the next sections.

Fault-tolerance and message redelivery

Message processing inside receive may throw exceptions which usually requires a failure response to Camel (i.e. to the consumer endpoint). This is done with a Failure message that contains the failure reason (an instance of Throwable). Instead of catching and handling the exception inside receive, consumer actors should be part of supervisor hierarchies and send failure responses from within restart callback methods. Here's an example of a fault-tolerant file consumer.

The above file consumer overrides the preRestart and postStop callback methods to send reply messages to Camel. A reply within preRestart and postStop is possible after receive has thrown an exception (new feature since Akka 1.1). When receive returns normally it is expected that any necessary reply has already been done within receive.
  • If the lifecycle of the SupervisedFileConsumer is configured to be PERMANENT, a supervisor will restart the consumer upon failure with a call to preRestart. Within preRestart a Failure reply is sent which causes the file endpoint to redeliver the content of the consumed file and the consumer actor can try to process it again. Should the processing succeed in a second attempt, an Ack is sent within receive. A reply within preRestart must be a safe reply via self.reply_? because an unsafe self.reply will throw an exception when the consumer is restarted without having failed. This can be the case in context of all-for-one restart strategies.
  • If the lifecycle of the SupervisedFileConsumer is configured to be TEMPORARY, a supervisor will shut down the consumer upon failure with a call to postStop. Within postStop an Ack is sent which causes the file endpoint to delete the file. One can, of course, choose to reply with a Failure here, so that files that couldn't be processed successfully are kept in the input directory. A reply within postStop must be a safe reply via self.reply_? because an unsafe self.reply will throw an exception when the consumer has been stopped by the application (and not by a supervisor) after successful execution of receive.

Another frequently discussed consumer actor example is a fault-tolerant JMS consumer. A JMS consumer actor should acknowledge a message receipt upon successful message processing and trigger a message redelivery on failure. This is exactly the same pattern as described for the SupervisedFileConsumer above. You just need to change the file endpoint URI to a jms or activemq endpoint URI and you're done (of course, you additionally need to configure the JMS connection with a redelivery policy and, optionally, use transacted queues. An explanation how to do this would however exceed the scope of this blog post).

Simplifications and tradeoffs with blocking=true

In all the examples so far the internally created Camel routes use the ! (bang) operator to send the input message to the consumer actor. This means that the Camel route does not block a thread waiting for a response. It's an asynchronous reply will cause the Camel route to resume processing. That's also the reason why any exception thrown by receive isn't reported back to Camel directly but must be done explicitly with a Failure response.

If you want that exceptions thrown by receive are reported back to Camel directly (i.e. without sending Failure responses) then you'll need to set blocking = true for the consumer actor. This causes the Camel route to send the input message with the !! (bangbang) operator and to wait for a response. However, this will block a thread until the consumer sends a response or throws an exception within receive. The advantage of this approach is that error handling is strongly simplified in this case but scalability will likely decrease.

Here's an example of a consumer actor that uses the simplified approach to error handling. Any exception thrown by receive will still cause the file endpoint to redeliver the message but a thread will be blocked by Camel during the execution of receive.

No supervisor is needed here. It depends on the non-functional requirements of your application whether to go for this simple but blocking approach or to use a more scalable, non-blocking approach in combination with a supervisor.

Bounded mailboxes and error handling with custom Camel routes

For consumer actors that require a significant amount of time for processing a single message, it can make sense to install a bounded mailbox. A bounded mailbox throws an exception if its capacity is reached and the Camel route tries to add additional messages to the mailbox. Here's an example of a file consumer actor that uses a bounded mailbox with a capacity of 5. Processing is artificially delayed by 1 second using a Thread.sleep.

When, let's say, 10 files are put into the data/input directory, they will be picked up by the file endpoint and added to the actor's mailbox. The capacity of the mailbox will be reached soon because the file endpoint can send messages much faster than the consumer actor can process it. Exceptions thrown by the mailbox are directly reported to the Camel route which causes the file consumer to redeliver messages until they can be added to the mailbox. The same applies to JMS and other endpoints that support redelivery.

When dealing with endpoints that do not support redelivery, one needs to customize the Camel route to the consumer actor with a special error handler that does the redelivery. This is shown for a consumer actor that consumes messages from a direct endpoint.

Here we use onRouteDefinition to define how the Camel route should be customized during its creation. In this example, an error handler is defined that attempts max. 3 redeliveries with a delay of 1000 ms. For details refer to the intercepting route construction section in the akka-camel documentation. When using a producer template to send messages to this endpoint, some of them will be added to the mailbox on first attempt, some of them after a second attempt triggered by the error handler.

The examples presented in this post cover many of the consumer-actor-related questions and topics that have been asked and discussed on the akka-user mailing list. In another post I plan to cover best practices for implementing Akka producer actors.

1 comment:

  1. autoAck = false is less than optional when using features like localWorkDirectory with FTP2; if you don't, the temporary file will be deleted in a race with the consumer actor's message handling, and the deletion will usually win before the file is opened.