Showing posts with label actor model. Show all posts
Showing posts with label actor model. Show all posts

Monday, August 30, 2010

Akka's grown-up hump

It's quite some time ago when I last wrote about Akka's Camel integration. An initial version of the akka-camel module was released with Akka 0.7. Meanwhile Akka 0.10 is out with an akka-camel module containing numerous new features and enhancements. Some of them will be briefly described in this blog post.

Java API

The akka-camel module now offers a Java API in addition to the Scala API. Both APIs are fully covered in the online documentation.

Support for typed consumer actors

Methods of typed actors can be published at Camel endpoints by annotating them with @consume. The annotation value defines the endpoint URI. Here's an example of a typed consumer actor in Java.
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.TypedActor;
import se.scalablesolutions.akka.camel.consume;

public interface MyTypedConsumer {
@consume("file:data/foo")
public void foo(String body);

@consume("jetty:http://localhost:8877/camel/bar")
public String bar(@Body String body, @Header("Content-Type") String contentType);
}

public class MyTypedConsumerImpl extends TypedActor implements MyTypedConsumer {
public void foo(String body) {
System.out.println(String.format("Received message: ", body));
}

public String bar(String body, String contentType) {
return String.format("body=%s Content-Type header=%s", body, conctentType);
}
}
When creating an instance of the typed actor with
import se.scalablesolutions.akka.actor.TypedActor;

// Create typed actor and activate endpoints
MyTypedConsumer consumer = TypedActor.newInstance(
MyTypedConsumer.class, MyTypedConumerImpl.class);
then the actor's foo method can be invoked by dropping a file into the data/foo directory. The file content is passed via the body parameter. The bar method can be invoked by POSTing a message to http://localhost:8877/camel/bar. The HTTP message body is passed via the body parameter and the Content-Type header via the contentType parameter. For parameter binding, Camel's parameter binding annotations are used.

Endpoint lifecycle

Consumer actor endpoints are activated when the actor is started and de-activated when the actor is stopped. This is the case for both typed and untyped actors. An actor can either be stopped explicitly by an application or by a supervisor.

Fault tolerance

When a consumer actor isn't stopped but restarted by a supervisor, the actor's endpoint remains active. Communication partners can continue to exchange messages with the endpoint during the restart phase but message processing will occur only after restart completes. For in-out message exchanges, response times may therefore increase. Communication partners that initiate in-only message exchanges with the endpoint won't see any difference.

Producer actors

Actors that want to produce messages to endpoints either need to mixin the Producer trait (Scala API) or extend the abstract UntypedProducerActor class (Java API). Although the Producer trait was already available in the initial version of akka-camel, many enhancements have been made since then. Most of them are internal enhancements such as performance improvements and support for asynchronous routing. Also, extensions to the API have been made to support
  • pre-processing of messages before they are sent to an endpoint and
  • post-processing of messages after they have been received as response from an endpoint.
For example, instead of replying to the original sender (default behavior) a producer actor could do a custom post-processing e.g. by forwarding the response to another actor (together with the initial sender reference)
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer

class MyProducer(target: ActorRef) extends Actor with Producer {
def endpointUri = "http://example.org/some/external/service"

override protected def receiveAfterProduce = {
// do not reply to initial sender but
// forward result to a target actor
case msg => target forward msg
}
}
Forwarding results to other actors makes it easier to create actor-based message processing pipelines that make use of external services. Examples are given in the akka-camel documentation.

Typed actors need to use Camel's ProducerTemplate directly to produce messages to Camel endpoints. A managed instance of a ProducerTemplate can be obtained via CamelContextManager.template.

Asynchronous routing

Since Akka 0.10, Camel's asynchronous routing engine is fully supported: in-out and in-only messages exchanges between endpoints and actors are designed to be asynchronous. This is the case for both, consumer and producer actors.

This is especially important for actors that participate in long-running request-reply interactions with external services. Threads are no longer blocked for the full duration of an in-out message exchange and are available for doing other work. There's also an asynchronous routing example described in the online documentation.

Routes to actors

Typed an untyped actors can also be accessed from Camel routes directly, using Akka's TypedActorComponent and ActorComponent, respectively. These are Camel components supporting typed-actor and and actor endpoint URIs in route definitions. For example,
from("seda:test").to("actor:uuid:12345678");
routes a message from a SEDA queue to an untyped actor with uuid 12345678. The actor endpoint looks up the actor in Akka's actor registry.

The TypedActorComponent is an extension of Camel's bean component where method invocations follow the semantics of the actor model. Here is an example route from a direct endpoint to the foo method of a typed actor.
from("direct:test").to("typed-actor:sample?method=foo");
The typed actor is registered under the name sample in the Camel registry. For more details how to add typed actors to the Camel registry, follow this link.

CamelService

Prerequisite for endpoints being activated when starting consumer actors is a running CamelService. When starting Akka in Kernel mode or using the Akka Initializer in a web application, a CamelService is started automatically. In all other cases a CamelService must be started by the application itself. This can be done either programmatically with
import se.scalablesolutions.akka.camel.CamelServiceManager._

startCamelService
or declaratively in a Spring XML configuration file.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

<!– A custom CamelContext (SpringCamelContext) –>
<camel:camelContext id="camelContext">
<!– … –>
</camel:camelContext>

<!– Create a CamelService using a custom CamelContext –>
<akka:camel-service>
<akka:camel-context ref="camelContext" />
</akka:camel-service>

</beans>
Usage of the element requires the akka-spring jar on the classpath. This example also shows how the Spring-managed CamelService is configured with a custom CamelContext.

A running CamelService can be stopped either by closing the application context or by calling the CamelServiceManager.stopCamelService method.

Outlook

The next Akka release will be Akka 1.0 (targeted for late fall) and akka-camel development will mainly focus on API stabilization. If you'd like to have some additional features in the next Akka release, want to give feedback or ask some questions, please contact the Akka community at the akka-user mailing list.

Saturday, April 3, 2010

Akka features for application integration

Akka is a platform for event-driven, scalable and fault-tolerant architectures on the JVM. It is mainly written in Scala. One of its core features is support for the actor model that provides a higher level of abstraction for writing concurrent and distributed systems.

Since version 0.7, Akka offers a new feature that let actors send and receive messages over a great variety of protocols and APIs. In addition to the native Scala actor API, actors can now exchange messages with other systems over large number of protcols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported. This new feature is provided by Akka's Camel module.

At the core of this new feature is Apache Camel, IMHO the most powerful and feature-rich integration framework currently available for the JVM. For an introduction to Apache Camel you may want to read this article. Camel comes with a large number of components that provide bindings to different protocols and APIs. Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example.

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.{Message, Consumer}

class MyActor extends Actor with Consumer {
def endpointUri =
"mina:tcp://localhost:6200?textline=true"

def receive = {
case msg: Message => { /* ... */}
case _ => { /* ... */}
}
}
// start and expose actor via tcp
val myActor = actorOf[MyActor].start

The above example exposes an actor over a tcp endpoint on port 6200 via Apache Camel's Mina component. The endpointUri is an abstract method declared in the Consumer trait. After starting the actor, tcp clients can immediately send messages to and receive responses from that actor. If the message exchange should go over HTTP (via Camel's Jetty component), only the actor's endpointUri must be redefined.

class MyActor extends Actor with Consumer {
def endpointUri =
"jetty:http://localhost:8877/example"

def receive = {
case msg: Message => { /* ... */}
case _ => { /* ... */}
}
}

Actors can also trigger message exchanges with external systems i.e. produce to Camel endpoints.

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer

class MyActor extends Actor with Producer {
def endpointUri = "jms:queue:example"
protected def receive = produce
}

In the above example, any message sent to this actor will be added (produced) to the example JMS queue. Producer actors may choose from the same set of Camel components as Consumer actors do.

The number of Camel components is constantly increasing. Akka's Camel module can support these in a plug-and-play manner. Just add them to your application's classpath, define a component-specific endpoint URI and use it to exchange messages over the component-specific protocols or APIs. This is possible because Camel components bind protocol-specific message formats to a Camel-specific normalized message format. The normalized message format hides protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. Akka's Camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage, for example.

Highly-scalable eHealth integration solutions with Akka

One goal I had in mind when implementing the Akka Camel module was to have a basis for building highly-scalable eHealth integration solutions. For eHealth information systems it is becoming increasingly important to support standard interfaces as specified by IHE. Financial support from governments strongly depends on eHealth standard compliance.

Building blocks for implementing standard-compliant eHealth applications are provided by the Open eHealth Integration Platform (IPF). IPF is a mature open source integration platform, based on Apache Camel. It provides, among others, extensive support for IHE actor interfaces. These interfaces are based on Apache Camel's component technology. Therefore, it's a one-liner to expose an Akka actor through an IHE compliant interface. The following example implements the server-side interface of the IHE XDS Registry Stored Query (XDS-ITI18) transaction.

class RSQService extends Actor with Consumer {
def endpointUri = "xds-iti18:RSQService"

def receive = {
case msg: Message => { /* ... */}
case _ => { /* ... */}
}
}

In IHE, a message exchange between two participants is called a transaction. Here, the IPF xds-iti18 component is used to implement the server-side interface of the XDS ITI18 transaction. This allows any XDS-ITI18-comaptible client to communicate with the actor over an IHE standard protocol using ebXML/SOAP/HTTP (as defined in the XDS specification). The implementor of the receive method, however, doesn't need to care about all the low-level protocol details (which are scary if you take a closer look). The body of the received message is a high-level object graph containing XDS-ITI18-specific transaction data.

A high-level programming model for implementing eHealth standards is only one of several reasons why I consider Akka as a powerful technical basis for building scalable and fault-tolerant eHealth information systems and integration solutions. Akka's support for NoSQL datastores could further be used for implementing scalabale persistence layers in eHealth applications. Over the next weeks I'm going to explore this field in more detail and will keep you updated with further blog posts on that topic.