RxJS: Observables, observers and operators introduction May 17, 2017 8 mins read Edit post

  • When the input’s event listener fires, the Observable passes the value to the observer.
  • When an Observable produces values, it then informs the observer, calling when a new value was successfully captured and when an error occurs.
  • As we’ve mentioned, Observables can be chained, which means we can do something like this:

    Here are the steps of this sequence:

    Quite a lot happening, and if you’re a little unsure, remember:

    Each time a new Observable is returned, a new observer is hooked up to the previous Observable, thus allowing us to pass values along a “stream” of observers that simply do something you’ve asked and call when it’s done, passing it to the next observer.

  • As we’ve setup our Observable function, we can now invoke our observer, passing in as a value and subscribe to it:

    We subscribe to the Observable instance, and pass our observer (object literal) into the constructor (which is then assigned to ).

  • That’s all we actually needed to create the basis of our Observable, the next piece we need is a method on the Observable:

    We’re going to use our Observable just like in RxJS:

    Which means we need to return a new Observable and pass a function in as the argument:

    This then passes our function to our in the constructor.

Table of contents

@toddmotto: 😎 New to Rx? Start here:
Observables, observers and operators introduction:

RxJS is an incredible tool for reactive programming, and today we’re going to dive a little deeper into what Observables and observers are – as well as learn how to create our own operators.

If you’ve used RxJS before and want to understand some of the inner workings and internals to “how” Observables work, as well as the operators, then this post is for you.

method on it.

event in the DOM. It could even be something more complex such as communication over HTTP.

To better understand Observables, we’re going to write our own! But first, let’s take a look at an example with a subscription:

in the console).

When the input’s event listener fires, the Observable passes the value to the observer.

What is an observer?

(subscribe will invoke our Observable).

when an error occurs.

on our observer, or we (as the “consumers”) decide we are no longer interested in the values and we unsubscribe.

block, the value is passed (or can be passed) through a chain of Observables, which is typically done via “operators”. This chain is what we call an Observable sequence. Each operator returns a new Observable to continue our sequence – also known as a “stream”.

What is an operator?

As we’ve mentioned, Observables can be chained, which means we can do something like this:

Here are the steps of this sequence:

Quite a lot happening, and if you’re a little unsure, remember:

when it’s done, passing it to the next observer.

In short, an operator typically returns a new Observable each time – allowing us to continue our stream. As users we don’t need to worry about all the Observables and observers which are created and used behind scenes, we only use one per chain – our subscription.

So, let’s get started and write our own Observable implementation. It won’t be as advanced as Rx’s implementation, but we’ll hopefully build the picture enough.

function as its only argument. We’ll store the subscribe property on the instance of Observable, so that we can call it later with an observer:

will be invoked either by us or another Observable. This will make more sense as we continue.

Before we dive into our real world example, let’s give a basic one.

as a value and subscribe to it:

method on the Observable:

We’re going to use our Observable just like in RxJS:

Which means we need to return a new Observable and pass a function in as the argument:

in the constructor. Next up, we need to hook our event in:

argument, and where does it come from?

on.

on our observer with the updated value.

, let’s fix that:

As we know, Observables need a “tear down” function which is called when the Observable is destroyed, in our case we’ll remove the event:

because this Observable is dealing with DOM APIs and events, so technically they’re infinitely available.

Let’s try it out! Here’s the full code of what we’ve done:

Live example (type, then watch):

object, we’ll add a new prototype method:

in JavaScript but for any value:

So we need to take the callback function and invoke it, which in turn will return our desired data. Before we can do this, we need the latest value in the stream.

operator. Because it’s on the prototype we can do exactly that:

Ready for more funk? Now we subscribe inside a returned Observable:

passed through map:

Now we can chain it!

object like before? You’ve successfully created an Observable stream.

Try it again:

RxJS: Observables, observers and operators introduction May 17, 2017 8 mins read Edit post

You might also like More from author

Comments are closed, but trackbacks and pingbacks are open.