Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

README.md

Outline

Reactive Programming With Angular


Background

  • Reactive programming is programming with asynchronous data streams
  • JavaScript is asynchronous by design
    • HTTP requests
    • Timeouts
    • UI events (clicks, key presses, etc.)
    • How to handle all this?

Callbacks

  • Traditionally solved by registering callback functions to be executed upon completion of task
  • E.g. setTimeout that calls callback function once given amount of milliseconds has passed
window.setTimeout(() => {
	// Executed after one second
}, 1000);

Problem: Messy Code

  • Using callbacks quickly leads to messy code with multiple nested functions that is hard to follow and rationalize
getData((x) => {
    getMoreData(x, (y) => {
        getMoreData(y, (z) => {
            ...
        });
    });
});
  • For more information google for callback hell

Solution: Promises

  • Promise is a promise of providing a value later
  • Promise constructor takes single argument that is a function with two parameters:
    • resolve: function to be called when we want to indicate success
    • reject: function to be called when we want to indicate failure
new Promise((resolve, reject) => {
  if (...) resolve(x);
  else if (...) resolve(y);
  else reject();
});
  • Both functions allow arguments that are provided for promise consumer

Promises are Resolved or Rejected

  • Promises are consumed by calling then on them. then takes two arguments: success and failure handler
somethingReturningPromise().then(
  (value) => { // Resolved
    // Handle success case
  },
  (value) => { // Rejected
    // Handle reject case, e.g. show an error note
  });

Promise Chaining

  • Promises can be "chained" by calling then multiple times in a row
  • Each .then() will change the value of the promise by returning a new value
  • If the value returned it is a promise, it will be waited for
fetch('/users') // Make the HTTP request
  .then(response => response.json()) // .json() will return a promise
  .then(json => json.users) // Map the result to contain only the "users" field
  .then(users => alert('Found ' + users.length + ' users')); // Show an alert with the users

Problem: Stream Handling and Disposability

  • Promises don't work for streams, they are just to subscribe for single events
  • Promises can't be cancelled

Solution: Observables

  • Generalization of promises for streams
  • A way for representing asynchronous event streams
    • e.g. mouse clicks, WebSocket streams
  • Can also be used for single events e.g. HTTP requests

RxJS

  • ReactiveX is a library for representing asynchronous event streams with Observables and modifying them with various stream operations
  • RxJS is a ReactiveX implementation for JavaScript
  • Angular integrates with RxJS 6

Idea:

"In ReactiveX an observer subscribes to an Observable."

  • You subscribe to stream of events so that your handler gets invoked every time there is a new item
observable.subscribe(item => doSomething(item));

Streams can be manipulated with traditional array conversion functions such as map and filter

  observable
    .filter(node => node.children.length > 2)
    .map(node => node.name);

You can merge, concat and do other operations on streams to produce new streams from the existing ones

const resultStream = stream1.merge(stream2);

Observables can also be created from e.g. objects, maps and arrays

Rx.Observable.of(42);
Rx.Observable.from([1,2,3,4]);
Rx.Observable.range(1,10);

Subscribing

  • Subscribe method takes three functions as arguments:

    • onNext: called when a new item is emitted
    • onError: called if observable sequence fails
    • onComplete: called when sequence is complete
    observable.subscribe(
        next => doSomething(next), // onNext
        error => handleError(error), // onError
        () => done() // onComplete
    );

Unsubscribing (cancelling)

  • Observable sequence subscriptions can be unsubscribed
    • E.g. observable that produces events that are saved into the memory
      const eventSubscription = eventStream.subscribe(
            event => this.events.push(event)
      );
    • Sequence will not stop until unsubscribed
      eventSubscription.unsubscribe();

Demo


Catching Errors

  • Observables die on errors
  • The way to survive from errors is by catching them and returning a new observable sequence
    observable.catch((error) => {
        console.log(error);
        return Rx.Observable.of([1, 2, 3]);
    };

Hot vs. Cold Observables

  • Cold observables start running upon subscription
    • E.g. http request
  • Hot observables are already producing values before the subscription is active
    • E.g. mouse move events

Observables in Angular

  • Observables used exclusively instead of promises
    • E.g. HTTP requests only result in single event (one response) but they are modeled as observables
        this.httpClient.get('url/restapi/resource') // Returns observable
            .subscribe(
                data => { this.data = data}, // Success
                err => console.error(err), // Failure
                () => console.log('done') // Done
            );

Observables in Angular

  • Changes in route parameters are propagated through an observable sequence
  constructor(route: ActivatedRoute) {
      route.params.subscribe(params => this.index = +params['index']);
  }

Exercises

Open exercise instructions


RxJS 5.5

  • RxJS 5.5.0 introduced major change to RxJS called pipeable operators
  • Importing is a mess with 5.5 but RxJS 6 will fix it
  • Read more

Pipeable Operators Example

Pre RxJS 5.5

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';

const source$ = Observable.range(0, 10);
source$
  .filter(x => x % 2 === 0)
  .map(x => x + x)
  .subscribe(x => console.log(x))

RxJS 5.5 ->

import { range } from 'rxjs/observable/range';
import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';

const source$ = range(0, 10);
source$.pipe(
  filter(x => x % 2 === 0),
  map(x => x + x)
).subscribe(x => console.log(x))

RxJS 5.5 Renaming

Some of the operators are reserved words in JavaScript:

  • do -> tap
  • catch -> catchError
  • switch -> switchAll
  • finally -> finalize

RxJS 5.5 Pros

  • No more "prototype patching" -> Tree-shaking possible -> Smaller bundle sizes
  • Custom operators are easier to make
  • Better tooling support by linters and compilers

RxJS 6

  • Released 04/2018
  • Major changes:
  • Simpler imports (import { map, filter } from 'rxjs/operators')
  • Errors thrown asynchronously
  • Deprecations
  • New operator (throwIfEmpty)
  • Provides compatibility library (rxjs-compat) to support the migration from 5 to 6
  • See Ben Lesh's (RxJS 5 and 6 author) presentation in ngConf 2018 for more details (slides, video)
  • rxjs-dev
Morty Proxy This is a proxified and sanitized view of the page, visit original site.