Apama Monitorscript by Example

 
Thread Tools Search this Thread
Special Forums News, Links, Events and Announcements Complex Event Processing RSS News Apama Monitorscript by Example
# 1  
Old 02-21-2008
Apama Monitorscript by Example

Louis Lovas
Wed, 20 Feb 2008 20:58:37 -0500
Apama Monitorscript by Example There have been a couple recent posts on Apama's Monitorscript language, both here and here. To provide a bit more insight into the Apama EPL, below is a working sample that demonstrates a number of its capabilities. The language includes a declarative nature for defining and registering listeners for specific event types and it has a java-like syntax for imperative logic. The language provides a balance between a recognizable vernacular and a purposed nature for event processing.

Example narrative
Prior to an annotated walk-thru of the code sample, I thought it would help to first explain its purpose and what event streams it's processing. This simple example defines a work dispatcher. It receives a request in the form of an event (AddSymbol) to dispatch a discrete listener against an event stream of market depth (bids and asks) events. This discrete listener processes the market depth for a specific symbol. The actual work performed as it pertains to this example is inconsequential and is represented by an empty method (processDepth). Additionally, once a listener is dispatched it also listens for a request to terminate.

The subtleness of this example is its ability to leverage the simplicity of the Apama EPL and the power of the runtime engine wherein it executes. Thousands or even tens of thousands of listeners can be dispatched each running in its own independent context processing its unique slice of the streaming market data.

In reality there are a number of techniques that can be employed within the MonitorScript EPL to accomplish this sort of work dispatcher. The EPL includes a spawn operator which I've outlined in a previous blog. The spawn operator is the primary means for establishing independent worker threads and is the basis for instance creation. The example below focuses on event listeners to define discrete units of work.

1 package com.apamax.sample;

2 monitor ProcessMarket {
3 sequence symbols; // contains list of symbols to process.
4 com.apama.marketdata.SubscribeDepth subDepth;
5 com.apama.marketdata.Depth adepth;
6 dictionary< string, string > emptyDict;

7 action onload {

8 // Listen for incoming AddSymbol events and
9 // add to symbols list if not already present
10 AddSymbol addSymbol;
11 on all AddSymbol(): addSymbol {
12 if symbols.indexOf(addSymbol.symbol) = -1 then {
13 string local_symbol := addSymbol.symbol;
14 symbols.append(local_symbol);

15 // Subscribe to this symbol

16 route com.apama.marketdata.SubscribeDepth("", "", local_symbol, emptyDict );

17 // wait for 20.0 seconds, if no depth event received, terminate
18 listener waitListener;
19 waitListener := on wait(20.0) {
20 route RemoveSymbol(local_symbol);
21 }

22 listener depthListener;
23 depthListener := on all com.apama.marketdata.Depth(symbol=local_symbol):adepth {
24 waitListener.quit();
25 processDepth(adepth);
26 }

27 // Listen for RemoveSymbol events and remove from symbols list,
28 // unsubscribe & quit
29 RemoveSymbol removeSymbol;
30 on RemoveSymbol(symbol=local_symbol): removeSymbol {
31 integer index := symbols.indexOf(removeSymbol.symbol);
32 if index != -1 then {
33 symbols.remove(index);
34 processRemove(removeSymbol.symbol);
35
36 // Unsubscribe to this symbol
37 route com.apama.marketdata.UnsubscribeDepth("", "",
removeSymbol.symbol,
emptyDict );
38 depthListener.quit();
39 }
40 }
41 }
42 else {
43 log "Debug: Ignored (existing) Add Symbol Event = " + addSymbol.symbol;
44 }
45 }
46 }

47 action processDepth(com.apama.marketdata.Depth d) {
48 // Do something
49 }

50 action processRemove(string s) {
51 // Do something.
52 }
53 }


Example Annotation

In describing this example, the first point to note is that the event definitions are not included. For the sake of brevity they're assumed to be defined elsewhere. Actually there are only a few anyway. They can be categorized into two logical groups; control events (AddSymbol, RemoveSymbol, SubscribeDepth, UnsubscribeDepth) and data events (Depth). This categorization is only for a semantic understanding of the example, there is no such classification in the language. Additionally, Monitorscript has an easily recognizable syntax to anyone schooled in Java, C++ and other classic languages.

A monitor (line 2) defines the encapsulating block definition. Similar to a java class it is typically scoped to a package name space (line 1). Monitors are the main block scope and a typical Apama application is made up of many monitors that interact with one another by sending and receiving events. Within a monitor one can declare events, define actions (i.e. methods) and variables (integers, strings, floats, etc.). This example defines a handful of monitor-scoped variables. The language also supports a number of complex data types; the sequence and dictionary both use a C++ template style declaration to define array types and collection types respectively (lines 3 and 6).

The onload action (line 7) is the main entry point of a monitor. When a monitor is loaded into the runtime engine, it's onload action is immediately invoked. This work dispatcher example is entirely implemented within this action, for the sake of brevity it's a simple way to describe the language. Line 10 defines an instance of an AddSymbol event and declares a listener for all occurrences of this event type (line 11). The remainder of the functionality of this example is scoped to the encapsulating block of this listener (lines 12 - 45). This is an important note, since the intent is to receive and process multiple AddSymbol events (potentially 1,000's) where each AddSymbol will cause the invocation (dispatch) of a discrete unit of work that is represented by this encapsulating block of code. Within this block of code we communicate with other monitors and establish a number of unique listeners for this unique symbol name.

The route statement (line 16) sends a SubscribeDepth event. Route is the standardized form of communication between monitors. Under the covers, the route statement causes the event to be routed to be placed at the head of the engine's input queue - thus become the next event to the processed by the engine. Semantically, routing a SubscribeDepth event starts the flow of Depth events for this symbol (i.e. local_symbol). Lines 22-26 establish a listener to receive the stream of Depth events for this symbol, calling the action processDepth upon receipt of each one.

In addition to establishing a Depth listener, this block of code also creates a wait timer in lines 17-21. The purpose of this timer is to terminate this dispatched unit of work for this unique symbol if we do not receive an initial Depth event within 20 seconds. Line 24 kills that wait listener once the Depth events start flowing. Termination is handled by the RemoveSymbol listener declared at line 30. Note that since it will be executing within the context of a specific symbol's unit of work we're only interested in receiving a single occurrence of RemoveSymbol. This is specified in the on statement - sans the all modifier. Upon receipt of a RemoveSymbol event we unsubscribe, remove the symbol's entry from the list and terminate (i.e. quit) the Depth listener for this symbol. Like AddSymbol, RemoveSymbol control events can arrive from another monitor or a client connected to the runtime engine.

I hope this simple example sheds light on the simplicity, elegance and power of the Apama Montorscript EPL.

Post Script ...

After posting this blog, one of my esteemed colleagues with a much better command of the Monitorscript language offered a few refinements to avoid the need to manually handle termination (i.e. lines 17 - 21 in the code snippet). It does add one new control event - Terminate, but it avoids the need to use listener variables.


on com.apama.marketdata.Depth(symbol=local_symbol):adepth and not wait (20.0) {
on all com.apama.marketdata.Depth(symbol=local_symbol):adepth and not Terminate(local_symbol) {
processDepth(adepth);
}
}

...


on RemoveSymbol(symbol=local_symbol):removeSymbol {
...
route Terminate(removeSymbol.symbol);
}



This enhancement shows the declaration of complex (or compound) listeners against multiple event streams (and a timeout condition) concurrently. This is a commonly used technique in MonitorScript - and clearly quite powerful.




Source...
Login or Register to Ask a Question

Previous Thread | Next Thread
Login or Register to Ask a Question