
Comparing the Stream API and (async) generators in Node.js v10


Introduction

A stream is an abstraction of data in programming. The Node.js Stream API has been around for a long time and provides a uniform API for reading and writing asynchronous data. It is mostly used internally by other APIs such as fs and http.
For example, fs.createReadStream is often used to read a large file. Another example is http.ServerResponse, which implements the Stream API so that the server can respond with large data. Streams are mainly used for large data, but conceptually a stream can represent data of infinite length.
There is another abstraction, this time of loops, called a generator (introduced in ES2015), which is similar to a stream. A generator returns an iterator whose items you can loop over, and it can also represent data of infinite length. ES2018 introduced async generators and async iterators, which can handle asynchronous data. Async generators are supported in Node.js v10.
In this post, we will implement a synchronous counter with a pull-based stream and with a generator. We will then implement an asynchronous counter with a push-based stream and with an async generator, in order to compare the Stream API and async generators.

Prerequisites

Before continuing, readers will need to have Node.js installed and have a basic understanding of streams.

Implement a stream for the synchronous counter

In general, you would just use a stream provided by a library; in other words, you consume a stream. Here, for the purpose of study, we will implement a stream ourselves. The documentation describes how to implement streams. Let us first make an infinite counter as a readable stream. Create a file named “stream-sync-counter.js”.
// stream-sync-counter.js

const { Readable, Writable } = require('stream');

const createCounterReader = () => {
  let count = 0;
  return new Readable({
    objectMode: true,
    read() {
      count += 1;
      console.log('reading:', count);
      this.push(count);
    },
  });
};

const counterReader = createCounterReader();
This is a pull-based stream, which means it reads new values when the buffer falls below a certain amount. We used “object mode”, so each item is just one number.
Now, let’s define a writable stream to consume this counter.
// stream-sync-counter.js (continued)

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});
The logWriter stream defined above does nothing except output numbers to the console.
Now we connect these streams, which is known as a “pipe”.
// stream-sync-counter.js (continued)

counterReader.pipe(logWriter);
If you run this code, you will see numbers counting up infinitely.
$ node stream-sync-counter.js
reading: 1
reading: 2
writing: 1
reading: 3
writing: 2
reading: 4
writing: 3
reading: 5
writing: 4
reading: 6
writing: 5
...
Note that the readable stream reads several items at once to fill its buffer and then waits until some items are consumed. A readable stream works as follows: 1) it reads items and stores them in the buffer, 2) it waits until items are consumed, 3) once items are consumed and the buffer becomes empty (that is, falls “below a certain amount”), it goes back to step 1. To better see how the buffer works, you can put timeouts in your writable stream.
// modify the function in stream-sync-counter.js

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 100);
  },
});
If you run this, you will see an interesting output:
$ node stream-sync-counter.js
reading: 1
reading: 2
reading: 3
...
reading: 31
reading: 32
writing: 1
writing: 2
writing: 3
...
writing: 14
writing: 15
writing: 16
reading: 33
reading: 34
reading: 35
...
reading: 46
reading: 47
reading: 48
writing: 17
...
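The batches of 16 in the output above line up with the default highWaterMark for object-mode streams, which is 16 items. One plausible reading of the first 32 reads is that 16 items fill the writable's buffer and another 16 fill the readable's buffer. Here is a minimal sketch to inspect these thresholds; the stream names and the value 4 are arbitrary choices for illustration and are not part of the counter files.

```javascript
const { Readable, Writable } = require('stream');

// Object-mode streams count items, not bytes; the default threshold is 16.
const reader = new Readable({ objectMode: true, read() {} });
const writer = new Writable({
  objectMode: true,
  write(chunk, _, done) {
    done();
  },
});

console.log(reader.readableHighWaterMark); // 16
console.log(writer.writableHighWaterMark); // 16

// A smaller threshold (4 is an arbitrary value) should make the batches shorter.
const smallReader = new Readable({ objectMode: true, highWaterMark: 4, read() {} });
console.log(smallReader.readableHighWaterMark); // 4
```

Passing a highWaterMark option to createCounterReader in the same way is one way to experiment with how the batch sizes change.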

Implement a generator for the synchronous counter

A generator is a feature introduced in ES2015. It is a general abstraction of loops that lets you implement a loop as a function. A generator is a special function that returns an iterator.
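To make the relationship between a generator and its iterator concrete, here is a small sketch that drives an iterator by hand with next(). The twoItems generator is a hypothetical example used only for illustration.

```javascript
// A tiny finite generator (hypothetical, not part of the counter files).
function* twoItems() {
  console.log('body started');
  yield 'a';
  yield 'b';
}

const iterator = twoItems();   // nothing is logged yet: the body has not run
const first = iterator.next(); // logs 'body started' and runs to the first yield
const second = iterator.next();
const third = iterator.next(); // the function has returned

console.log(first);  // { value: 'a', done: false }
console.log(second); // { value: 'b', done: false }
console.log(third);  // { value: undefined, done: true }
```

Each call to next() resumes the function body until the following yield, which is why a generator can describe an endless loop without running forever.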
The following code generates an infinite counter. Create a file named “generator-sync-counter.js”.
// generator-sync-counter.js

function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    console.log('reading:', count);
    yield count;
  }
}

const counterIterator = counterGenerator();
Now, let’s create a function to run this iterator and output numbers to the console.
// generator-sync-counter.js (continued)

const logIterator = (iterator) => {
  for (const item of iterator) {
    console.log('writing:', item);
  }
};
This is just a for-of loop. Since ES2015, you can loop over an iterator with a for-of loop. Now simply invoke the function.
// generator-sync-counter.js (continued)

logIterator(counterIterator);
The result will look something like this:
$ node generator-sync-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
reading: 6
writing: 6
This is slightly different from the behavior of the stream, and it is more intuitive because there is no buffer.
You can also add timeouts like this:
// modify the function in generator-sync-counter.js

const logIterator = async (iterator) => {
  for (const item of iterator) {
    await new Promise(r => setTimeout(r, 100));
    console.log('writing:', item);
  }
};
If you run it, you should get the same result.
We have now created a synchronous infinite counter with both a stream and a generator. Both look the same from the consumer's side, but the internal behavior is slightly different because the stream buffers items.
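Because the generator only produces items on demand, an infinite counter can be cut off after a fixed number of items. The sketch below assumes a hypothetical take helper and a counter generator with the same shape as counterGenerator but without the logging.

```javascript
// Hypothetical helper: yields at most `limit` items from any iterable.
function* take(iterable, limit) {
  if (limit <= 0) return;
  let taken = 0;
  for (const item of iterable) {
    yield item;
    taken += 1;
    if (taken >= limit) return; // leaving the loop also closes the source iterator
  }
}

// Same shape as counterGenerator, minus the console output.
function* counter() {
  let count = 0;
  while (true) {
    count += 1;
    yield count;
  }
}

const firstThree = [...take(counter(), 3)];
console.log(firstThree); // [ 1, 2, 3 ]
```

The counter never computes a fourth value, since nothing asks for one.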

Implement a stream for an asynchronous counter

Next, we will create an asynchronous counter, starting with a stream. Here, an asynchronous counter means one that counts up every second. To create such a stream, we use setInterval. Create a file named “stream-async-counter.js”.
// stream-async-counter.js

const { Readable, Writable } = require('stream');

const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    objectMode: true,
    read() {},
  });
  setInterval(() => {
    counter += 1;
    console.log('reading:', counter);
    reader.push(counter);
  }, delay);
  return reader;
};

const counterReader = createCounterReader(1000);
This is a so-called push-based stream. As you might guess, it pushes data into the buffer indefinitely, so the buffer keeps growing unless you consume data faster than it is pushed.
We use the logWriter without timeouts because the readable stream pushes the items and therefore controls the timing.
// stream-async-counter.js (continued)

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});

counterReader.pipe(logWriter);
If we run this, we should see the following result with delays.
$ node stream-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
This is slightly different from the result of the pull-based stream because each item is now consumed before a new item is added to the buffer.
To see that data is pushed regardless of whether it is consumed, you could change logWriter as follows.
// modify the function in stream-async-counter.js

const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 5 * 1000);
  },
});
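The same effect can be observed without waiting for timers. The sketch below pushes three items into a readable stream that has no consumer and inspects its buffer. The highWaterMark of 2 is an arbitrary small value chosen for illustration.

```javascript
const { Readable } = require('stream');

// A small threshold (2 is arbitrary) makes the effect visible quickly.
const reader = new Readable({ objectMode: true, highWaterMark: 2, read() {} });

// No consumer is attached, so every pushed item stays in the internal buffer.
const results = [1, 2, 3].map((n) => reader.push(n));

console.log(results);               // [ true, false, false ]
console.log(reader.readableLength); // 3
```

push() returns false once the buffer reaches the threshold, but that is only a hint: the items are still accepted. The setInterval producer above ignores this return value, which is why it keeps pushing no matter how slowly the writer consumes.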

Use an async generator for an asynchronous counter

The for-await-of statement is a new feature in ES2018. It allows handling promises in iterators. Using an async generator, we can define an asynchronous infinite counter similar to the one in the previous section. Create a file named “generator-async-counter.js”:
// generator-async-counter.js

async function* counterGenerator(delay) {
  let counter = 0;
  while (true) {
    await new Promise(r => setTimeout(r, delay));
    counter += 1;
    console.log('reading:', counter);
    yield counter;
  }
}

const counterIterator = counterGenerator(1000);
Notice that the code above uses a Promise to wait for one second.
To loop this iterator, we use the for-await-of statement.
// generator-async-counter.js (continued)

const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
  }
};

logIterator(counterIterator);
The result is just as expected.
$ node generator-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
Unlike the push-based stream, the async generator only generates a new item upon a pull. To confirm that, you could modify logIterator as follows.
// modify the function in generator-async-counter.js

const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
    await new Promise(r => setTimeout(r, 5 * 1000));
  }
};
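To check the pull-based behavior without waiting several seconds per item, here is a self-contained sketch that records events in an array instead of logging them. The sleep and runDemo helpers, the events parameter, and the shortened delays (10 ms and 30 ms) are assumptions made for illustration.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Same idea as counterGenerator above, but it records events instead of logging.
async function* counterGenerator(delay, events) {
  let counter = 0;
  while (true) {
    await sleep(delay);
    counter += 1;
    events.push(`reading: ${counter}`);
    yield counter;
  }
}

// Hypothetical helper: consumes `limit` items slowly and returns the event log.
const runDemo = async (limit) => {
  const events = [];
  for await (const item of counterGenerator(10, events)) {
    events.push(`writing: ${item}`);
    await sleep(30); // the consumer is slower than the producer
    if (item >= limit) break;
  }
  return events;
};

runDemo(3).then((events) => console.log(events));
// [ 'reading: 1', 'writing: 1', 'reading: 2', 'writing: 2', 'reading: 3', 'writing: 3' ]
```

Even though the consumer is three times slower than the producer, no “reading” event runs ahead of the matching “writing” event, because the generator only resumes when the loop asks for the next item.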

Conclusion

In this article, we implemented four infinite counters and saw that streams and generators behave similarly in this example but are fundamentally different. A stream gives more control over the data source, whereas a generator gives more control over the loop. We also saw a difference in behavior: a stream has a buffer, but a generator generally doesn’t. There are many other differences that we didn’t cover in this article. Readers who want to learn more may want to check the documentation.
