
A short introduction to Reactive Programming

“Rx (Reactive Extensions) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators,” as they say on their site. It was originally developed for .NET and later ported to the JVM (Java, Scala, etc.) and other languages.

Two weeks ago we had a great opportunity at Prezi: to learn some Rx from Erik Meijer himself. It was a day and a half of training, with a presentation and a workshop. There was no strict target language. Many people used Scala or Java, but you could use any language that had an Rx library available. I chose JavaScript (node.js), since it’s one of my favorite languages, I use it with great confidence, and I wanted to concentrate on the new knowledge. It was super exciting, and I want to share the basics with you.

A little detour to algorithmic thinking

Before I go into more detail about Rx, let me show you a bit of what I was taught at the University lately. One of the classes, called Programming, is about algorithmic thinking this year. We learned some patterns like summing, counting and maximum selection, and then we applied these to certain problems. In the last weeks we generalized these patterns even further with the use of enumerators. There are famous ones like the sequential input file or the set, but anyone can define their own based on them.

We had an exercise where one had to find the largest sum of two adjacent elements in an n-element array.

var x = [21, 47, 34, 61, 88, 19, 34, 57, 78, 59];

First of all, you have to realize that this cannot be done with any of the famous enumerators, since you need two adjacent elements from the same source at the same time. So you have to define one yourself. Enumerators have four key functions:

  • First() – places the enumerator pointer to the beginning
  • Current() – returns the pointed element
  • Next() – steps the pointer forward
  • End() – decides if the enumerator reached its end

So the special enumerator would need these functions defined like this:

  • First(): i := 2; e := x[i - 1]; a := x[i]
  • Current(): (e, a)
  • Next(): i := i + 1; e := a; a := x[i]
  • End(): i = n
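
To make the four functions a bit more concrete, here is a minimal JavaScript sketch of such an enumerator. The name adjacentPairs and the lower-case method names are hypothetical, not part of any library. The pseudocode above counts from 1, while JavaScript arrays count from 0, so first() starts at i = 1 here. I also assumed that end() should only become true after the last pair has been visited, hence the i >= x.length test.

```javascript
// Hypothetical helper: an enumerator over adjacent pairs of an array.
// Uses 0-based indices, unlike the 1-based pseudocode.
function adjacentPairs(x) {
    var i, e, a;
    return {
        first: function () { i = 1; e = x[0]; a = x[1]; },
        current: function () { return [e, a]; },
        next: function () { i = i + 1; e = a; a = x[i]; },
        // Assumption: true only once every pair has been visited.
        end: function () { return i >= x.length; }
    };
}

var x = [21, 47, 34, 61, 88, 19, 34, 57, 78, 59];
var pairs = [];
var en = adjacentPairs(x);
for (en.first(); !en.end(); en.next()) {
    pairs.push(en.current());
}
console.log(pairs.length); // 9 pairs for a 10-element array
console.log(pairs[0]);     // [ 21, 47 ]
```

The loop at the bottom is the usual way to drive an enumerator: position it, then process and step until it reports the end.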

Now that the special enumerator is defined, the specification can be written down. (The original post showed it here as three images, k1, k2 and k3, which are not reproduced.)

This is an easy maximum selection, and its implementation looks like this:

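// JavaScript arrays are 0-based, so the first pair is (x[0], x[1]).
var i = 1, e = x[0], a = x[1],
    max = e + a, E = [e, a];
// Stop once a already holds the last element.
while (i < x.length - 1) {
    i++; e = a; a = x[i];
    if (max < a + e) {
        max = a + e; E = [e, a];
    }
}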

That’s fine, but what if I want the maximum sum of four adjacent elements? That would require two more temporary variables, or some kind of stack to hold these items… well, this solution seems a bit rigid, doesn’t it?

Let’s React!

I started this post with Rx, which is about reactive programming. This means that we have an observable asynchronous event stream and we can subscribe to its events. Well, ok – you might say – that’s nice but how is that gonna help us with the problem above? Let’s start with the basics.

In Rx you can manipulate the event stream you subscribe to. You can take or drop some of the events, filter them, or even map them to a whole new stream of items before subscribing. Imagine a pipeline where you can add all kinds of flow-transforming parts along the way until you reach the tap. For example, let’s say I want the first two even numbers from our original array, each divided by two:

var obs = Rx.Observable.fromArray(x);
var subs = obs.filter(function(e) { return e % 2 == 0; })
              .map(function(e) { return e / 2; })
              .take(2)
              .subscribe(function(e) { console.log(e); });

What did I do here? I created an observable event stream from the array, then I filtered for the even numbers, mapped each of them to its half, took the first two and printed each result in the subscriber.
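
If you want to check what the pipeline should print without installing Rx, the same steps can be sketched with plain array methods. This is only an analogy: arrays process everything eagerly, while an observable pushes items one at a time, and slice(0, 2) stands in for take(2) here. The variable name firstTwoHalvedEvens is made up for this example.

```javascript
var x = [21, 47, 34, 61, 88, 19, 34, 57, 78, 59];

// Same three steps as the Rx pipeline, on a plain array.
var firstTwoHalvedEvens = x
    .filter(function (e) { return e % 2 === 0; }) // keep 34, 88, 34, 78
    .map(function (e) { return e / 2; })          // halve each of them
    .slice(0, 2);                                 // keep the first two

console.log(firstTwoHalvedEvens); // [ 17, 44 ]
```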

Pretty cool, huh? Now let’s see how we can handle the original problem. For that, we need to know another cool Rx feature, the buffer. A buffer collects a series of items (based on count or time) and creates a stream of those groups. So, for example, it can group the items of the array into pairs of adjacent elements. We have to be careful, though: the last item has nothing to pair with, so the final buffer is shorter and we have to filter for full pairs. Then we can map the pairs to their sums, and finally get the largest of them with max. Like this:

var group_size = 2;
var obs = Rx.Observable.fromArray(x);
var subs = obs.bufferWithCount(group_size, 1)
              .filter(function(e) { return e.length == group_size; })
              .map(function(e) { return e.reduce(function(a, b) { return a + b; }, 0); })
              .max()
              .subscribe(function(e) { console.log(e); });

You might’ve noticed that I created a group_size variable and used reduce to sum the numbers in map. What if I change the group_size to 4? Hey, that would solve the problem with four adjacent numbers, right? Awesome! Now that’s what I call flexible!
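
To double-check the numbers without Rx, here is a rough plain-JavaScript sketch of the same idea. The helpers windows and maxAdjacentSum are hypothetical names, not Rx functions. windows only imitates what bufferWithCount(group_size, 1) plus the length filter produce for an array: every run of group_size adjacent elements.

```javascript
var x = [21, 47, 34, 61, 88, 19, 34, 57, 78, 59];

// Hypothetical helper: all full windows of `size` adjacent elements.
function windows(arr, size) {
    var result = [];
    for (var start = 0; start + size <= arr.length; start++) {
        result.push(arr.slice(start, start + size));
    }
    return result;
}

// Hypothetical helper: sum each window, then pick the largest sum.
function maxAdjacentSum(arr, size) {
    var sums = windows(arr, size).map(function (w) {
        return w.reduce(function (a, b) { return a + b; }, 0);
    });
    // Note: yields -Infinity when the array is shorter than `size`.
    return Math.max.apply(null, sums);
}

console.log(maxAdjacentSum(x, 2)); // 149 (61 + 88)
console.log(maxAdjacentSum(x, 4)); // 230 (47 + 34 + 61 + 88)
```

Just like in the Rx version, switching from pairs to groups of four only means changing one number.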

OK, I have to admit, working with a static list of numbers is not so exciting, but this was a really simple way to introduce Rx. Where it really shines is real-time event processing, like mouse movement or log parsing. You can harness the tools I introduced and many more, so don’t be afraid and give it a shot!
