Monday, August 30, 2010

Akka's grown-up hump

It's been quite some time since I last wrote about Akka's Camel integration. An initial version of the akka-camel module was released with Akka 0.7. Meanwhile, Akka 0.10 is out with an akka-camel module containing numerous new features and enhancements. Some of them are briefly described in this blog post.

Java API

The akka-camel module now offers a Java API in addition to the Scala API. Both APIs are fully covered in the online documentation.

Support for typed consumer actors

Methods of typed actors can be published at Camel endpoints by annotating them with @consume. The annotation value defines the endpoint URI. Here's an example of a typed consumer actor in Java.
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.TypedActor;
import se.scalablesolutions.akka.camel.consume;

public interface MyTypedConsumer {
@consume("file:data/foo")
public void foo(String body);

@consume("jetty:http://localhost:8877/camel/bar")
public String bar(@Body String body, @Header("Content-Type") String contentType);
}

public class MyTypedConsumerImpl extends TypedActor implements MyTypedConsumer {
public void foo(String body) {
System.out.println(String.format("Received message: %s", body));
}

public String bar(String body, String contentType) {
return String.format("body=%s Content-Type header=%s", body, contentType);
}
}
When creating an instance of the typed actor with
import se.scalablesolutions.akka.actor.TypedActor;

// Create typed actor and activate endpoints
MyTypedConsumer consumer = TypedActor.newInstance(
MyTypedConsumer.class, MyTypedConsumerImpl.class);
then the actor's foo method can be invoked by dropping a file into the data/foo directory. The file content is passed via the body parameter. The bar method can be invoked by POSTing a message to http://localhost:8877/camel/bar. The HTTP message body is passed via the body parameter and the Content-Type header via the contentType parameter. For parameter binding, Camel's parameter binding annotations are used.
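
To illustrate the general idea of publishing annotated methods, here is a self-contained Java sketch that does not depend on Akka or Camel. It defines a hypothetical @Endpoint annotation (a stand-in for @consume, not the real annotation) and collects the endpoint URIs declared on an interface via reflection. The names EndpointScanner, Endpoint and SampleConsumer are made up for this example, and the sketch says nothing about how akka-camel discovers and activates endpoints internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class EndpointScanner {
    // Maps method names to the endpoint URIs declared on them
    static Map<String, String> scan(Class<?> type) {
        Map<String, String> uris = new TreeMap<>();
        for (Method method : type.getMethods()) {
            Endpoint endpoint = method.getAnnotation(Endpoint.class);
            if (endpoint != null) {
                uris.put(method.getName(), endpoint.value());
            }
        }
        return uris;
    }

    public static void main(String[] args) {
        scan(SampleConsumer.class).forEach(
            (name, uri) -> System.out.println(name + " -> " + uri));
    }
}

// Hypothetical stand-in for akka-camel's @consume annotation
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Endpoint {
    String value();
}

// Same shape as MyTypedConsumer, but using the stand-in annotation
interface SampleConsumer {
    @Endpoint("file:data/foo")
    void foo(String body);

    @Endpoint("jetty:http://localhost:8877/camel/bar")
    String bar(String body, String contentType);
}
```

The annotation needs runtime retention, otherwise it would not be visible to reflection.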

Endpoint lifecycle

Consumer actor endpoints are activated when the actor is started and de-activated when the actor is stopped. This is the case for both typed and untyped actors. An actor can either be stopped explicitly by an application or by a supervisor.

Fault tolerance

When a consumer actor isn't stopped but restarted by a supervisor, the actor's endpoint remains active. Communication partners can continue to exchange messages with the endpoint during the restart phase but message processing will occur only after restart completes. For in-out message exchanges, response times may therefore increase. Communication partners that initiate in-only message exchanges with the endpoint won't see any difference.

Producer actors

Actors that want to produce messages to endpoints either need to mix in the Producer trait (Scala API) or extend the abstract UntypedProducerActor class (Java API). Although the Producer trait was already available in the initial version of akka-camel, many enhancements have been made since then. Most of them are internal enhancements such as performance improvements and support for asynchronous routing. Also, extensions to the API have been made to support
  • pre-processing of messages before they are sent to an endpoint and
  • post-processing of messages after they have been received as response from an endpoint.
For example, instead of replying to the original sender (the default behavior), a producer actor can do custom post-processing, e.g. by forwarding the response to another actor (together with the initial sender reference):
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.camel.Producer

class MyProducer(target: ActorRef) extends Actor with Producer {
def endpointUri = "http://example.org/some/external/service"

override protected def receiveAfterProduce = {
// do not reply to initial sender but
// forward result to a target actor
case msg => target forward msg
}
}
Forwarding results to other actors makes it easier to create actor-based message processing pipelines that make use of external services. Examples are given in the akka-camel documentation.

Typed actors need to use Camel's ProducerTemplate directly to produce messages to Camel endpoints. A managed instance of a ProducerTemplate can be obtained via CamelContextManager.template.

Asynchronous routing

Since Akka 0.10, Camel's asynchronous routing engine is fully supported: in-out and in-only message exchanges between endpoints and actors are designed to be asynchronous. This is the case for both consumer and producer actors.

This is especially important for actors that participate in long-running request-reply interactions with external services. Threads are no longer blocked for the full duration of an in-out message exchange and are available for doing other work. There's also an asynchronous routing example described in the online documentation.
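
The benefit of non-blocking in-out exchanges can be sketched with plain Java, independent of Akka and Camel. In the following example, slowService is a hypothetical stand-in for a long-running external service. The caller registers a continuation for the reply instead of blocking a thread while waiting. All names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncRequestReply {
    // Hypothetical stand-in for a long-running external service
    static CompletableFuture<String> slowService(String request, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // simulate service latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "reply to " + request;
        }, pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Register a continuation instead of blocking until the reply arrives
            CompletableFuture<String> reply =
                slowService("ping", pool).thenApply(String::toUpperCase);
            System.out.println("request sent, caller thread is free");
            // Block only at the very end, to print the result of the demo
            System.out.println(reply.get(5, TimeUnit.SECONDS));
        } finally {
            pool.shutdown();
        }
    }
}
```

In this sketch the simulated service still occupies a pool thread while sleeping. A truly asynchronous transport would not need that thread either.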

Routes to actors

Typed and untyped actors can also be accessed from Camel routes directly, using Akka's TypedActorComponent and ActorComponent, respectively. These are Camel components supporting typed-actor and actor endpoint URIs in route definitions. For example,
from("seda:test").to("actor:uuid:12345678");
routes a message from a SEDA queue to an untyped actor with uuid 12345678. The actor endpoint looks up the actor in Akka's actor registry.

The TypedActorComponent is an extension of Camel's bean component where method invocations follow the semantics of the actor model. Here is an example route from a direct endpoint to the foo method of a typed actor.
from("direct:test").to("typed-actor:sample?method=foo");
The typed actor is registered under the name sample in the Camel registry. For more details on how to add typed actors to the Camel registry, follow this link.

CamelService

Prerequisite for endpoints being activated when starting consumer actors is a running CamelService. When starting Akka in Kernel mode or using the Akka Initializer in a web application, a CamelService is started automatically. In all other cases a CamelService must be started by the application itself. This can be done either programmatically with
import se.scalablesolutions.akka.camel.CamelServiceManager._

startCamelService
or declaratively in a Spring XML configuration file.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

<!-- A custom CamelContext (SpringCamelContext) -->
<camel:camelContext id="camelContext">
<!-- ... -->
</camel:camelContext>

<!-- Create a CamelService using a custom CamelContext -->
<akka:camel-service>
<akka:camel-context ref="camelContext" />
</akka:camel-service>

</beans>
Usage of the <akka:camel-service> element requires the akka-spring jar on the classpath. This example also shows how the Spring-managed CamelService is configured with a custom CamelContext.

A running CamelService can be stopped either by closing the application context or by calling the CamelServiceManager.stopCamelService method.

Outlook

The next Akka release will be Akka 1.0 (targeted for late fall) and akka-camel development will mainly focus on API stabilization. If you'd like to have some additional features in the next Akka release, want to give feedback or ask some questions, please contact the Akka community at the akka-user mailing list.

Saturday, April 3, 2010

Akka features for application integration

Akka is a platform for event-driven, scalable and fault-tolerant architectures on the JVM. It is mainly written in Scala. One of its core features is support for the actor model that provides a higher level of abstraction for writing concurrent and distributed systems.

Since version 0.7, Akka offers a new feature that lets actors send and receive messages over a great variety of protocols and APIs. In addition to the native Scala actor API, actors can now exchange messages with other systems over a large number of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a few. At the moment, approximately 80 protocols and APIs are supported. This new feature is provided by Akka's Camel module.

At the core of this new feature is Apache Camel, IMHO the most powerful and feature-rich integration framework currently available for the JVM. For an introduction to Apache Camel you may want to read this article. Camel comes with a large number of components that provide bindings to different protocols and APIs. Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example.

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.camel.{Message, Consumer}

class MyActor extends Actor with Consumer {
def endpointUri =
"mina:tcp://localhost:6200?textline=true"

def receive = {
case msg: Message => { /* ... */}
case _ => { /* ... */}
}
}
// start and expose actor via tcp
val myActor = actorOf[MyActor].start

The above example exposes an actor over a tcp endpoint on port 6200 via Apache Camel's Mina component. The endpointUri is an abstract method declared in the Consumer trait. After starting the actor, tcp clients can immediately send messages to and receive responses from that actor. If the message exchange should go over HTTP (via Camel's Jetty component), only the actor's endpointUri must be redefined.

class MyActor extends Actor with Consumer {
def endpointUri =
"jetty:http://localhost:8877/example"

def receive = {
case msg: Message => { /* ... */}
case _ => { /* ... */}
}
}

Actors can also trigger message exchanges with external systems i.e. produce to Camel endpoints.

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer

class MyActor extends Actor with Producer {
def endpointUri = "jms:queue:example"
protected def receive = produce
}

In the above example, any message sent to this actor will be added (produced) to the example JMS queue. Producer actors may choose from the same set of Camel components as Consumer actors do.

The number of Camel components is constantly increasing. Akka's Camel module can support these in a plug-and-play manner. Just add them to your application's classpath, define a component-specific endpoint URI and use it to exchange messages over the component-specific protocols or APIs. This is possible because Camel components bind protocol-specific message formats to a Camel-specific normalized message format. The normalized message format hides protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. Akka's Camel module further converts mutable Camel messages into immutable representations which are used by Consumer and Producer actors for pattern matching, transformation, serialization or storage, for example.
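
To make the idea of an immutable message representation more concrete, here is a minimal Java sketch. ImmutableMessage is a hypothetical class written for this post, not the akka-camel Message class, and its method names are assumptions. It only shows the principle: every transformation returns a new instance and leaves the original untouched, which makes messages safe to share between actors.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical immutable message; not the akka-camel Message class
final class ImmutableMessage {
    private final Object body;
    private final Map<String, Object> headers;

    ImmutableMessage(Object body, Map<String, ?> headers) {
        this.body = body;
        // Defensive copy so later changes to the caller's map are not visible
        this.headers = Collections.unmodifiableMap(
            new HashMap<String, Object>(headers));
    }

    Object body() { return body; }

    Map<String, Object> headers() { return headers; }

    // Returns a new message with a transformed body; this instance is unchanged
    ImmutableMessage transformBody(Function<Object, Object> f) {
        return new ImmutableMessage(f.apply(body), headers);
    }

    // Returns a new message with one additional header
    ImmutableMessage withHeader(String name, Object value) {
        Map<String, Object> copy = new HashMap<String, Object>(headers);
        copy.put(name, value);
        return new ImmutableMessage(body, copy);
    }

    public static void main(String[] args) {
        ImmutableMessage original = new ImmutableMessage(
            "hello", Collections.singletonMap("Content-Type", "text/plain"));
        ImmutableMessage shouted = original
            .transformBody(b -> b.toString().toUpperCase())
            .withHeader("processed", true);
        System.out.println(original.body() + " " + original.headers());
        System.out.println(shouted.body() + " " + shouted.headers());
    }
}
```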

Highly-scalable eHealth integration solutions with Akka

One goal I had in mind when implementing the Akka Camel module was to have a basis for building highly-scalable eHealth integration solutions. For eHealth information systems it is becoming increasingly important to support standard interfaces as specified by IHE. Financial support from governments strongly depends on eHealth standard compliance.

Building blocks for implementing standard-compliant eHealth applications are provided by the Open eHealth Integration Platform (IPF). IPF is a mature open source integration platform, based on Apache Camel. It provides, among others, extensive support for IHE actor interfaces. These interfaces are based on Apache Camel's component technology. Therefore, it's a one-liner to expose an Akka actor through an IHE compliant interface. The following example implements the server-side interface of the IHE XDS Registry Stored Query (XDS-ITI18) transaction.

class RSQService extends Actor with Consumer {
def endpointUri = "xds-iti18:RSQService"

def receive = {
case msg: Message => { /* ... */}
case _ => { /* ... */}
}
}

In IHE, a message exchange between two participants is called a transaction. Here, the IPF xds-iti18 component is used to implement the server-side interface of the XDS ITI18 transaction. This allows any XDS-ITI18-compatible client to communicate with the actor over an IHE standard protocol using ebXML/SOAP/HTTP (as defined in the XDS specification). The implementor of the receive method, however, doesn't need to care about all the low-level protocol details (which are scary if you take a closer look). The body of the received message is a high-level object graph containing XDS-ITI18-specific transaction data.

A high-level programming model for implementing eHealth standards is only one of several reasons why I consider Akka a powerful technical basis for building scalable and fault-tolerant eHealth information systems and integration solutions. Akka's support for NoSQL datastores could further be used for implementing scalable persistence layers in eHealth applications. Over the next weeks I'm going to explore this field in more detail and will keep you updated with further blog posts on that topic.

Sunday, February 7, 2010

Accessing a security-enabled Google App Engine service with Apache Camel

In a previous post I described the low-level details of programmatic login to a Google App Engine service from a Java client. Things get much easier when using Apache Camel. The recently committed glogin component makes it trivial to log in to a remotely deployed Google App Engine service as well as to a local development server. In the following example, an application-specific authorization cookie is obtained with the glogin component. It authorizes a client application to access http://camelcloud.appspot.com on Google App Engine.

import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import static org.apache.camel.component.gae.login.GLoginBinding.*;

...

ProducerTemplate template = ...

Exchange result = template.request(
"glogin://camelcloud.appspot.com"
+ "?userName=replaceme@gmail.com"
+ "&password=replaceme", null);
String cookie = result.getOut().getHeader(
GLOGIN_COOKIE, String.class);

Please note that the password is only sent to the Google Accounts API for authentication. It is never sent to Google App Engine or included in any URL. The obtained authorization cookie is valid for 24 hours and needs to be sent with subsequent requests to the GAE application. If inclusion of user credentials in an endpoint URI is not an option, username and password can also be set dynamically (per request) using Camel message headers:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import static org.apache.camel.component.gae.login.GLoginBinding.*;

...

ProducerTemplate template = ...

Exchange result = template.request(
"glogin://camelcloud.appspot.com", new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setHeader(
GLOGIN_USER_NAME, "replaceme@gmail.com");
exchange.getIn().setHeader(
GLOGIN_PASSWORD, "replaceme");
}
});
String cookie = result.getOut().getHeader(
GLOGIN_COOKIE, String.class);

To log in to a local development server, the devMode parameter in the endpoint URI must be set to true.

import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import static org.apache.camel.component.gae.login.GLoginBinding.*;

...

ProducerTemplate template = ...

Exchange result = template.request(
"glogin://localhost:8888"
+ "?userName=test@example.org"
+ "&devMode=true", null);
String cookie = result.getOut().getHeader(
GLOGIN_COOKIE, String.class);
The glogin component is part of the Camel Components for Google App Engine.
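
How the authorization cookie is attached to subsequent requests depends on the HTTP client in use. As a rough sketch, the following example uses the java.net.http API (Java 11 or later) to build, but not send, a GET request that carries the cookie in a Cookie header. It assumes that the cookie string is already in name=value form. The class name CookieRequest and the placeholder cookie value are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class CookieRequest {
    // Builds (but does not send) a GET request carrying the authorization cookie
    static HttpRequest withCookie(String url, String cookie) {
        return HttpRequest.newBuilder(URI.create(url))
            .header("Cookie", cookie)
            .GET()
            .build();
    }

    public static void main(String[] args) {
        // Placeholder value; a real cookie would come from the glogin endpoint
        HttpRequest request = withCookie(
            "http://camelcloud.appspot.com/", "name=value");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().map());
    }
}
```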

Add OAuth to your web application with Apache Camel

OAuth is an open protocol to allow secure API authorization from desktop and web applications. Google, for example, already supports OAuth for authorizing 3rd-party applications to access Google services on behalf of a user.

Recently, I added the gauth component to Apache Camel. You can use it to implement OAuth consumer functionality for any web application with only a few lines of code. gauth endpoints take care of exchanging authorization and access tokens between a web application and an OAuth service provider. At the moment, the gauth component can be used to interact with Google's OAuth services. Later versions will support other OAuth providers as well.

From a user's perspective, an example OAuth scenario might look as follows:
  • The user logs into a web application that uses the Google Calendar API, for example.
  • To authorize access, the user is redirected to a Google Accounts authorization page where access for the requesting web application can be granted or denied.
  • After granting access the user is redirected back to the web application and the web application can now access the user's calendar data.
  • The user can revoke access at any time within Google Accounts.
To implement that scenario with Apache Camel, two routes are needed. The first route obtains an unauthorized request token from Google and then redirects the user to the Google Accounts authorization page:

String encodedCallback = URLEncoder.encode(
"https://example.org/handler", "UTF-8");
String encodedScope = URLEncoder.encode(
"http://www.google.com/calendar/feeds/", "UTF-8");

from("jetty:http://0.0.0.0:8080/authorize")
.to("gauth://authorize"
+ "?callback=" + encodedCallback
+ "&scope=" + encodedScope);

In this example, the authorization request is triggered by the user by sending a GET request to http://example.org/authorize (e.g. by clicking a link in the browser). The gauth://authorize endpoint then obtains an unauthorized request token from Google. The scope parameter in the endpoint URI defines which Google service the web application wants to access. After having obtained the token, the endpoint generates a redirect response (302) which redirects the user to the Google Accounts authorization page. After granting access, the user is redirected back to the web application (callback parameter). The callback now contains an authorized request token that must finally be upgraded to an access token. Handling the callback and upgrading to an access token is done in the second route.

from("jetty:https://example.org/handler")
.to("gauth://upgrade")
.process(new StoreTokenProcessor());

The jetty endpoint receives the callback from Google. The gauth://upgrade endpoint takes the authorized request token from the callback and upgrades it to an access token. The route finally stores the long-lived access token for the current user. The next time the user logs into the web application, the access token is already available and the application can continue to access the user's Google Calendar data without needing further user interaction. The user can invalidate the access token at any time within Google Accounts.

Only these two routes are needed to integrate with Google's OAuth provider services. The routes can perfectly co-exist with any other web application framework. Whereas the web framework provides the basis for web application-specific functionality, the OAuth service provider integration is done with Apache Camel. This approach allows for a clean separation of integration logic from application or domain logic.
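
To see what the endpoint URI of the first route looks like once the callback and scope parameters are URL-encoded, the string concatenation can be isolated into a small helper. This is only a sketch: the class and method names are made up, and it uses the URLEncoder.encode(String, Charset) overload available since Java 10 instead of the "UTF-8" string variant used in the routes.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class GAuthUriExample {
    // Assembles the gauth://authorize endpoint URI from raw parameter values
    static String authorizeUri(String callback, String scope) {
        return "gauth://authorize"
            + "?callback=" + URLEncoder.encode(callback, StandardCharsets.UTF_8)
            + "&scope=" + URLEncoder.encode(scope, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(authorizeUri(
            "https://example.org/handler",
            "http://www.google.com/calendar/feeds/"));
    }
}
```

Encoding matters here because both parameter values are URLs themselves. Without it, their colons and slashes would end up unescaped inside the query string of the endpoint URI.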

For handling OAuth requests, web applications can also use other components than Camel's jetty component, such as the servlet component. For adding OAuth to Google App Engine applications, the jetty component needs to be replaced with Camel's ghttp component. Here's an example:

String encodedCallback = URLEncoder.encode(
"https://camelcloud.appspot.com/handler", "UTF-8");
String encodedScope = URLEncoder.encode(
"http://www.google.com/calendar/feeds/", "UTF-8");

from("ghttp:///authorize")
.to("gauth://authorize"
+ "?callback=" + encodedCallback
+ "&scope=" + encodedScope);

from("ghttp:///handler")
.to("gauth://upgrade")
.process(new StoreTokenProcessor());

The following figure gives an overview of how the OAuth sequence of interactions relates to the gauth://authorize and gauth://upgrade endpoints.



Accessing a Google service with an access token (step 9) is application-specific and not covered by the gauth component. To get access to a user's Google Calendar data with an access token, one could use the GData client library. The gauth component documentation contains an example.

The gauth component is the first step towards broader support for security standards such as OAuth and OpenID in Apache Camel. I'm currently thinking of the following extensions:
  • A Camel OpenID component
  • A Camel OpenID/OAuth hybrid component
  • Support for OAuth providers other than Google
The gauth component is currently part of the Camel 2.3 development snapshot (sources).

Thursday, January 14, 2010

Accessing a security-enabled Google App Engine service from a Java client

After a rather long search on Google pages and forums I could only find fragmented information on how to programmatically access a Google App Engine service that requires users to authenticate. In this blog post I'm going to summarize my findings for a Java client application.

By programmatic access I mean that the user doesn't enter username and password into a login form created by Google but into an installed client application, and the client coordinates the authentication and authorization process programmatically. The mechanism used here is ClientLogin for installed applications.

The first step is to obtain an authentication token from the Google Accounts API. The easiest way to do that is with the GData client library for Java.

import java.net.URLEncoder;

import com.google.gdata.client.GoogleAuthTokenFactory;
import com.google.gdata.util.AuthenticationException;

public class AuthExample {

public static void main(String[] args) throws Exception {

String username = "myusername@gmail.com";
String password = "mypassword";
String serviceName = "ah";

GoogleAuthTokenFactory factory = new GoogleAuthTokenFactory(serviceName, "", null);
// Obtain authentication token from Google Accounts
String token = factory.getAuthToken(username, password, null, null, serviceName, "");

...
}
}

One has to provide username and password and the name of the Google service that should be accessed. For Google App Engine the service name is always ah, regardless of the name of the deployed application. The next step is to do a login at Google App Engine. The login URL is https://example.appspot.com/_ah/login?continue=https%3A%2F%2Fexample.appspot.com%2Fexample&auth=DQAAAJc...qNUA8. The continue query parameter tells the login service where to redirect after successful login. In this example the redirect goes to https://example.appspot.com/example. The auth query parameter contains the authentication token obtained before.

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;

public class AuthExample {

public static void main(String[] args) throws Exception {
...

String token = ...
String serviceUrl = "https://example.appspot.com/example";
String loginUrl = "https://example.appspot.com/_ah/login?continue=" +
URLEncoder.encode(serviceUrl, "UTF-8") + "&auth=" + token;

HttpClient httpclient = new DefaultHttpClient();
HttpGet httpget = new HttpGet(loginUrl);
HttpResponse response = httpclient.execute(httpget);
// process response
// ...

httpclient.getConnectionManager().shutdown();
}
}
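
The login URL construction used above can be isolated into a small helper, which makes it easy to check the result without talking to App Engine. The following is a sketch only: the class name, method name and parameter names are made up. Unlike the code above, it also URL-encodes the token, which should be harmless for tokens that contain only URL-safe characters.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class LoginUrlExample {
    // Builds the App Engine login URL with the continue and auth query parameters
    static String loginUrl(String appBaseUrl, String serviceUrl, String token) {
        return appBaseUrl + "/_ah/login"
            + "?continue=" + URLEncoder.encode(serviceUrl, StandardCharsets.UTF_8)
            + "&auth=" + URLEncoder.encode(token, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "abc123" is a placeholder, not a real authentication token
        System.out.println(loginUrl(
            "https://example.appspot.com",
            "https://example.appspot.com/example",
            "abc123"));
    }
}
```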

When the login service sends a redirect after successful login, it also returns a cookie that allows the client to finally access the protected App Engine service at https://example.appspot.com/example. The redirect and cookie handling is done by the httpclient automatically. For the duration of the session the protected App Engine service can be accessed with that cookie.

Update: If the service expects POST requests instead of GET requests then an automated redirect is not an option. In this case, redirect handling must be disabled for the httpclient and a POST request to the serviceUrl must be created manually. Also, the authorization cookie must be set explicitly.

import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.impl.client.DefaultHttpClient;

public class AuthExample {

public static void main(String[] args) throws Exception {
...

String token = ...
String loginUrl = "https://example.appspot.com/_ah/login?auth=" + token;
String serviceUrl = "https://example.appspot.com/example";

HttpClient httpclient = new DefaultHttpClient();
httpclient.getParams().setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, false);
HttpGet httpget = new HttpGet(loginUrl);
HttpResponse response = httpclient.execute(httpget);
// Get cookie returned from login service
Header[] headers = response.getHeaders("Set-Cookie");
httpclient.getConnectionManager().shutdown();

httpclient = new DefaultHttpClient();
HttpPost httppost = new HttpPost(serviceUrl);
// set cookie returned by login service
for (Header header : headers) {
httppost.addHeader("Cookie", header.getValue());
}
// set request entity body
// ...

response = httpclient.execute(httppost);
// process response
// ...

httpclient.getConnectionManager().shutdown();
}
}
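
Note that the example above copies the complete Set-Cookie value, including attributes such as the path, into the Cookie header. If only the name=value pair should be sent, the JDK's java.net.HttpCookie class can be used to extract it. The following sketch shows the idea. The class name CookieHeader, the cookie name EXAMPLE and its value are made up for illustration.

```java
import java.net.HttpCookie;
import java.util.List;

class CookieHeader {
    // Reduces a Set-Cookie header value to the name=value pair of its first cookie
    static String toCookieHeader(String setCookieValue) {
        List<HttpCookie> cookies = HttpCookie.parse(setCookieValue);
        HttpCookie cookie = cookies.get(0);
        return cookie.getName() + "=" + cookie.getValue();
    }

    public static void main(String[] args) {
        // Made-up cookie; a real one is returned by the login service
        System.out.println(toCookieHeader("EXAMPLE=abc123; path=/"));
    }
}
```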
Update: Login to a local development server. To get access to a security-enabled application on the local development server there's no need for getting an authentication token. Instead, POST an email address and a redirect URL to http://localhost:<port>/_ah/login and the server returns an authorization cookie. Here's an example:
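import java.net.URLEncoder;

// IOUtils is assumed to be the Apache Commons IO class
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;

...

HttpClient httpClient = new DefaultHttpClient();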
httpClient.getParams().setBooleanParameter(
ClientPNames.HANDLE_REDIRECTS, false);
// POST login data to GAE SDK dev server
HttpPost httpPost = new HttpPost(
"http://localhost:8888/_ah/login");
httpPost.setHeader("Content-Type",
"application/x-www-form-urlencoded");
String email = URLEncoder.encode(
"test@example.com", "UTF-8");
String redirectUrl = URLEncoder.encode(
"http://localhost:8888", "UTF-8");
httpPost.setEntity(new StringEntity(
"email=" + email + "&continue=" + redirectUrl));
HttpResponse response = httpClient.execute(httpPost);
// Extract authorization cookie from response
String cookie = response.getFirstHeader("Set-Cookie").getValue();
httpClient.getConnectionManager().shutdown();
// Create a new client and access the secured
// service with the authorization cookie
httpClient = new DefaultHttpClient();
HttpGet httpget = new HttpGet("http://localhost:8888");
httpget.addHeader("Cookie", cookie);
response = httpClient.execute(httpget);
System.out.println(IOUtils.toString(response.getEntity().getContent()));
httpClient.getConnectionManager().shutdown();