CEP coding: An event-cache for Apama

March 13th, 2008

CEP engines try to keep state-management out of the developers’ hands (or, if you prefer, out of your way), letting you concentrate on just the events. Inevitably, though, we end up needing just a little more state than we have in the current event stream; or we have an event that’s needed by several monitors (e.g., benchmark prices); or the event data’s needed in several monitors, but doesn’t actually trigger any action.

Enter the cache-monitor: a common Apama pattern we’ve used in our projects. It’s a simple pattern, but I’m posting it to illustrate Apama’s programming model which seems to be not common knowledge. What throws people for a loop at first is that in order to retrieve a value from a cache-monitor, we mimic a function call from typical procedural languages by routing a Request-event to that monitor, and listening for the matching Reply-event routed back to us by the cache. To see what I mean, consider the 3 snippets below.

First, here are our Request and Reply events:


event PriceReq {
string ticker;
}
event PriceReply {
string ticker;
wildcard Price price;
wildcard boolean exists;
}

The keyword wildcard tells the event-engine that those marked fields should not be used in event-matching. Note also that the reply has a validity/existence boolean; this is because we always need the reply event, even when the requested item is not found.

By route-ing the PriceRequest-event we place it at the front of the event-queue, and hand control back to the event-engine after our current listener completes installing it’s PriceReply-listener. The event-engine will find matches for that PriceReq event — the listener in our cache-monitor. That listener’s code block will execute — code that will route the reply event matching the caller’s PriceReply-listener. The result is effectively a function call via eventing.

Now here’s the cache-monitor:


monitor PriceCache {
Price px;
PriceReq req;
dictionary<string,Price> allPrices; // the cache
action onload() {
// cache incoming prices
on all Price() :px {
if( not allPrices.hasKey(px.ticker) ) then {
allPrices.put(px.ticker, px);
}
}
// request listener
on all PriceReq() :req getPrice {
if( allPrices.hasKey(px.ticker) then {
route new PriceReply(px.ticker, allPrices[px.ticker], true);
} else {
// not found, reply contains empty Price and ‘false’ flag
route new PriceReply(px.ticker, new Price, false);
}
// NOTE: if code were here, it would actually execute
// before any PriceReply-listeners execute!
}
}
}

Can you see how you would code cache-lookups yet? It’s a good exercise to go through as it starts you down the road toward thinking in event-listeners instead of the more familiar procedural mode of pascal, C and friends. Here’s the final piece of the puzzle:


route new PriceReq(”LAB49″);
on PriceReply(ticker = “LAB49″) :priceReply {
if( priceReply.exists ) {
// do something with priceReply.price
}
}

One final note: our Reply-listener uses the syntax

on PriceReply (ticker = "LAB49") {

as opposed to

on all PriceReply (ticker = “LAB49″) {

This is because we’re only looking for a single matching reply, and after we get it our listener should be removed and never match anything else.

One Response to “CEP coding: An event-cache for Apama”

  1. Kalani Thielen Says:

    That’s exactly why Apama Monitorscript needs functional modules or classes, and parametric polymorphism. The dictionary type shouldn’t be a magic type, it should be expressible in Monitorscript, and it should be possible to import user-defined types (like your informal cache type) across monitors the same way you can import dictionary across monitors.

    Grumble, grumble …