The Elements of a StreamBase Application

June 20th, 2007

In prior posts [1, 2] I wrote about my initial experiences with StreamBase’s complex event processing model. Here I will dive a bit deeper and talk about the actual computational tools StreamBase provides, concentrating on the perspective offered by their graphical environment. There is a second perspective offered by their StreamSQL language, which does not map exactly one-to-one onto the graphical model (distinct graphical elements often combine into single StreamSQL statements and vice versa). I’ll blog about StreamSQL in a future post.

At the end you will find some use cases that bring things together based on my experience building a matching engine for an ECN.

Streams

It should come as little surprise that an application called StreamBase has elements called streams! Streams are the entry and exit points of events in a StreamBase application.

Input Streams

Input Streams define the entry points for events. They are given names and associated with a data schema that defines a tuple (a flat structure) of data elements with types integer, boolean, double, string, and timestamp. As you can see in the figure, input streams are arrows with a single output port which connects them to downstream processing elements. One can use the APIs provided for C++, Java and .NET to send data to these inputs. I won’t cover the APIs here, but they are really trivial to use: another developer here at Lab49 (Akintoye Olorode) got a .NET client attached to my ECN app in about an hour.

Output Streams

Output Streams define the exit points for events produced by your application. It is not necessary to define the schema of an output stream because it is derived from the data flowing into its input port. As with inputs, there are simple APIs for dequeuing data from these streams. Every client that attaches to an output effectively gets its own queue and can therefore dequeue tuples at its own pace, subject to memory constraints and configuration.

Input and Output Adapters

These adapters are very similar to the corresponding streams except that they are pre-configured to attach to specific sources and sinks. Right out of the box, StreamBase includes input adapters for flat files (using regular expression parsing), CSV files, sockets and other StreamBase apps (so you can chain apps together into complex systems). Output adapters include XML, CSV, email and HTTP. You can, of course, write your own adapters using their API, and StreamBase has other adapters you can obtain separately.

Operators

Clearly one brings data into a StreamBase app to do something with it.  There is a nice paper mentioned here that describes some of the things one naturally does in a streaming application.  It is a worthwhile read since it relates well to what follows.

Splits

There are two ways to send data off along different processing paths in a StreamBase application. If you want to be very specific you can use a Split (pictured here). A Split has one input port and a user-defined number of outputs. Each incoming tuple is delivered first to output port 1, then to port 2, and so on. If the ordering is not crucial you can forgo the Split and just route multiple connections from an output port to two or more input ports.

Maps

Maps are very common operators since they allow you to transform data flowing through a given stream. You can add, remove or replace fields in a tuple by using mathematical operations and function calls. Many functions are built in, and you can call external ones written in C++ or Java. The key property of the Map operator is that it works on one tuple at a time.
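To make the one-tuple-at-a-time behavior concrete, here is a minimal Python sketch. It is not StreamBase code: the `map_operator` helper, the `add_notional` transform and the field names are all hypothetical, chosen only to illustrate adding and removing fields.

```python
from typing import Callable, Dict, Iterable, Iterator

Record = Dict[str, object]  # a flat record standing in for a StreamBase tuple


def map_operator(stream: Iterable[Record],
                 transform: Callable[[Record], Record]) -> Iterator[Record]:
    """Apply `transform` to each tuple independently; no state is kept between tuples."""
    for tup in stream:
        yield transform(tup)


def add_notional(order: Record) -> Record:
    # Add a derived field and drop one that downstream operators do not need.
    out = dict(order)
    out["notional"] = out["price"] * out["qty"]
    out.pop("client_note", None)
    return out


orders = [
    {"symbol": "IBM", "price": 105.0, "qty": 200, "client_note": "x"},
    {"symbol": "IBM", "price": 104.5, "qty": 100, "client_note": "y"},
]
mapped = list(map_operator(orders, add_notional))
```

Because the transform sees only the current tuple, anything that needs to look across tuples has to be handled by a different operator.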

Filters

A filter, as the name implies, conditionally passes data. You define one or more predicates, and the filter passes a tuple through an output port if it satisfies the corresponding predicate. Here we show a simple accept-or-reject filter, but a multi-output switch is possible as well.
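The multi-output case can be sketched in a few lines of Python. This is an illustration only, not the StreamBase API. I assume here that a tuple goes to the port of the first predicate it satisfies and that unmatched tuples go to a final catch-all port; `filter_operator` and the field names are hypothetical.

```python
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]


def filter_operator(stream: Iterable[Record],
                    predicates: List[Callable[[Record], bool]]) -> List[List[Record]]:
    """Route each tuple to the port of the first predicate it satisfies.

    Returns one list per predicate plus a final catch-all list for tuples
    that match nothing (an assumption of this sketch).
    """
    ports: List[List[Record]] = [[] for _ in range(len(predicates) + 1)]
    for tup in stream:
        for i, pred in enumerate(predicates):
            if pred(tup):
                ports[i].append(tup)
                break
        else:
            ports[-1].append(tup)  # no predicate matched
    return ports


orders = [
    {"side": "BUY", "price": 104.5},
    {"side": "SELL", "price": 105.0},
    {"side": "BUY", "price": 104.0},
    {"side": "UNKNOWN", "price": 0.0},
]
buys, sells, rejected = filter_operator(
    orders,
    [lambda t: t["side"] == "BUY", lambda t: t["side"] == "SELL"],
)
```

With a single predicate and the catch-all port ignored, this reduces to the simple accept-or-reject filter.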

Aggregates

Aggregation is where stream processing begins to get more interesting. Aggregate operators are naturally defined to operate on many tuples. The concept of a window is used to define the tuples that make up the aggregate. Think of the window as having a width and advancing over the stream of tuples. Windows can be defined based on a set number of tuples, a set time frame, or dynamically based on data in the tuples themselves (such as a sequence number or timestamp). There is not necessarily a single window associated with an aggregate operator, since you can use grouping and independent window opening and closing criteria. I will not attempt to fully describe all the rich behavior here but refer you to the StreamBase docs.
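As a rough illustration of a tuple-count window with a width and an advance, here is a small Python sketch. It ignores grouping, time-based and field-based windows, and `window_aggregate` is a hypothetical helper rather than anything StreamBase provides.

```python
from collections import deque
from typing import Callable, Deque, Iterable, Iterator, List


def window_aggregate(values: Iterable[float], size: int, advance: int,
                     fn: Callable[[List[float]], float]) -> Iterator[float]:
    """Emit fn(window) each time `size` tuples are buffered, then slide by `advance`."""
    if not 1 <= advance <= size:
        raise ValueError("this sketch assumes 1 <= advance <= size")
    window: Deque[float] = deque()
    for v in values:
        window.append(v)
        if len(window) == size:
            yield fn(list(window))       # the window closes and emits one result
            for _ in range(advance):     # drop the oldest tuples to advance the window
                window.popleft()


prices = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0]

# Tumbling window: width 3, advance 3, so windows do not overlap.
tumbling_avg = list(window_aggregate(prices, 3, 3, lambda w: sum(w) / len(w)))

# Sliding window: width 3, advance 1, so consecutive windows share two tuples.
sliding_max = list(window_aggregate(prices, 3, 1, max))
```

The same input produces two results in the tumbling case and four in the sliding case, which is the practical difference between the two settings.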

Unions and Merges

Unions and merges are used to bring multiple independent streams of the same schema into a single stream. A union simply passes tuples as they arrive, and there can be as many inputs as necessary.

A merge acts between two inputs and uses data in the tuples to order the outputs. In other words, a merge is a kind of streaming sort.    
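The "streaming sort" idea can be sketched with Python's standard `heapq.merge`, which lazily combines inputs that are each already ordered. This is only an analogy for the Merge operator; the `seq` and `src` fields are hypothetical.

```python
import heapq
from operator import itemgetter

# Two feeds with the same schema, each already ordered by its own sequence number.
feed_a = [{"seq": 1, "src": "A"}, {"seq": 4, "src": "A"}, {"seq": 5, "src": "A"}]
feed_b = [{"seq": 2, "src": "B"}, {"seq": 3, "src": "B"}, {"seq": 6, "src": "B"}]

# heapq.merge only ever compares the current head of each input, so it can
# emit tuples in order without buffering either stream in full.
merged = list(heapq.merge(feed_a, feed_b, key=itemgetter("seq")))
```

A union, by contrast, would emit tuples in whatever order they happened to arrive, with no comparison on a field.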

Gathers and Joins

The Gather and Join operators integrate multiple streams of differing schemas. The Gather is most like what one thinks of as a database equi-join: a common field in the tuples is used to tie them together into a single tuple that contains fields from both inputs, matched on the joining field.

The Join operator is more general in that a predicate or a range can be used to match multiple tuples. This operator is a bit more complex than Gather, and I did not find an application for it in my own work. Therefore, I’ll refer the reader to the StreamBase docs for further information.
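The keyed matching described for Gather can be sketched as follows. This is my own simplified model, not StreamBase behavior in detail: it assumes exactly two inputs, one tuple per key on each side, and buffers a tuple until its partner arrives. The `gather` function and the port names are hypothetical.

```python
from typing import Dict, Iterable, Iterator, Tuple

Record = Dict[str, object]


def gather(events: Iterable[Tuple[str, Record]], key: str) -> Iterator[Record]:
    """Combine one tuple from 'left' and one from 'right' that share `key`.

    `events` is an interleaved sequence of (port, tuple) pairs, standing in
    for two input streams arriving in some order.
    """
    pending: Dict[str, Dict[object, Record]] = {"left": {}, "right": {}}
    for port, tup in events:
        other = "right" if port == "left" else "left"
        k = tup[key]
        match = pending[other].pop(k, None)
        if match is None:
            pending[port][k] = tup      # wait for the partner tuple
        else:
            yield {**match, **tup}      # fields from both sides, one key field


events = [
    ("left", {"symbol": "IBM", "bid": 104.5}),
    ("right", {"symbol": "MSFT", "ask": 30.1}),
    ("right", {"symbol": "IBM", "ask": 105.0}),
    ("left", {"symbol": "MSFT", "bid": 30.0}),
]
gathered = list(gather(events, "symbol"))
```

Nothing is emitted until both sides have supplied a tuple for the same key, which is what makes the operator useful for reassembling results computed on separate paths.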

Heartbeats and Metronomes

These operators deal with timers. A Metronome is a source of tuples containing a single field of type timestamp. Like its namesake it delivers these tuples at a specified time delay and can be used to “clock” other operators (for example, to cause windows within aggregations to close).

A Heartbeat operator is slightly different in that it passes tuples from its input to its output, provided there is a timestamp field in the tuple. It uses that timestamp to ensure that tuples are emitted at a specified interval even in the absence of input. If it sees no input within the specified time, it emits a tuple whose fields are all null except for the timestamp field.
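Here is a minimal Python sketch of the gap-filling idea behind Heartbeat. It replays a recorded list of tuples rather than running on a real timer, so it only approximates the operator; the `heartbeat` function and the `ts` and `price` fields are hypothetical.

```python
from typing import Dict, Iterable, List, Optional

Record = Dict[str, Optional[float]]


def heartbeat(tuples: Iterable[Record], interval: float,
              ts_field: str = "ts") -> List[Record]:
    """Pass tuples through, inserting null-filled tuples so that no gap
    between consecutive timestamps exceeds `interval`."""
    out: List[Record] = []
    last_ts: Optional[float] = None
    for tup in tuples:
        if last_ts is not None:
            next_beat = last_ts + interval
            while next_beat < tup[ts_field]:
                filler: Record = {name: None for name in tup}  # all fields null...
                filler[ts_field] = next_beat                   # ...except the timestamp
                out.append(filler)
                next_beat += interval
        out.append(tup)
        last_ts = tup[ts_field]
    return out


ticks = [
    {"ts": 0.0, "price": 100.0},
    {"ts": 1.0, "price": 101.0},
    {"ts": 4.0, "price": 102.0},  # three time units of silence before this tick
]
beats = heartbeat(ticks, interval=1.0)
```

Downstream operators can tell a filler from a real tuple by checking for the null fields.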

 

Data Constructs and Query Operators

Query Tables

These are in-memory database tables that can have multiple indexes (either hashed or ordered). Tables are useful when you need to cache tuples for indeterminate periods so that they can be related to tuples streaming in at a later time. Although keeping state in this way may at first seem to go against the themes of a streaming database, it really doesn’t. You should think of streaming as building up as much state as necessary to make a decision rather than simply processing tuples on the fly.
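To show what the two index kinds buy you, here is a toy Python table with a hashed index on an order id and an ordered index on price. It is a sketch of the idea only; the `QueryTable` class, its methods and the field names are hypothetical, and it assumes order ids are non-negative integers.

```python
import bisect
from typing import Dict, List, Tuple


class QueryTable:
    """Toy in-memory table: a hashed primary index plus an ordered index on price."""

    def __init__(self) -> None:
        self.by_id: Dict[int, dict] = {}             # hashed index: O(1) lookup by key
        self.by_price: List[Tuple[float, int]] = []  # ordered index of (price, order_id)

    def insert(self, row: dict) -> None:
        if row["order_id"] in self.by_id:            # keep both indexes consistent
            self.delete(row["order_id"])
        self.by_id[row["order_id"]] = row
        bisect.insort(self.by_price, (row["price"], row["order_id"]))

    def delete(self, order_id: int) -> None:
        row = self.by_id.pop(order_id)
        self.by_price.remove((row["price"], order_id))

    def price_range(self, low: float, high: float) -> List[dict]:
        """Rows with low <= price <= high, in price order, via the ordered index."""
        start = bisect.bisect_left(self.by_price, (low, -1))
        end = bisect.bisect_right(self.by_price, (high, float("inf")))
        return [self.by_id[oid] for _, oid in self.by_price[start:end]]


table = QueryTable()
table.insert({"order_id": 1, "price": 104.5, "qty": 300})
table.insert({"order_id": 2, "price": 105.0, "qty": 200})
table.insert({"order_id": 3, "price": 103.0, "qty": 100})
in_range = [r["order_id"] for r in table.price_range(104.0, 105.0)]
table.delete(1)
after_delete = [r["order_id"] for r in table.price_range(100.0, 200.0)]
```

The hashed index answers "give me this order", while the ordered index answers range questions and returns rows already sorted.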

 

JDBC Tables

If your streaming application needs to integrate with a traditional relational data store, StreamBase provides a JDBC interface. Clearly one must use this facility with care if performance is critical. One possible usage in the financial realm would be to draw on external indicative data for various instruments.

 

Materialized Windows

Materialized Windows allow you to buffer tuples as they stream through your application. They have all the options of the windows that are available via aggregate operators (explained above); however, they also provide indexing and grouping, and are acted upon by Query operators (see below) just like Query Tables.

Query Operators

This family of operators allows you to read, insert, update, upsert and delete data from Query Tables, JDBC Tables and Materialized Windows. These operators can take advantage of indexes for efficiency and for ordering results.

 

Putting It All Together

As I mentioned in my earlier post, I used StreamBase to build a simple matching engine for an ECN that would handle limit orders and cancels of limit orders. The matching engine contains input streams for orders, cancels and requests for depth of book. It has output streams for quotes, trades (tick data), execution reports, cancel confirmations, and delta notifications on depth of book.

Below I walk through a partial list of use cases and how they were implemented in terms of the StreamBase operators and data constructs.   

Updating the Order Book

The most obvious thing a matching engine needs to do is keep a database of orders that are on the book. An ECN or exchange accepts orders at various prices (most not yet marketable) and holds them until they become marketable, are canceled or expire.

Here I used a Map operator to place a timestamp on incoming orders. The timestamped order flows through some splits that are used to route it down other processing paths (elided here) after it is stored in the order book (for example, there is a path that considers whether the order is executable). Following the splits, the order hits a query operator that inserts it into the database, implemented as a Query Table.

Updating the Order Summary

In order to make a matching engine efficient, one needs to avoid dealing with the order book directly on an order by order basis. A typical way of avoiding this is to keep a summary table organized by side (BUY or SELL) and price which stores the aggregate quantity available at that price and possibly the number of orders and other stats. This table is convenient for testing executability of incoming orders, computing the quote and providing depth of book.

Here again I have elided irrelevant elements. You can see the use of a union and the initial stages of processing. There are three distinct events that might trigger an update to summary data: new orders, cancels and executions. A union combines these disparate events into one event stream that triggers the update. An aggregate operation is used to accumulate updates that are part of the same transaction, and the net result is written to a table called SummaryDB. The SummaryUpdateQuery operator does an upsert since it may be creating a new price level or adding to an existing price level. You can also see a Delete query that is used to remove records from the summary database when there is no quantity remaining at a price.
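The net-then-upsert-then-delete flow described above can be sketched in plain Python. This is an illustration of the logic, not the StreamBase application itself: the `apply_transaction` function and the event layout `(kind, side, price, qty)` are hypothetical, and a dict keyed by `(side, price)` stands in for the SummaryDB table.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Level = Tuple[str, float]            # (side, price)
Event = Tuple[str, str, float, int]  # (kind, side, price, qty)


def apply_transaction(summary: Dict[Level, int], events: Iterable[Event]) -> None:
    """Net all events of one transaction per price level, then write once per level."""
    net: Dict[Level, int] = defaultdict(int)
    for kind, side, price, qty in events:
        # New orders add quantity; cancels and executions remove it.
        net[(side, price)] += qty if kind == "order" else -qty
    for level, delta in net.items():
        remaining = summary.get(level, 0) + delta
        if remaining > 0:
            summary[level] = remaining   # upsert: create the level or adjust it
        else:
            summary.pop(level, None)     # delete: nothing left at this price


summary: Dict[Level, int] = {}
apply_transaction(summary, [("order", "BUY", 104.5, 300)])
after_first = dict(summary)
apply_transaction(summary, [
    ("execution", "BUY", 104.5, 100),
    ("cancel", "BUY", 104.5, 200),
    ("order", "SELL", 105.0, 50),
])
```

Netting first means a level that is touched several times within one transaction is written only once.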

Extracting the Quote

I was really quite impressed with the speed with which I was able to construct the bulk of the ECN (it took about two days to get 80% of the functionality). One speed bump occurred when trying to figure out how to compute the quote from the SummaryDB table. In principle this should have been trivial because the quote is literally sitting in the table at the max price on the BUY side and min price on the SELL side. In SQL this extraction is easily implemented using HAVING. For example, getting the BID would look like this:

SELECT symbol, price AS BidPrice, qty AS BidQty
FROM StockSummary
WHERE symbol = 'IBM' AND side = 0
GROUP BY symbol, price, qty
HAVING price = (SELECT MAX(price) FROM StockSummary
                WHERE symbol = 'IBM' AND side = 0)

As it turns out, StreamSQL apparently has a HAVING clause. However, this functionality is not exposed via the visual development environment (as far as I can see). So I had to decompose the problem into first establishing the extreme prices on each side and then looking up the quantity at that price. This took four queries instead of two.

You can also see the gather operation come into play to integrate both sides of the quote (bid and ask) into a single tuple prior to emitting it on an output stream.
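The decomposition into "find the extreme price, then look up the quantity at that price" for each side can be sketched in Python as follows. This mirrors the four lookups plus the final combination step in spirit only; the function names and the dict keyed by `(side, price)` are hypothetical stand-ins for the summary table.

```python
from typing import Dict, Optional, Tuple

Level = Tuple[str, float]  # (side, price)


def best_price(summary: Dict[Level, int], side: str) -> Optional[float]:
    """First lookup per side: the extreme price (max for BUY, min for SELL)."""
    prices = [price for (s, price) in summary if s == side]
    if not prices:
        return None
    return max(prices) if side == "BUY" else min(prices)


def quote(summary: Dict[Level, int]) -> Dict[str, Optional[float]]:
    """Second lookup per side: the quantity at that price; then combine both sides."""
    bid = best_price(summary, "BUY")
    ask = best_price(summary, "SELL")
    return {
        "bid_price": bid,
        "bid_qty": summary.get(("BUY", bid)),    # None when the side is empty
        "ask_price": ask,
        "ask_qty": summary.get(("SELL", ask)),
    }


book = {
    ("BUY", 104.0): 500,
    ("BUY", 104.5): 300,
    ("SELL", 105.0): 200,
    ("SELL", 105.5): 400,
}
top_of_book = quote(book)
one_sided = quote({("BUY", 104.0): 500})
```

Returning a single dict with both sides plays the role of the gathered quote tuple.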

Conclusion

I have quite a bit of experience working on matching engines, and this one certainly had only a small subset of the functionality of a full-blown matching engine for an ECN or exchange. However, had I approached this project directly in C++, I am pretty sure it would have taken over a week to get the core functionality and more to build scaffolding to test it. With StreamBase I had working pieces in a few hours, 80% of the functionality in two days and a complete working engine in under a week.

Of course, it is fair to ask how this would scale to building a full-service matching engine for multiple order types, cancel with replace, away market shipping and other bells and whistles. Given the version of StreamBase I worked with (3.7), I think there would be some scalability issues. However, just yesterday I got an early peek at the upcoming StreamBase release at the SIFMA conference in NYC. This release addresses many of the concerns I posted in my earlier blogs and definitely shows StreamBase’s commitment to delivering production-quality development tools for event processing. You’ll have to wait for a future blog for discussion of these new goodies.
