Recursion in Rx.NET Observables

Nick Lydon
Nov 27, 2019

I've had several occasions when composing observables where I've needed the ability to loop, or wait for another event to finish before continuing processing. The naive solution would be to call Thread.Sleep repeatedly until the condition is met, but we don't want to simply block a thread. A more functional solution would be to use recursion somehow - ideally returning an observable that represents the delay. What we desire is:

1. Non-blocking code

2. Retaining the order of the source Observable

3. The ability to unit test it

4. Running without blowing the stack

Unfortunately, it wasn't obvious to me how to do this in Rx.NET using the commonly available operators. There's an extension method called Expand, which takes a function that returns an observable. The function is called not only with the values emitted by the source observable, but also with the values emitted by the observables it returns (hence recursively). There are a few issues with this:

1. The output contains both the values from the source observable and the values produced by the projection passed to Expand. There's no way to provide a seed value or change the type of the emitted value.

2. The values are merged into the output, which means the previous order is lost.

3. Unlike the RxJS equivalent, it overflows the stack after a certain depth. Maybe that's solved by using a different scheduler, but the default behaviour isn't obvious.
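To make the feedback loop concrete, here is a minimal sketch of Expand. It assumes the operator is available as an extension method in System.Reactive.Linq (it is declared on ObservableEx in the versions I'm aware of). Doublings and limit are hypothetical names chosen for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class ExpandSketch
{
    // Hypothetical example: every emitted value is fed back into the selector,
    // which emits its double until the limit is exceeded.
    public static IObservable<int> Doublings(IObservable<int> source, int limit) =>
        source.Expand(x =>
            x * 2 <= limit
                ? Observable.Return(x * 2)    // fed back into the selector again
                : Observable.Empty<int>());   // stops the recursion for this branch
}
```

Note that the result has the same element type as the source, and that the source value itself is part of the output.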

I'll list a few ways of doing it in Rx.NET:

Select with a recursive function returning an observable

The most obvious (for me anyway) attempt would be to use Select to project an Observable, and then use Concat to ensure the order:
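A minimal sketch of this attempt could look like the following. AfterSteps and Run are hypothetical names, and y is the number of recursive steps taken before a value is let through.

```csharp
using System;
using System.Reactive.Linq;

public static class RecursiveSelect
{
    // Hypothetical helper: emits `value` after y recursive steps.
    // Defer postpones the recursive call until subscription, but each level
    // still subscribes from inside the previous one, so the call stack grows with y.
    public static IObservable<T> AfterSteps<T>(T value, int y) =>
        y <= 0
            ? Observable.Return(value)
            : Observable.Defer(() => AfterSteps(value, y - 1));

    public static IObservable<T> Run<T>(IObservable<T> source, int y) =>
        source
            .Select(x => AfterSteps(x, y)) // project each value to an inner observable
            .Concat();                     // subscribe to the inner observables one at a time, in order
}
```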

This looks like it works, but if you try increasing the level of recursion, e.g. y > 100000, you'll see that it produces the dreaded StackOverflowException. Maybe a language with tail call optimisation gets around this issue, but it isn't clear to me how one would write that code with the recursive call in tail position.

Select with async/await

It's possible to avoid blocking by using async/await. There's no need to call a function recursively, as it can be done using an imperative loop:
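As a sketch, assuming a hypothetical AfterStepsAsync helper in which Task.Delay stands in for the real waiting:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class EagerAsyncLoop
{
    // Hypothetical helper: loops y times without blocking a thread, then returns `value`.
    public static async Task<T> AfterStepsAsync<T>(T value, int y)
    {
        Console.WriteLine($"started {value}");
        for (var i = 0; i < y; i++)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(1)); // stand-in for the real wait
        }
        return value;
    }

    public static IObservable<T> Run<T>(IObservable<T> source, int y) =>
        source
            .Select(x => AfterStepsAsync(x, y).ToObservable()) // the async method is called here, inside Select
            .Concat();
}
```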

If you try running this, you'll immediately see the mistake: it doesn't wait for the previous Task to finish before starting the next one. This is a good opportunity to highlight an important difference between async/await and reactive extensions:

1. A Task returned by an async method is always hot: the code starts executing as soon as the method is called. Observables, by contrast, may be cold: execution of the producing function is deferred until subscription.

2. The Task result will be cached. If an Observable is cold, the result will be recalculated each time it's subscribed to.

Here's an updated example with the execution of the async/await code deferred until subscription:
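One way to sketch the deferral is with Observable.FromAsync, which only invokes the async function when the resulting observable is subscribed to. AfterStepsAsync is the same hypothetical helper as before, with Task.Delay standing in for the real waiting.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DeferredAsyncLoop
{
    // Hypothetical helper: loops y times without blocking a thread, then returns `value`.
    public static async Task<T> AfterStepsAsync<T>(T value, int y)
    {
        Console.WriteLine($"started {value}");
        for (var i = 0; i < y; i++)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(1)); // stand-in for the real wait
        }
        return value;
    }

    public static IObservable<T> Run<T>(IObservable<T> source, int y) =>
        source
            // FromAsync wraps the call in a cold observable: the async method
            // is not called until Concat subscribes to this inner observable.
            .Select(x => Observable.FromAsync(() => AfterStepsAsync(x, y)))
            .Concat();
}
```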

The issue with this code is that it's harder to test. Observables can be easily tested using virtual time if you replace the scheduler with an instance of the TestScheduler class. Unfortunately the Task Parallel Library (TPL) has its own scheduler so you'd have to create workarounds to get them to play together nicely.

Recursion using IScheduler.Schedule

The scheduler interface includes methods to schedule work at some point in the future. Let's look at the signature:
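For reference, the comments in the sketch below quote two Schedule signatures as I understand them: the method declared on IScheduler itself, and the recursive extension method from the static Scheduler class. Which of the two the rest of this section relies on is my assumption, and CountDown is a hypothetical demo of the recursive overload.

```csharp
using System;
using System.Reactive.Concurrency;

public static class ScheduleSignature
{
    // Declared on the IScheduler interface:
    //   IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
    //
    // Recursive extension method declared on the static Scheduler class:
    //   public static IDisposable Schedule<TState>(
    //       this IScheduler scheduler, TState state, Action<TState, Action<TState>> action);

    // Hypothetical demo: counts down from `seed`, one scheduled step per value,
    // and returns how many steps had run by the time Schedule returned.
    public static int CountDown(IScheduler scheduler, int seed)
    {
        var steps = 0;
        scheduler.Schedule(seed, (state, self) =>
        {
            steps++;
            if (state > 0)
            {
                self(state - 1); // schedule the next step with the new state
            }
        });
        return steps;
    }
}
```

With a synchronous scheduler such as Scheduler.CurrentThread, all steps have run by the time CountDown returns. With an asynchronous scheduler they would still be pending.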

This is similar to a reduce/aggregate function, where we pass the initial seed state and a function to compute the next value. The difference here is that instead of returning the value from the action, we're scheduling a new action with the new state:
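A minimal sketch, assuming the recursive Schedule extension method that passes the action a callback for rescheduling itself. AfterSteps and Run are hypothetical names, and y is again the number of steps.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ScheduledRecursion
{
    // Hypothetical helper: emits `value` after y scheduled steps, then completes.
    public static IObservable<T> AfterSteps<T>(T value, int y, IScheduler scheduler) =>
        Observable.Create<T>(observer =>
            // The IDisposable returned by Schedule becomes the subscription,
            // so unsubscribing cancels any step that is still pending.
            scheduler.Schedule(y, (remaining, self) =>
            {
                if (remaining <= 0)
                {
                    observer.OnNext(value);
                    observer.OnCompleted();
                }
                else
                {
                    self(remaining - 1); // schedule the next step with the new state
                }
            }));

    public static IObservable<T> Run<T>(IObservable<T> source, int y, IScheduler scheduler) =>
        source
            .Select(x => AfterSteps(x, y, scheduler))
            .Concat(); // keeps the order of the source
}
```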

In this example we are wrapping these recursive calls in a new observable. Each time we're returning an instance of IDisposable so that previously scheduled work can be cancelled (when unsubscribing prematurely for example).

This method:

1. Does not block. It schedules work to be done at some point in the future

2. Can be ordered like any other observable - merged, concatenated, etc.

3. Accepts an instance of IScheduler so that the TestScheduler can be used and the queue drained synchronously in unit tests

4. Does not blow the stack. The scheduler calls our function in a loop rather than recursively
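The third point can be sketched with a time-based variant driven by virtual time. This assumes the Microsoft.Reactive.Testing package for TestScheduler. WaitUntil, condition and interval are hypothetical names for a helper that re-checks a condition at a fixed interval.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class PollingSketch
{
    // Hypothetical helper: re-checks `condition` every `interval` on the given
    // scheduler and emits `value` once the condition holds.
    public static IObservable<T> WaitUntil<T>(
        T value, Func<bool> condition, TimeSpan interval, IScheduler scheduler) =>
        Observable.Create<T>(observer =>
            scheduler.Schedule(value, TimeSpan.Zero, (state, self) =>
            {
                if (condition())
                {
                    observer.OnNext(state);
                    observer.OnCompleted();
                }
                else
                {
                    self(state, interval); // check again after `interval`
                }
            }));

    // Returns the number of values seen before the condition was met,
    // and the values seen afterwards.
    public static (int Before, List<int> After) Run()
    {
        var scheduler = new TestScheduler();
        var ready = false;
        var results = new List<int>();

        WaitUntil(1, () => ready, TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(results.Add);

        // Virtual time only: five seconds "pass" without anything sleeping.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
        var before = results.Count;

        ready = true;
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        return (before, results);
    }
}
```

Because all the waiting happens on the injected scheduler, the test controls exactly when each check runs.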

A lot of the information for this was taken from the undisputed champion of reactive extensions resources, Introduction to Rx, as well as this blog post and this MSDN forum post.


Written by Nick Lydon

British software developer working as a freelancer in Berlin. Mainly dotnet, but happy to try new things! https://github.com/NickLydon
