Skip to main content

Working with Node.js streams

 

Introduction

Streams are one of the major features that most Node.js applications rely on, especially when handling HTTP requests, reading/writing files, and making socket communications. Streams are very predictable since we can always expect data, error, and end events when using streams.

This article will teach Node developers how to use streams to efficiently handle large amounts of data. This is a typical real-world challenge faced by Node developers when they have to deal with a large data source, and it may not be feasible to process this data all at once.

This article will cover the following topics:

  • Types of streams
  • When to adopt Node.js streams
  • Batching
  • Composing streams in Node.js
  • Transforming data with transform streams
  • Piping streams
  • Error handling Node.js streams

Types of streams

The following are four main types of streams in Node.js:

  • Readable streams: The readable stream is responsible for reading data from a source file
  • Writable streams: The writable stream is responsible for writing data in specific formats to files
  • Duplex streams: Duplex streams are streams that implement both readable and writable stream interfaces
  • Transform streams: The transform stream is a type of duplex stream that reads data, transforms the data, and then writes the transformed data in a specified format

When to use Node.js streams

Streams come in handy when we are working with files that are too large to read into memory and process as a whole.

For example, consider Node.js streams a good choice if you are working on a video conference/streaming application that would require the transfer of data in smaller chunks to enable high-volume web streaming while avoiding network latency.

The batching process

Batching is a common pattern for data optimization which involves the collection of data in chunks, storing these data in memory, and writing them to disk once all the data are stored in memory.

Let’s take a look at a typical batching process:

const fs = require("fs");
const https = require("https");
const url = "some file url";
https.get(url, (res) => {
  const chunks = [];
  res
    .on("data", (data) => chunks.push(data))
    .on("end", () =>
      fs.writeFile("file.txt", Buffer.concat(chunks), (err) => {
        err ? console.error(err) : console.log("saved successfully!");
      })
    );
});

Here, all of the data is pushed into an array. When the data event is triggered and once the “end” event is triggered, indicating that we are done receiving the data, we proceed to write the data to a file using the fs.writeFile and Buffer.concat methods.

The major downside with batching is insufficient memory allocation because all the data is stored in memory before writing to disk.

Writing data as we receive it is a more efficient approach to handling large files. This is where streams come in handy.

Composing streams in Node.js

The Node.js fs module exposes some of the native Node Stream API, which can be used to compose streams.

Composing writable streams

const fs = require("fs");
const fileStream = fs.createWriteStream('./file.txt')
for (let i = 0; i <= 20000; i++) {
  fileStream.write("Hello world welcome to Node.js\n"
  );
}

A writeable stream is created using the createWriteStream() method, which requires the path of the file to write to as a parameter.
Running the above snippet will create a file named file.txt in your current directory with 20,000 lines of Hello world welcome to Node.js in it.

Composing readable streams

const fs = require("fs");
const fileStream = fs.createReadStream("./file.txt");
fileStream
  .on("data", (data) => {
    console.log("Read data:", data.toString());
  })
  .on("end", () => { console.log("No more data."); });

Here, the data event handler will execute each time a chunk of data has been read, while the end event handler will execute once there is no more data.
Running the above snippet will log 20,000 lines of the Hello world welcome to Node.js string from ./file.txt to the console.

Composing transform streams

Transform streams have both readable and writable features. It allows the processing of input data followed by outputting data in the processed format.

To create a transform stream, we need to import the Transform class from the Node.js stream module. The transform stream constructor accepts a function containing the data processing/transformation logic:

const fs = require("fs");
const { Transform } = require("stream");
const fileStream= fs.createReadStream("./file.txt");
const transformedData= fs.createWriteStream("./transformedData.txt");

const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

fileStream.pipe(uppercase).pipe(transformedData);

Here, we create a new transform stream containing a function that expects three arguments: the first being the chunk of data, the second is encoding (which comes in handy if the chunk is a string), followed by a callback which gets called with the transformed results.

Running the above snippet will transform all the text in ./file.txt to uppercase then write it to transformedData.txt.
If we run this script and we open the resulting file, we’ll see that all the text has been transformed to uppercase.

Piping streams

Piping streams is a vital technique used to connect multiple streams together. It comes in handy when we need to break down complex processing into smaller tasks and execute them sequentially. Node.js provides a native pipe method for this purpose:

fileStream.pipe(uppercase).pipe(transformedData);

Refer to the code snippet under Composing transform streams for more detail on the above snippet.

Error handling Node.js streams

Error handling using pipelines

Node 10 introduced the Pipeline API to enhance error handling with Node.js streams. The pipeline method accepts any number of streams followed by a callback function that handles any errors in our pipeline and will be executed once the pipeline has been completed:

pipeline(...streams, callback)


const fs = require("fs");
const { pipeline, Transform } = require("stream");

pipeline(
  streamA,
  streamB,
  streamC,
  (err) => {
    if (err) {
      console.error("An error occured in pipeline.", err);
    } else {
      console.log("Pipeline execcution successful");
    }
  }
);

When using pipeline, the series of streams should be passed sequentially in the order in which they need to be executed.

Error handling using pipes

We can also handle stream errors using pipes as follows:

const fs = require("fs");
const fileStream= fs.createReadStream("./file.txt");
let b = otherStreamType()
let c = createWriteStream()
fileStream.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

As seen in the above snippet, we have to create an error event handler for each pipe created. With this, we can keep track of the context for errors, which becomes useful when debugging. The drawback with this technique is its verbosity.

Comments

Popular posts from this blog

How to use Ngx-Charts in Angular ?

Charts helps us to visualize large amount of data in an easy to understand and interactive way. This helps businesses to grow more by taking important decisions from the data. For example, e-commerce can have charts or reports for product sales, with various categories like product type, year, etc. In angular, we have various charting libraries to create charts.  Ngx-charts  is one of them. Check out the list of  best angular chart libraries .  In this article, we will see data visualization with ngx-charts and how to use ngx-charts in angular application ? We will see, How to install ngx-charts in angular ? Create a vertical bar chart Create a pie chart, advanced pie chart and pie chart grid Introduction ngx-charts  is an open-source and declarative charting framework for angular2+. It is maintained by  Swimlane . It is using Angular to render and animate the SVG elements with all of its binding and speed goodness and uses d3 for the excellent math functio...

JavaScript new features in ES2019(ES10)

The 2019 edition of the ECMAScript specification has many new features. Among them, I will summarize the ones that seem most useful to me. First, you can run these examples in  node.js ≥12 . To Install Node.js 12 on Ubuntu-Debian-Mint you can do the following: $sudo apt update $sudo apt -y upgrade $sudo apt update $sudo apt -y install curl dirmngr apt-transport-https lsb-release ca-certificates $curl -sL https://deb.nodesource.com/setup_12.x | sudo -E bash - $sudo apt -y install nodejs Or, in  Chrome Version ≥72,  you can try those features in the developer console(Alt +j). Array.prototype.flat && Array.prototype. flatMap The  flat()  method creates a new array with all sub-array elements concatenated into it recursively up to the specified depth. let array1 = ['a','b', [1, 2, 3]]; let array2 = array1.flat(); //['a', 'b', 1, 2, 3] We should also note that the method excludes gaps or empty elements in the array: let array1 ...

Understand Angular’s forRoot and forChild

  forRoot   /   forChild   is a pattern for singleton services that most of us know from routing. Routing is actually the main use case for it and as it is not commonly used outside of it, I wouldn’t be surprised if most Angular developers haven’t given it a second thought. However, as the official Angular documentation puts it: “Understanding how  forRoot()  works to make sure a service is a singleton will inform your development at a deeper level.” So let’s go. Providers & Injectors Angular comes with a dependency injection (DI) mechanism. When a component depends on a service, you don’t manually create an instance of the service. You  inject  the service and the dependency injection system takes care of providing an instance. import { Component, OnInit } from '@angular/core'; import { TestService } from 'src/app/services/test.service'; @Component({ selector: 'app-test', templateUrl: './test.component.html', styleUrls: ['./test.compon...