Reactive Programming using RxJS

Reactive Programming Paradigm

  • Asynchronous programming
  • Works with Functional Programming
  • ReactiveX works with many languages
    • JS
    • .Net
    • Java

 

More information:

http://reactivex.io/intro.html

 

Mission Statement

ReactiveX observable model allows you to treat streams of asynchronous events with the same sort simple, composable operations that you use for collections of data items like arrays. It frees you from callbacks, and makes code more readable and less prone to bugs.

 

Observables are:

  • Composable
  • Flexible
  • Less Opinionated

 

Reactive Programming

Uses a collection of operators to help filter, select, transform, combine and compose Observables. This allows for efficient execution and composition. Observables work by the producer pushing values to the consumer whenever the values are available. This approach is flexible and can be used synchronously or asynchronously. ReactiveX follows and extends the ‘Gang of Four’ Observer pattern byadding:

  1. Ability for producer to signal consumer that no more data is available (onCompleted)
  2. Ability for producer to signal consumer that error has occurred (onError)

 

Observers and Observables

Create an observable. It is simple, just passes array of numbers. Observables are always a stream of data that is like an array.

In this example we are looping through the numbers array and sending each element with waits in between, giving the stream of data to go over time. At the end, we send a complete signal to the consumer.

 

All Observable Operators are composable. We can see a full list of operators here:

http://reactivex.io/documentation/operators.html

In the example below, the Observable has two operators attached to it from the producer’s end. It will map() and filter(), thereby changing the data before it streams it out.

It is important to note that we should only import specific libraries that are used. In the examples above we are importing the whole RxJS library, though only using a couple of modules and Observable class from it. We should not do this.

 

Working with Observables

In the example below, the Observerable is tied to the DOM and tracks the mousemove event. Notice that the producer is also applying operators. The subscriber code is identical to any other subscriber in that it catches success, errors and complete signals.

 

Terminology:

React to the Observable by adding code on the observer (subscribe method).

In this example we are using Observable for an XmlHttpRequest (XHR).

Use a flatMap to process inner observables (having a subscribe method within a subscribe).

Use retryWhen for Retry Logic. This is an operator that can be put on an observable. Example below is retrying 3 times:

Another operator we could use is the retryWhen. This takes a function so that we can customize when the retry needs to happen. This is shown in the example below as retryStrategy. The delay operator defines when we should do the next retry and the scan() operator counts the number retries so that we stop after defined amount. The takeWhile() operator will take the accumulator (counter) and will call the observer’s complete method once the condition is met.

 

Fetch

New standard, returns a promise. Can be used for GET, PUT, DELETE, POST

https://fetch.spec.whatwg.org/

Fetch Polyfill for older browsers

https://github.com/github/fetch

 

Working with Observable Data

Change your way of thinking. Data is coming back streaming, not single response like promises.

Tip for what operators to use – go to site below and try to follow the site tips. Can find by category or action types:

http://reactivex.io/documentation/operators.html

 

Error Handling

Doing a try catch in Observables is not possible because the events occur after that section of code has been processed. So we need to use the “catch” and “retry” operators that are part of Observable.

Note that when using the observer’s error, it stops the process / flow of the data stream. Below at line 7 the data stream stops and we never see value 3 passed. However the error handler in line 15 catches the error and the data process flow stops gracefully.

 

This is another way to do error handler. Note how “throw” is being done in the Observable using it’s operator.

In the next example below, we use the “onErrorResumeNext” which ignores all errors

tbd