Reactive Extensions for .NET

Generating Market Data Snapshot from a Tick Stream using Rx

October 16th, 2010

One of my colleagues (http://weblogs.asp.net/sweinstein/) brought up an interesting problem. A common theme in finance applications is subscribing to a market data source to receive real-time updates on the instruments that the source covers. A late subscriber (one that has not been subscribed since the start of the trading day, for example) usually expects to receive a snapshot per instrument before the real-time updates. So what is the best way to support a per-instrument snapshot guarantee from a “hot” underlying source using Rx? There are multiple ways to do it, of course, and here is my version of it:

RX Sandbox

August 31st, 2010

If you are learning the Reactive Extensions for .NET, a great learning tool is RX Sandbox. It lets you easily visualize the results of most of the standard Rx operators.

Detecting Running High/Low Prices Using Reactive Framework – Follow-up

January 1st, 2010

This article shows some improvements to the original code presented in my previous post. The improvements are based on the feedback I received from Erik Meijer and his team and mainly focus on built-in capabilities of the Rx framework that I had missed. Here is an extract from the original implementation of the running low feed:

// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
    .Where(p => p < min)
    .Do(p => min = Math.Min(min, p))
    .Select(p => "New LO: " + p);

This implementation depends on state that is external to the framework (the current low value stored in the “min” local variable), on the “Do” combinator to update this state with every price tick, and on a “Where” combinator to suppress the ticks that do not represent a new running low. It turns out there is a better, more elegant and framework-friendly way to implement this. It relies on the built-in “Scan” and “HoldUntilChanged” combinators:

// Daily low price feed
var feedLo = feed
    .Scan(Math.Min)
    .HoldUntilChanged()
    .Select(p => "New LO: " + p);

The code above is functionally equivalent to the original one but is cleaner, more compact, and easier to read. This is how it works:

  • The “Scan” operator is used to accumulate the running min value. “Scan” is similar to “Aggregate” in the LINQ-to-Objects world but, instead of collapsing the sequence to a single accumulated value, it keeps outputting the current accumulated value with every tick. As a result, “Math.Min” is applied on every new tick, with the accumulated min value and the new price as arguments, and its result is “ticked out”. This produces a stream where every successive number is less than or equal to the previous one. That is close to what we need, except that we want to suppress duplicates.
  • “HoldUntilChanged” is another built-in combinator. It wraps an underlying stream and only repeats a value from it if that value differs from the previous one. This is perfect for removing successive duplicates (please note that it will NOT remove ALL the duplicates, only successive ones, so it is not a proper substitute for Distinct, when such a combinator exists). We use “HoldUntilChanged” instead of the original “Where” clause.
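
To make the two steps above concrete, here is a minimal sketch that replays a fixed list of prices through the same pipeline. It rests on a few assumptions that are not part of the original post: it uses the System.Reactive NuGet package, it uses “DistinctUntilChanged”, which as far as I know is the name later Rx releases give to “HoldUntilChanged”, and “RunningLowDemo”, “RunningLows” and the sample prices are hypothetical names and data made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumption: System.Reactive NuGet package is referenced

public static class RunningLowDemo
{
    // Hypothetical helper: pushes a fixed set of prices through the
    // running-low pipeline and collects whatever the pipeline emits.
    public static List<double> RunningLows(IEnumerable<double> prices)
    {
        var lows = new List<double>();
        prices.ToObservable()           // stand-in for the live price feed
            .Scan(Math.Min)             // running minimum, emitted on every tick
            .DistinctUntilChanged()     // assumed modern name for HoldUntilChanged
            .Subscribe(lows.Add);
        return lows;
    }

    public static void Main()
    {
        var ticks = new[] { 10.0, 11.0, 9.5, 9.5, 12.0, 9.0 };
        // After Scan:                 10, 10, 9.5, 9.5, 9.5, 9
        // After DistinctUntilChanged: 10, 9.5, 9
        foreach (var low in RunningLows(ticks))
            Console.WriteLine("New LO: " + low);
    }
}
```

Because the output of “Scan(Math.Min)” never increases, equal values can only appear next to each other, so dropping successive duplicates is enough here even though it would not be a substitute for Distinct in general.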

Processing Financial Data with Microsoft Reactive Framework (Reactive Extensions for .NET)

December 20th, 2009
Reactive Framework Labs

I have recently discovered the Microsoft Reactive Framework (thanks to a spontaneous introduction at the New York .NET Meetup Group) and have been playing with it a lot, trying to apply it to many of the financial problems I’ve coded over the last few years. I am truly amazed by the technology and the power it unleashes for processing and analyzing streaming financial data (quotes, market data feeds, trade executions, etc.). Seamless integration with LINQ, combined with very powerful support for composition, makes self-coded CEP (complex event processing) in .NET low-hanging fruit. In the following series of articles I am going to demonstrate sample Rx-based implementations for some commonly occurring tasks in finance, including: