Skip to main content

Understanding RxJS Observables and why you need them

rxjsobservables

What is RxJS?

RxJS is a framework for reactive programming that makes use of Observables, making it really easy to write asynchronous code. According to the official documentation, this project is a kind of reactive extension to JavaScript with better performance, better modularity, better debuggable call stacks, while staying mostly backwards compatible, with some breaking changes that reduce the API surface. It is the official library used by Angular to handle reactivity, converting pull operations for call-backs into Observables.

Prerequisites

To be able to follow through in this article’s demonstration you should have:
// run the command in a terminal
ng version
Confirm that you are using version 7, and update to 7 if you are not.
  • Download this tutorial’s starter project here to follow through the demonstrations
  • Unzip the project and initialize the node modules in your terminal with this command
npm install
Other things that will be nice to have are:
  • Working knowledge of the Angular framework at a beginner level

Understanding Observables: pull vs push

To understand Observables, you have to first understand the pull and push context. In JavaScript, there are two systems of communication called push and pull.
pull system is basically a function. A function is usually first defined (a process called production) and then somewhere along the line called (this process is called consumption)to return the data or value in the function. For functions, the producer (which is the definition) does not have any idea of when the data is going to be consumed, so the function call literally pulls the return value or data from the producer.
push system, on the other hand, control rests on the producer, the consumer does not know exactly when the data will get passed to it. A common example is promises in JavaScript, promises (producers) push already resolved value to call-backs (consumers). Another example is RxJS Observables, Observables produces multiple values called a stream (unlike promises that return one value) and pushes them to observers which serve as consumers.

What is a Stream?

A stream is basically a sequence of data values over time, this can range from a simple increment of numbers printed in 6 seconds (0,1,2,3,4,5) or coordinates printed over time, and even the data value of inputs in a form or chat texts passed through web sockets or API responses. These all represent data values that will be collected over time, hence the name stream.

What are Observables?

Streams are important to understand because they are facilitated by RxJS Observables. An Observable is basically a function that can return a stream of values to an observer over time, this can either be synchronously or asynchronously. The data values returned can go from zero to an infinite range of values.

Observers and subscriptions

For Observables to work there needs to be observers and subscriptions. Observables are data source wrappers and then the observer executes some instructions when there is a new value or a change in data values. The Observable is connected to the observer who does the execution through subscription, with a subscribe method the observer connects to the observable to execute a code block.

Observable lifecycle

With some help from observers and subscriptions the Observable instance passes through these four stages throughout its lifetime:
  • Creation
  • Subscription
  • Execution
  • Destruction

Creating Observables

If you followed this post from the start, you must have opened the Angular starter project in VS Code. To create an Observable, you have to first import Observable from RxJS in the .ts file of the component you want to create it in. The creation syntax looks something like this:
import { Observable } from "rxjs";

var observable = Observable.create((observer:any) => {
    observer.next('Hello World!')
})
Open your app.component.ts file and copy the code block below into it:
import { Component, OnInit } from '@angular/core';
import { Observable } from "rxjs/";
@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit{
  title = 'ngcanvas';
  ngOnInit(): void {
    var observable = Observable.create()
  }
  
}

Subscribing to Observables

To tell RxJS to execute the code block on the Observable, or in a simpler term, to call the Observable to begin execution you have to use the subscribe method like this:
export class AppComponent implements OnInit{
  title = 'ngcanvas';
  ngOnInit(): void {
    var observable = Observable.create((observer:any) => {
      observer.next('Hello World!')
  })
  observable.subscribe(function logMessage(message:any) {
    console.log(message);
  })
}
This subscribe method will cause “hello world” to be logged in the console.

Executing Observables

The observer is in charge of executing instructions in the Observable, so each observer that subscribes can deliver three values to the Observable:
  1. Next value: With the next value, observer sends a value that can be a number, a string or an object. There can be more than one next notifications set on a particular Observable
  2. Error value: With the error value, the observer sends a JavaScript exception. If an error is found in the Observable, nothing else can be delivered to the Observable
  3. Complete value: With the complete value, the observer sends no value. This usually signals that the subscriptions for that particular Observable is complete. If the complete value is sent, nothing else can be delivered to the Observable.
This can be illustrated with the code block below:
export class AppComponent implements OnInit{
  title = 'ngcanvas';
  ngOnInit(): void {
    var observable = Observable.create((observer:any) => {
      observer.next('I am number 1')
      observer.next('I am number 2')
      observer.error('I am number 3')
      observer.complete('I am number 4')
      observer.next('I am number 5')
  })
  observable.subscribe(function logMessage(message:any) {
    console.log(message);
  })
}
}
If you run the application at this point in the dev server with
ng serve
When you open up the console in the developer tools your log will look like this:
error in console
You will notice that either the error value or complete value automatically stops execution and so the number 5 never shows up in the console. This is a simple synchronous exercise. To make it asynchronous, let us wrap timers around some of the values.
export class AppComponent implements OnInit{
  title = 'ngcanvas';
  ngOnInit(): void {
    var observable = Observable.create((observer:any) => {
      observer.next('I am number 1')
      observer.next('I am number 2')
      setInterval(() => {
        observer.next('Random Async log message')
    }, 2000)
    observer.next('I am number 3')
    observer.next('I am number 4')
      setInterval(() => {
        observer.error('This is the end')
    }, 6001)
    observer.next('I am number 5')
  })
  observable.subscribe(function logMessage(message:any) {
    console.log(message);
  })
}
}
This will appear like this in your browser console:
console errors
Notice that the display of value was done here asynchronously, with the help of the setInterval module.

Destroying an Observable

To destroy an Observable is to essentially remove it from the DOMby unsubscribing to it. Normally for asynchronous logic, RxJS takes care of unsubscribing and immediately after an error or a complete notification your observable gets unsubscribed. For the knowledge, you can manually trigger unsubscribe with something like this:
return function unsubscribe() {
    clearInterval(observable);
  };

Why Observables are so vital

  • Emitting multiple values asynchronously is very easily handled with Observables
  • Error handlers can also easily be done inside Observables rather than a construct like promises
  • Observables are considered lazy, so in case of no subscription there will be no emission of data values
  • Observables can be resolved multiple times as opposed to functions or even promises

Conclusion

We have been given a thorough introduction to Observables, observers and subscriptions in RxJS. We have also been shown the lifecycle process of Observables with practical illustrations. More RxJS posts can be found on the blog, happy hacking!

Comments

Popular posts from this blog

How to use Ngx-Charts in Angular ?

Charts helps us to visualize large amount of data in an easy to understand and interactive way. This helps businesses to grow more by taking important decisions from the data. For example, e-commerce can have charts or reports for product sales, with various categories like product type, year, etc. In angular, we have various charting libraries to create charts.  Ngx-charts  is one of them. Check out the list of  best angular chart libraries .  In this article, we will see data visualization with ngx-charts and how to use ngx-charts in angular application ? We will see, How to install ngx-charts in angular ? Create a vertical bar chart Create a pie chart, advanced pie chart and pie chart grid Introduction ngx-charts  is an open-source and declarative charting framework for angular2+. It is maintained by  Swimlane . It is using Angular to render and animate the SVG elements with all of its binding and speed goodness and uses d3 for the excellent math functio...

Understand Angular’s forRoot and forChild

  forRoot   /   forChild   is a pattern for singleton services that most of us know from routing. Routing is actually the main use case for it and as it is not commonly used outside of it, I wouldn’t be surprised if most Angular developers haven’t given it a second thought. However, as the official Angular documentation puts it: “Understanding how  forRoot()  works to make sure a service is a singleton will inform your development at a deeper level.” So let’s go. Providers & Injectors Angular comes with a dependency injection (DI) mechanism. When a component depends on a service, you don’t manually create an instance of the service. You  inject  the service and the dependency injection system takes care of providing an instance. import { Component, OnInit } from '@angular/core'; import { TestService } from 'src/app/services/test.service'; @Component({ selector: 'app-test', templateUrl: './test.component.html', styleUrls: ['./test.compon...

How to solve Puppeteer TimeoutError: Navigation timeout of 30000 ms exceeded

During the automation of multiple tasks on my job and personal projects, i decided to move on  Puppeteer  instead of the old school PhantomJS. One of the most usual problems with pages that contain a lot of content, because of the ads, images etc. is the load time, an exception is thrown (specifically the TimeoutError) after a page takes more than 30000ms (30 seconds) to load totally. To solve this problem, you will have 2 options, either to increase this timeout in the configuration or remove it at all. Personally, i prefer to remove the limit as i know that the pages that i work with will end up loading someday. In this article, i'll explain you briefly 2 ways to bypass this limitation. A. Globally on the tab The option that i prefer, as i browse multiple pages in the same tab, is to remove the timeout limit on the tab that i use to browse. For example, to remove the limit you should add: await page . setDefaultNavigationTimeout ( 0 ) ;  COPY SNIPPET The setDefaultNav...