Intro to Rx

May 18th, 2010 / LeeCampbell

Microsoft is currently working on a new library (or possibly an addition to the Framework) for building “reactive” applications. It’s full name is Reactive Extensions for .NET but is generally referred to as just “Rx”. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as Multicast delegates or Events. Multicast delegates (which Events are) however can be cumbersome to use, have a nasty interface and are difficult to compose and can not be queried. Rx looks to solve these problems.

Here I will introduce you to the building blocks and some basic types that make up Rx.

IObservable<T>

IObservable<T> is one of the 2 core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft are so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a Stream of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.

IObserver<T>

IObserver<T> is the other one of the 2 core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don’t worry if you are not on .NET 4.0 yet as the Rx team have included these 2 interfaces in a separate assembly for .NET 3.5 users. IObserver<T> is meant to be the “functional dual of IEnumerable<T>”. If you want to know what that last statement meant then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield 3 things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>’s 3 methods OnNext(T), OnError(Exception) and OnCompleted().

Interesting while you will be exposed to the IObservable<T> interface a lot if you work with Rx, I find I don’t often need to concern myself with IObserver<T>. Another interesting thing I have found with Rx is that I never actually implement these interfaces myself, Rx provides all of the implementations I need out of the box. Lets have a look at the simple ones.

Subject<T>

If you were to create your own implementation of IObservable<T> you may find that you need to expose method to publish items to the subscribers, throw errors and notify when the stream is complete. Hmmm they all sound like the methods on the IObserver<T> interface. While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what Subject<T> does for you. Effectively you can expose your Subject<T> behind a method that returns IObservable<T> but internally you can use the OnNext, OnError and OnCompleted methods to control the stream.

In this (awfully basic) example, I create a subject, subscribe to that subject and then publish to the stream.

using System;
using System.Collections.Generic;

namespace RxConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            var subject = new Subject<string>();

            WriteStreamToConsole(subject);

            subject.OnNext("a");
            subject.OnNext("b");
            subject.OnNext("c");
            Console.ReadKey();
        }

        private static void WriteStreamToConsole(IObservable<string> stream)
        {
            stream.Subscribe(Console.WriteLine);
        }
    }
}

Note that the WriteStreamToConsole method takes an IObservable<string> as it only wants access to the subscribe method. Hang on, doesn’t the Subscribe method need an IObserver<string>? Surely Console.Writeline does not match that interface. Well not it doesn’t but the Rx team supply me with an Extension method to IObservable<T> that just takes an Action<T>. The action will be executed every time an item is published. There are other overloads to the Subscribe extension method that allows you to pass combinations of delegates to be invoke for OnNext, OnCompleted and OnError. This effectively means I don’t need to implement IObserver<T>. Cool.

As you can see, Subject<T> could be quite useful for getting started in Rx programming. Subject<T> is a basic implementation however. There are 3 siblings to Subject<T> that offer subtly different implementations which can drastically change the way your program runs.

ReplaySubject<T>

ReplaySubject<T> will remember all publications to it so that any subscriptions that happen after publications have been made, will still get all of the publications. Consider this example where we have moved our first publication to occur before our subscription

static void Main(string[] args)
{
    var subject = new Subject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);

    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}

The result of this would be that “b” and “c” would be written to the console, but “a” ignored. If we were to make the minor change to make subject a ReplaySubject<T> we would see all publications again.

static void Main(string[] args)
{
    var subject = new ReplaySubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);

    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}

This can be very handy for eliminating race conditions.

BehaviorSubject<T>

BehaviourSubject<T> is similar to ReplaySubject<T> except it only remembers the last publication. BehaviorSubject<T> also requires you to provide it a default value of T. This means that all subscribers will receive a value immediately (unless it is already completed).

In this example the value “a” is written to the console.

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}

In this example the value “b” is written to the console, but not “a”.

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    subject.OnNext("b");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}

In this example the values “b”, “c” & “d” are all written to the console, but again not “a”

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    WriteStreamToConsole(subject);
    subject.OnNext("c");
    subject.OnNext("d");
    Console.ReadKey();
}

Finally in this example, no values will be published as the stream has completed. Nothing is written to the console.

static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    WriteStreamToConsole(subject);

    Console.ReadKey();
}

AsyncSubject<T>

AsyncSubject<T> is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the stream is completed.

In this example no values will be published so no values will be written to the console.

static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}

In this example we invoke the OnCompleted method and the value “c” is published and therefore written to the console.

static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    Console.ReadKey();
}

 

So that is the very basics of Rx. With only that under you belt it may be hard to understand why Rx is a topic of interest. To follow on from this post I will discuss further fundamentals to Rx

  1. Extension methods
  2. Scheduling / Multithreading
  3. LINQ syntax

Once we have covered these it should allow you to really get Rx working for you to produce some tasty Reactive applications. Hopefully after we have covered these background topics we can knock up some Samples where Rx can really help you in your day to day coding.

Comments are closed.