Asynchronous task processing in Node.js with Bull

When handling requests from API clients, you might run into a situation where a request initiates a CPU-intensive operation that could block other requests. Instead of processing such a task immediately and blocking other requests, you can defer it by adding information about the task to a data structure called a queue. A task consumer will then pick up the task from the queue and process it.
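To make the idea concrete, here is a minimal in-memory sketch of the producer/consumer split in plain JavaScript. It does not show how Bull works internally (Bull keeps its jobs in Redis). The TinyQueue class and its method names are hypothetical and exist only for illustration.

```javascript
// Hypothetical in-memory queue: illustrates the pattern only, not Bull's implementation.
class TinyQueue {
  constructor() {
    this.nextId = 1;
    this.jobs = [];    // pending tasks, oldest first
    this.results = []; // outcomes recorded by the consumer
  }

  // Producer side: record the task and return immediately.
  add(data) {
    this.jobs.push({ id: this.nextId++, data });
  }

  // Consumer side: handle pending tasks in the order they were added.
  drain(processor) {
    while (this.jobs.length > 0) {
      const job = this.jobs.shift();
      this.results.push({ id: job.id, value: processor(job) });
    }
  }
}

const tinyQueue = new TinyQueue();
tinyQueue.add({ n: 2 });
tinyQueue.add({ n: 3 });
tinyQueue.drain(job => job.data.n * job.data.n);
```

The request handler only needs to call add(), so it returns quickly. The expensive work happens later, when the consumer drains the queue.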

Queues are helpful for solving common application scaling and performance challenges in an elegant way. According to the NestJS documentation, examples of problems that queues can help solve include:

  • Smoothing out processing peaks
  • Breaking up monolithic tasks that may otherwise block the Node.js event loop
  • Providing a reliable communication channel across various services

Bull is a Node library that implements a fast and robust queue system based on Redis. Although it is possible to implement queues directly using Redis commands, Bull is an abstraction/wrapper on top of Redis. It provides an API that takes care of all the low-level details and enriches Redis’ basic functionality so that more complex use cases can be handled easily.

Installation

Before we begin using Bull, we need to have Redis installed. Follow the Redis Labs guide to install Redis, then install Bull using npm or yarn.

npm install bull --save

Or:

yarn add bull

Creating a queue

Create a queue by creating a new instance of Bull’s Queue.

Syntax

Queue(queueName: string, url?: string, opts?: QueueOptions): Queue

The optional url parameter specifies the Redis connection string. If no url is specified, Bull will try to connect to the default Redis server running on localhost:6379.

QueueOptions interface

interface QueueOptions {
  limiter?: RateLimiter;
  redis?: RedisOpts;
  prefix?: string = 'bull'; // prefix for all queue keys.
  defaultJobOptions?: JobOpts;
  settings?: AdvancedSettings;
}

RateLimiter

limiter: RateLimiter is an optional field in QueueOptions used to configure the maximum number of jobs that can be processed within a given duration. See RateLimiter for more information.

RedisOpts

redis: RedisOpts is also an optional field in QueueOptions. It’s an alternative to the Redis url string. See RedisOpts for more information.

AdvancedSettings

settings: AdvancedSettings holds advanced queue configuration settings. It is optional, and Bull warns that you shouldn’t override the default advanced settings unless you have a good understanding of the internals of the queue. See AdvancedSettings for more information.

A basic queue would look like this:

const Queue = require('bull');

const videoQueue = new Queue('video');

Creating a queue with QueueOptions

// limit the queue to a maximum of 100 jobs per 10 seconds
const Queue = require('bull');

const videoQueue = new Queue('video', {
  limiter: {
    max: 100,
    duration: 10000
  }
});

Each queue instance can perform three different roles: job producer, job consumer, and/or events listener. Each queue can have one or many producers, consumers, and listeners.

Producers

A job producer creates and adds a task to a queue instance. Redis stores only serialized data, so the task should be added to the queue as a plain JavaScript object that can be serialized to JSON.
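Assuming the job data is stored as JSON, the sketch below shows what survives a round trip and what does not. The roundTrip helper and the sample fields are hypothetical. They only simulate serialization and do not talk to Redis or Bull.

```javascript
// Simulate what happens to job data when it is serialized to JSON and read back.
function roundTrip(data) {
  return JSON.parse(JSON.stringify(data));
}

const original = {
  video: 'video.mp4',
  requestedAt: new Date(0),          // a Date comes back as an ISO string
  onDone: () => console.log('done'), // functions are dropped
  retries: undefined                 // undefined properties are dropped
};

const stored = roundTrip(original);
```

In practice this means job data should contain only plain values such as strings, numbers, booleans, arrays, and nested objects. Anything else should be passed as an identifier that the consumer can look up.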

add(name?: string, data: object, opts?: JobOpts): Promise<Job>

A task is executed immediately if the queue is empty. Otherwise, it is added to the queue and executed once a processor becomes idle, or according to task priority.

You can add the optional name argument to ensure that only a processor defined with a specific name will execute a task. A named job must have a corresponding named consumer. Otherwise, the queue will complain that you’re missing a processor for the given job.

Job options

Jobs can have additional options associated with them. Pass an options object after the data argument in the add() method.

Job options properties include:

interface JobOpts {
  priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT  (lowest priority). Note that
  // using priorities has a slight impact on performance, so do not use it if not required.

  delay: number; // An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both
  // server and clients should have their clocks synchronized. [optional].

  attempts: number; // The total number of attempts to try the job until it completes.

  repeat: RepeatOpts; // Repeat job according to a cron specification.

  backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails

  lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false)
  timeout: number; // The number of milliseconds after which the job should fail with a timeout error [optional]

  jobId: number | string; // Override the job ID - by default, the job ID is a unique
  // integer, but you can use this setting to override it.
  // If you use this option, it is up to you to ensure the
  // jobId is unique. If you attempt to add a job with an id that
  // already exists, it will not be added.

  removeOnComplete: boolean | number; // If true, removes the job when it successfully
  // completes. A number specifies the amount of jobs to keep. Default behavior is to keep the job in the completed set.

  removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specifies the amount of jobs to keep.
  // Default behavior is to keep the job in the failed set.
  stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace.
}


interface RepeatOpts {
  cron?: string; // Cron string
  tz?: string; // Timezone
  startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron).
  endDate?: Date | string | number; // End date when the repeat job should stop repeating.
  limit?: number; // Number of times the job should repeat at max.
  every?: number; // Repeat every millis (cron setting cannot be used together with this setting.)
  count?: number; // The start value for the repeat iteration count.
}


interface BackoffOpts {
  type: string; // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also be specified in `backoffStrategies` on the queue settings.
  delay: number; // Backoff delay, in milliseconds.
}
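To illustrate the difference between the fixed and exponential backoff types, here is a generic sketch of how a retry delay could be derived from these options. The retryDelay function is hypothetical, and the exponential formula is a common textbook form. Bull’s built-in strategies may compute slightly different values.

```javascript
// Generic retry-delay sketch; Bull's built-in strategies may use a different formula.
function retryDelay(backoff, attemptsMade) {
  // A bare number is treated here as a fixed delay in milliseconds.
  const opts = typeof backoff === 'number' ? { type: 'fixed', delay: backoff } : backoff;
  switch (opts.type) {
    case 'fixed':
      return opts.delay;
    case 'exponential':
      // Doubles after each failed attempt: delay, 2 * delay, 4 * delay, ...
      return opts.delay * 2 ** (attemptsMade - 1);
    default:
      throw new Error(`Unknown backoff type: ${opts.type}`);
  }
}

const fixedDelays = [1, 2, 3].map(n => retryDelay(1000, n));
const exponentialDelays = [1, 2, 3].map(n =>
  retryDelay({ type: 'exponential', delay: 1000 }, n)
);
```

A fixed backoff suits failures that clear up on a predictable schedule. An exponential backoff eases the load on a struggling downstream service by spacing retries further apart.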

A basic producer would look like this:

const Queue = require('bull');

const videoQueue = new Queue('video');

videoQueue.add({ video: 'video.mp4' });

A named job can be defined like so:

videoQueue.add('video', { input: 'video.mp4' });

Below is an example of customizing a job with job options.

videoQueue.add('video', { input: 'video.mp4' }, { delay: 3000, attempts: 5, lifo: true, timeout: 10000 });
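Producers are often wrapped in a small function so that job names and options live in one place. The sketch below takes the queue as an argument and uses a stand-in object in place of a real Bull queue, so it runs without Redis. The enqueueTranscode function, the stand-in, and the chosen option values are assumptions for illustration.

```javascript
// `queue` is expected to be a Bull queue (or anything with a compatible add()).
function enqueueTranscode(queue, file) {
  return queue.add(
    'video',         // job name: needs a processor registered under 'video'
    { input: file }, // job data
    { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: true }
  );
}

// Stand-in queue that records calls, so the sketch runs without Redis.
const recordedJobs = [];
const recordingQueue = {
  add: (name, data, opts) => {
    recordedJobs.push({ name, data, opts });
    return Promise.resolve({ id: recordedJobs.length, name, data, opts });
  }
};

enqueueTranscode(recordingQueue, 'video.mp4');
```

With a real queue, the call would be enqueueTranscode(videoQueue, 'video.mp4'), where videoQueue is the instance created earlier.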

Consumers

A job consumer, also called a worker, defines a process function (processor). The process function is responsible for handling each job in the queue.

process(processor: ((job, done?) => Promise<any>) | string)

If the queue is empty, the process function will be called once a job is added to the queue. Otherwise, it will be called every time the worker is idling and there are jobs in the queue to be processed.

The process function is passed an instance of the job as the first argument. A job includes all relevant data the process function needs to handle a task. The data is contained in the data property of the job object. A job also contains methods such as progress(progress?: number) for reporting the job’s progress, log(row: string) for adding a log row to this job, moveToCompleted, moveToFailed, etc.

Bull processes jobs in the order in which they were added to the queue. If you want jobs to be processed in parallel, specify a concurrency argument. Bull will then call the workers in parallel, respecting the maximum value of the RateLimiter.

process(concurrency: number, processor: ((job, done?) => Promise<any>) | string)

As shown above, a job can be named. A named job can only be processed by a named processor. Define a named processor by specifying a name argument in the process function.

process(name: string, concurrency: number, processor: ((job, done?) => Promise<any>) | string)
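The signatures above can be sketched as a named processor registered with a concurrency of 2. The transcode function and its file-renaming logic are hypothetical placeholders for real work. A stand-in object replaces the Bull queue so the sketch runs without Redis.

```javascript
// Hypothetical processor: the resolved value becomes the job's result,
// and a thrown error (rejected promise) marks the job as failed.
async function transcode(job) {
  if (!job.data.input) {
    throw new Error('missing input file');
  }
  await job.progress(50); // report progress partway through
  return { output: job.data.input.replace(/\.mp4$/, '.webm') };
}

// `queue` is expected to be a Bull queue; up to 2 'video' jobs run at once.
function registerWorker(queue) {
  queue.process('video', 2, transcode);
}

// Stand-in queue that records the registration, so the sketch runs without Redis.
const registrations = [];
const stubQueue = {
  process: (name, concurrency, processor) =>
    registrations.push({ name, concurrency, processor })
};
registerWorker(stubQueue);
```

Because the processor returns a promise, there is no need to call the optional done callback. Resolving or rejecting the promise is enough to signal the outcome.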

Event listeners

Throughout the lifecycle of a queue and/or job, Bull emits useful events that you can listen to using event listeners. An event can be local to a given queue instance (worker). Listeners to a local event will only receive notifications produced in the given queue instance.

Below is a local progress event.

queue.on('progress', function(job, progress){
  console.log(`${job.id} is in progress`)
})

Other possible event types include error, waiting, active, stalled, completed, failed, paused, resumed, cleaned, drained, and removed.

By prefixing global: to the local event name, you can listen to all events produced by all the workers on a given queue.

Below is a global progress event.

queue.on('global:progress', function(jobId){
  console.log(`${jobId} is in progress`)
})

Notice that for a global event, the jobId is passed instead of the job object.
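As a sketch of how local and global listeners might be wired together, the attachLogging helper below registers both kinds on whatever queue it is given. The helper name and log format are assumptions. A plain Node.js EventEmitter stands in for the Bull queue so the example runs without Redis.

```javascript
const { EventEmitter } = require('events');

// `queue` is expected to be a Bull queue; `log` is any function that accepts a string.
function attachLogging(queue, log) {
  // Local events receive the job object.
  queue.on('completed', (job, result) =>
    log(`job ${job.id} completed: ${JSON.stringify(result)}`));
  queue.on('failed', (job, err) => log(`job ${job.id} failed: ${err.message}`));
  // Global events receive only the job id.
  queue.on('global:completed', jobId => log(`job ${jobId} completed on some worker`));
}

// A plain EventEmitter stands in for the queue so the sketch runs without Redis.
const logLines = [];
const standInQueue = new EventEmitter();
attachLogging(standInQueue, line => logLines.push(line));
standInQueue.emit('completed', { id: 1 }, { ok: true });
standInQueue.emit('failed', { id: 2 }, new Error('boom'));
standInQueue.emit('global:completed', 3);
```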

A practical example

Let’s say an e-commerce company wants to encourage customers to buy new products in its marketplace. The company decided to add an option for users to opt into emails about new products.

Because outgoing email is one of those internet services that can have very high latencies and fail, we need to keep the act of sending emails for new marketplace arrivals out of the typical code flow for those operations. To do this, we’ll use a task queue to keep a record of who needs to be emailed.

const Queue = require('bull');
const sgMail = require('@sendgrid/mail');
sgMail.setApiKey(process.env.SENDGRID_API_KEY);

class EmailQueue {
  constructor() {
    // initialize queue
    this.queue = new Queue('marketplaceArrival');
    // add a worker; returning the promise lets Bull mark the job as
    // completed when it resolves and as failed when it rejects
    this.queue.process('email', job => this.sendEmail(job));
  }

  addEmailToQueue(data) {
    // return the promise so callers can wait for the job to be queued
    return this.queue.add('email', data);
  }

  async sendEmail(job) {
    const { to, from, subject, text, html } = job.data;
    const msg = { to, from, subject, text, html };
    // if send() rejects, the error propagates to Bull, which fails the job
    // (and retries it if the job was added with the attempts option)
    await sgMail.send(msg);
    return 'done';
  }
}

module.exports = { EmailQueue };

Conclusion

By now, you should have a solid, foundational understanding of what Bull does and how to use it.

To learn more about implementing a task queue with Bull, check out some common patterns on GitHub.

