Common rxjs patterns
Looking at the questions on stackoverflow.com, there are a few that seem to come up regularly.
How do I turn polling on and off?
Let’s walk through the solution.
Maybe it helps to work from the inside-out. We’ll start with the innermost observable. What kind of operator resembles polling for information? We want to repeatedly do something every n seconds, so this sounds like interval
would be a good fit. Now what would we use if we don’t want to poll? I think the answer to that is an observable that doesn’t emit: EMPTY
.
Now that we have the kernel of the observable described, we need to decide how and when that observable should be subscribed. Every time we get a value to pause (false
) or resume (true
) polling, we want to cancel what we were previously doing and make the decision from the first step all over again. The operator that always uses the newest value from its source, and unsubscribes from the previously created inner observable, is switchMap
.
I’ve seen people sometimes make the mistake of using concatMap
and wondering why nothing was ever output, even though the source observable emitted values. The answer is that the inner observable needs to complete before the next will be subscribed, which in the case of interval
will never happen.
Let’s say there are two input streams: one for resuming/starting polling; and one to pause polling. We need to combine those two streams, but how do we do it? Well, they can emit in any order, and we aren’t waiting for one of them to complete first, so we use merge
.
All that’s left to do is put it all together.
How do I integrate an external API?
You’re confronted with another API (presumably asynchronous) and you want to wrap it in an observable. For this example let’s take async iterators.
We want to create an observable and emit these values in sequence. In this case you can use the existing rxjs operators:
Although there’s another solution that’s much more general, but comes with the caveat that you have to handle propagation of complete, error and unsubscription:
By creating a new observable, you have access to the subscriber and can call next
, complete
and error
. This allows for a lot of flexibility, but is also much harder to get right. By composing the existing rxjs operators, you take advantage of the fact that all this is done for you.
How do I create a recursive function?
I’ve already written a piece about recursion in the Rx.NET library, and most of the points apply here. A lot of the time a solution can be found using the expand
operator. It does have several complications, in that the source observable’s values are propagated, which might be unexpected, and that the level of concurrency is unbounded (meaning it’s impossible to specify that it should run serially). If this is undesirable, it might be possible to use mergeScan
, which specifies a parameter for the maximum number of concurrent requests. The operator is a combination of mergeMap
and scan
.
How do I marble test imperative code, e.g. subjects?
It’s often easier to test functions that accept a source observable as input and return a transformed observable. Not only is it easier, but it also leads to a more flexible design, in that you can compose these observables together.
Sometimes though, you have an API where you have to deal with subscriptions, or updates via method calls. In this case there’s an output observable, but it’s not clear how to interact with the test scheduler so that the expected output can be specified as a marble diagram. If there’s no place to supply a source observable from the test scheduler, what can you do?
The solution is to subscribe to the cold observable provided by the test scheduler, and in the subscription call the necessary API method/call next
on the subject: