
Comparing the Stream API and (async) generators in Node.js v10


Introduction

A stream is an abstraction of data in programming. The Node.js Stream API has been around for a long time and is used as a uniform API for reading and writing asynchronous data. The Stream API is mostly used internally by other APIs such as fs and http.
For example, fs.createReadStream is often used for reading a large file. Another example is http.ServerResponse, which implements the Stream API so that a server can send large responses. A stream is mainly used for large data, but conceptually it can also represent data of infinite length.
Another abstraction similar to a stream is the generator (introduced in ES2015), an abstraction of loops. A generator returns an iterator that you can loop over item by item, and it is also capable of representing data of infinite length. ES2018 introduced async generators and async iterators, which can handle asynchronous data. Async generators are supported in Node.js v10.
In this post, we will learn how to implement a synchronous counter with a pull-based stream and with a generator. We will also implement an asynchronous counter with a push-based stream and with an async generator, in order to compare the Stream API and async generators.

Prerequisites

Before continuing, readers will need to have Node.js (v10 or later) installed and a basic understanding of streams.

Implement a stream for the synchronous counter

In general, you would just use a stream provided by a library; in other words, you consume a stream. Here, for the purpose of study, we will provide a stream ourselves. The documentation describes how to implement streams. Let us first make an infinite counter as a readable stream. Create a file named “stream-sync-counter.js”.
// stream-sync-counter.js

const { Readable, Writable } = require('stream');

const createCounterReader = () => {
  let count = 0;
  return new Readable({
    objectMode: true,
    read() {
      count += 1;
      console.log('reading:', count);
      this.push(count);
    },
  });
};

const counterReader = createCounterReader();
This is a pull-based stream, which means it reads new values whenever its buffer drops below a certain threshold. We used object mode (objectMode: true), so each item is just one number.
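The threshold a readable stream compares its buffer against is its highWaterMark, counted in items when objectMode is on. The following standalone sketch (not part of stream-sync-counter.js; the highWaterMark of 4 is an arbitrary value chosen for illustration) pushes six numbers by hand. push() returns true while the buffer is below the threshold and false once it is reached, yet the items are still buffered; a read() implementation is expected to stop pushing when it sees false and wait until the stream calls read() again.

```javascript
const { Readable } = require('stream');

// Object-mode stream with a small buffer threshold so the effect is easy to see.
// read() is a no-op because this example pushes values by hand.
const reader = new Readable({ objectMode: true, highWaterMark: 4, read() {} });

// Record what push() returns for each of six items.
const results = [];
for (let i = 1; i <= 6; i += 1) {
  results.push(reader.push(i));
}

console.log(results); // [ true, true, true, false, false, false ]
// Nothing consumed the stream, so all six items are still in the buffer.
console.log(reader.readableLength); // 6
```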
Now, let’s define a writable stream to consume this counter.
// stream-sync-counter.js (continued)

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});
The logWriter stream defined above does nothing except output each number to the console.
Now, we connect the two streams with pipe():
// stream-sync-counter.js (continued)

counterReader.pipe(logWriter);
If you run this code, you will see numbers counting up infinitely.
$ node stream-sync-counter.js
reading: 1
reading: 2
writing: 1
reading: 3
writing: 2
reading: 4
writing: 3
reading: 5
writing: 4
reading: 6
writing: 5
...
Note that the readable stream reads several items at once to fill its buffer, then waits until some of them are consumed. A readable stream works like this: 1) read items and store them in the buffer, 2) wait until items are consumed, 3) once items are consumed and the buffer drops below a certain amount, go back to step 1). To see the buffer at work more clearly, you can add a timeout to your writable stream.
// replace logWriter in stream-sync-counter.js with this version

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 100);
  },
});
If you run this version, you will see an interesting output:
$ node stream-sync-counter.js
reading: 1
reading: 2
reading: 3
...
reading: 31
reading: 32
writing: 1
writing: 2
writing: 3
...
writing: 14
writing: 15
writing: 16
reading: 33
reading: 34
reading: 35
...
reading: 46
reading: 47
reading: 48
writing: 17
...
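In the output above, 32 items are read before the first write. This is consistent with pipe() involving two buffers, one inside the readable stream and one inside the writable stream, each with a default highWaterMark of 16 items in object mode. The sketch below only inspects those defaults through the readableHighWaterMark and writableHighWaterMark properties; the exact read-ahead count you observe may vary between Node.js versions.

```javascript
const { Readable, Writable } = require('stream');

// Same object-mode options as the counter streams, without the counting logic.
const reader = new Readable({ objectMode: true, read() {} });
const writer = new Writable({
  objectMode: true,
  write: (chunk, _, done) => done(),
});

// Default buffer thresholds for object-mode streams, counted in items.
console.log(reader.readableHighWaterMark); // 16
console.log(writer.writableHighWaterMark); // 16
```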

Implement a generator for the synchronous counter

A generator is a feature introduced in ES2015. It is a general abstraction of loops that allows a loop to be implemented as a function. A generator function is a special function that returns an iterator.
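To see that a generator function returns an iterator, you can call next() on it by hand. This small sketch uses a hypothetical finite generator, threeCounts, so that it terminates; each next() call resumes the body until the next yield and returns a { value, done } object.

```javascript
// A finite generator: calling it returns an iterator and runs none of the body yet.
function* threeCounts() {
  yield 1;
  yield 2;
  yield 3;
}

const iterator = threeCounts();

// Each next() call resumes the body until the next yield.
console.log(iterator.next()); // { value: 1, done: false }
console.log(iterator.next()); // { value: 2, done: false }
console.log(iterator.next()); // { value: 3, done: false }
// After the last yield, the iterator reports that it is done.
console.log(iterator.next()); // { value: undefined, done: true }
```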
The following code generates an infinite counter. Create a file named “generator-sync-counter.js”.
// generator-sync-counter.js

function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    console.log('reading:', count);
    yield count;
  }
}

const counterIterator = counterGenerator();
Now, let’s create a function to run this iterator and output numbers to the console.
// generator-sync-counter.js (continued)

const logIterator = (iterator) => {
  for (const item of iterator) {
    console.log('writing:', item);
  }
};
This is just a for-of loop: since ES2015, you can loop over an iterator with for-of. Now you can simply invoke the function.
// generator-sync-counter.js (continued)

logIterator(counterIterator);
The result will look something like this:
$ node generator-sync-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
reading: 6
writing: 6
...
This differs slightly from the behavior of the stream, and it is more intuitive because there is no buffer.
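One way to check that there is no buffer is to record the order of events in an array instead of printing them. In this sketch (the events array and the limit of three items are assumptions for illustration), every "reading" entry is immediately followed by its "writing" entry, because the generator body only runs when the loop asks for the next item.

```javascript
const events = [];

// Same counter as generator-sync-counter.js, but logging into an array.
function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    events.push(`reading: ${count}`);
    yield count;
  }
}

for (const item of counterGenerator()) {
  events.push(`writing: ${item}`);
  if (item === 3) break; // stop after three items so the example terminates
}

console.log(events.join(', '));
// reading: 1, writing: 1, reading: 2, writing: 2, reading: 3, writing: 3
```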
You can also add timeouts like this:
// modify the function in generator-sync-counter.js

const logIterator = async (iterator) => {
  for (const item of iterator) {
    await new Promise(r => setTimeout(r, 100));
    console.log('writing:', item);
  }
};
If you run it, you should get the same interleaved result, just more slowly.
We have now created a synchronous infinite counter both with a stream and with a generator. Both behave the same when we consume the counter, but their internal behavior differs slightly because the stream buffers items.
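Both counters keep running until you stop the process. With a generator, the consumer controls the loop, so stopping early only requires leaving it. The sketch below uses a hypothetical take helper (not from the original code) that collects the first n items of an iterator, together with a copy of counterGenerator without the logging; breaking out of for-of calls the generator's return() method, which closes it.

```javascript
function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    yield count;
  }
}

// Hypothetical helper: collect the first n items of any iterator.
const take = (iterator, n) => {
  const items = [];
  if (n <= 0) return items;
  for (const item of iterator) {
    items.push(item);
    // Leaving the loop early calls iterator.return(), closing the generator.
    if (items.length >= n) break;
  }
  return items;
};

const counter = counterGenerator();
console.log(take(counter, 3)); // [ 1, 2, 3 ]
// The generator was closed by break, so it yields nothing more.
console.log(counter.next()); // { value: undefined, done: true }
```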

Implement a stream for an asynchronous counter

Next, we will create an asynchronous counter, starting with a stream. The asynchronous counter here counts up every second. To create such a stream, we use setInterval. Create a file named “stream-async-counter.js”.
// stream-async-counter.js

const { Readable, Writable } = require('stream');

const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    objectMode: true,
    read() {},
  });
  setInterval(() => {
    counter += 1;
    console.log('reading:', counter);
    reader.push(counter);
  }, delay);
  return reader;
};

const counterReader = createCounterReader(1000);
This is a so-called push-based stream. As you might guess, it pushes data into the buffer indefinitely, so the buffer keeps growing unless you consume data faster than it is pushed.
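The counter above ignores the return value of push(), which becomes false once the buffer reaches its highWaterMark, so nothing stops the buffer from growing. If you wanted a push-based source to respect that threshold, one possible sketch (createPausingCounterReader and its highWaterMark parameter are hypothetical, not part of stream-async-counter.js) stops the timer when push() returns false and restarts it when the stream calls read() again, which happens once a consumer drains the buffer.

```javascript
const { Readable } = require('stream');

// Hypothetical variant of createCounterReader that respects backpressure:
// the timer stops when push() returns false and restarts when the stream
// calls read() to ask for more data.
const createPausingCounterReader = (delay, highWaterMark) => {
  let counter = 0;
  let timer = null;

  const reader = new Readable({
    objectMode: true,
    highWaterMark,
    read() {
      // The stream calls read() when it wants more data in its buffer.
      if (timer !== null) return; // already producing
      timer = setInterval(() => {
        counter += 1;
        if (!reader.push(counter)) {
          // Buffer reached highWaterMark: pause until read() is called again.
          clearInterval(timer);
          timer = null;
        }
      }, delay);
    },
    destroy(err, callback) {
      // Stop the timer if the stream is torn down.
      clearInterval(timer);
      timer = null;
      callback(err);
    },
  });
  return reader;
};

// Start filling the buffer without consuming anything, then inspect it.
const pausingReader = createPausingCounterReader(10, 3);
pausingReader.read(0);
setTimeout(() => {
  console.log(pausingReader.readableLength); // 3: the timer stopped at the threshold
  pausingReader.destroy();
}, 200);
```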
We use logWriter without timeouts, because the readable stream pushes the items and therefore controls the timing.
// stream-async-counter.js (continued)

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});

counterReader.pipe(logWriter);
If we run this, we should see the following result, with a one-second delay between items.
$ node stream-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
This is slightly different from the result of the pull-based stream, because now each item is consumed before the next one is added to the buffer.
To confirm that data is pushed regardless of whether it is consumed, you could change logWriter as follows.
// replace logWriter in stream-async-counter.js with this version

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 5 * 1000);
  },
});
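The five-second writer makes the effect visible but slow. The following standalone sketch records events in an array instead, with shorter, arbitrary timings (a 10 ms producer and a 50 ms writer) and a hypothetical limit parameter that ends the stream after three items with push(null). With these timings, all three "reading" events are typically recorded before the first "writing" event, because the readable stream keeps pushing while the writer is still busy.

```javascript
const { Readable, Writable } = require('stream');

const events = [];

// Push-based counter like createCounterReader, but it ends after `limit`
// items (a hypothetical addition so the example terminates).
const createCounterReader = (delay, limit) => {
  let counter = 0;
  const reader = new Readable({ objectMode: true, read() {} });
  const timer = setInterval(() => {
    counter += 1;
    events.push(`reading: ${counter}`);
    reader.push(counter);
    if (counter >= limit) {
      clearInterval(timer);
      reader.push(null); // signal the end of the stream
    }
  }, delay);
  return reader;
};

// Slow writer: each item takes 50 ms to "write".
const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      events.push(`writing: ${chunk}`);
      done();
    }, 50);
  },
});

// Resolves with the event log once every item has been written.
const finished = new Promise((resolve) => {
  logWriter.on('finish', () => resolve(events));
});

createCounterReader(10, 3).pipe(logWriter);

finished.then((log) => console.log(log.join(', ')));
```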

Use an async generator for an asynchronous counter

The for-await-of statement is a new feature in ES2018 that allows handling promises in iterators. Using an async generator, we can define an asynchronous infinite counter similar to the one in the previous section. Create a file named “generator-async-counter.js”:
// generator-async-counter.js

async function* counterGenerator(delay) {
  let counter = 0;
  while (true) {
    await new Promise(r => setTimeout(r, delay));
    counter += 1;
    console.log('reading:', counter);
    yield counter;
  }
}

const counterIterator = counterGenerator(1000);
Notice that the code above wraps setTimeout in a Promise to wait for delay milliseconds (one second here) before each item.
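Because of that await, calling next() on an async generator returns a Promise rather than a plain { value, done } object, and for-await-of waits for each of those promises. A small standalone sketch with a hypothetical finite generator, twoCounts:

```javascript
// A finite async generator, so the example terminates.
async function* twoCounts() {
  yield 1;
  yield 2;
}

const main = async () => {
  const iterator = twoCounts();

  // next() on an async iterator returns a Promise of { value, done }.
  const pending = iterator.next();
  console.log(pending instanceof Promise); // true
  console.log(await pending); // { value: 1, done: false }

  // for-await-of awaits each next() promise for us.
  const rest = [];
  for await (const item of iterator) {
    rest.push(item);
  }
  console.log(rest); // [ 2 ]
  return rest;
};

const result = main();
```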
To loop this iterator, we use the for-await-of statement.
// generator-async-counter.js (continued)

const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
  }
};

logIterator(counterIterator);
The result is just as expected.
$ node generator-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
Unlike the push-based stream, the async generator only generates a new item upon a pull. To confirm that, you could modify logIterator as follows.
// modify the function in generator-async-counter.js

const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
    await new Promise(r => setTimeout(r, 5 * 1000));
  }
};
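Waiting five seconds per item makes this slow to observe. As a standalone sketch, the version below records events in an array; the sleep helper, the limit parameter, the 10 ms producer delay and the 50 ms consumer delay are arbitrary choices for illustration. Although the consumer is much slower than the producer's delay, each "reading" entry directly follows the previous "writing" entry, because the generator body only runs when for-await-of asks for the next item.

```javascript
// Records the order of events instead of printing them.
const events = [];
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function* counterGenerator(delay) {
  let counter = 0;
  while (true) {
    await sleep(delay);
    counter += 1;
    events.push(`reading: ${counter}`);
    yield counter;
  }
}

const logIterator = async (iterator, limit) => {
  for await (const item of iterator) {
    events.push(`writing: ${item}`);
    if (item >= limit) break; // stop so the example terminates
    // Slow consumer: waits longer than the producer's delay.
    await sleep(50);
  }
  return events;
};

const finished = logIterator(counterGenerator(10), 3).then((log) => {
  console.log(log.join(', '));
  // reading: 1, writing: 1, reading: 2, writing: 2, reading: 3, writing: 3
  return log;
});
```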

Conclusion

In this article, we implemented four infinite counters and saw how streams and generators behave similarly in this example but are fundamentally different. A stream gives more control over the data source, whereas a generator gives more control over the loop. We also saw a difference in behavior: a stream has a buffer, but a generator generally does not. There are many other differences that we did not cover in this article; readers who want to learn more may want to check the documentation.
