
Divide and conquer: Scale your Node.js app using distributed queues

I previously talked about how to run background tasks/jobs in Node.js (with the new worker_threads module in particular). But what happens if you are reaching the limits of the machine your Node.js instance is running on? Then you need to either move to a bigger machine (known as scaling vertically) or scale horizontally. Scaling vertically always has a limit, so at some point you’ll need to scale horizontally.
But how? If your app is, for example, a web server that needs to send responses almost immediately, then you need something like a load balancer. In contrast, if your app needs to do work but it is not required to be done immediately, then you can spread the work to “worker” nodes and distribute it using queues.
Some use cases include generating daily reports, recalculating things for users on a daily basis (e.g. recommendations), and processing things a user has uploaded (e.g. a large CSV file, importing data when a user migrates to a service, importing data when the user signs in).
A distributed queue is like a store of job descriptions, each containing enough information to do the job, or enough information to figure out all of the things required to do it. For example:
const jobDataSendWelcomeEmail = {
  userId: 1234,
  userName: 'John Smith',
  email: 'user@example.com'
}
Usually, the main app (or any part of a more complex system) puts jobs into the queue. Other apps running on different machines are connected to the queue and receive those jobs. These consumers can process a job with the information received, or at least they can figure out all of the information they need and fetch it. This simple architecture has important benefits:
  • Your app is now divided into two logical pieces that can be distributed across different machines
  • You can scale from one to many workers without touching any code and without disrupting the execution of the main app. The queue takes care of sending the jobs to the workers over the network and, in most implementations, takes care of delivering each job to only one worker at a time
Note: Each vendor has its own jargon for queues (topics, channels), jobs (tasks, messages) and workers (consumers).

Doing it yourself?

You might be thinking that you can implement this architecture yourself with your existing database and without adding complexity to the system. You can create a “jobs” table with two columns: an “id” primary key column and a “data” column with all of the job information. The main app just writes to the table, and every X seconds the workers read from it to peek at the next job to be executed. To prevent other workers from reading the same job, you wrap the operation in a transaction that also deletes the job from the table.
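To make the idea concrete, here is a minimal sketch of that approach, assuming a PostgreSQL “jobs” table and the pg client (both are illustrative choices, not prescribed above):
const { Pool } = require('pg')
const pool = new Pool()

async function processJob(data) {
  // ... your job-processing logic goes here (hypothetical)
}

async function pollForJob() {
  const client = await pool.connect()
  let job
  try {
    await client.query('BEGIN')
    // Take the next job and delete it in the same transaction,
    // so that no other worker can pick it up
    const res = await client.query(
      `DELETE FROM jobs
       WHERE id = (SELECT id FROM jobs ORDER BY id LIMIT 1)
       RETURNING id, data`
    )
    await client.query('COMMIT')
    job = res.rows[0]
  } catch (err) {
    await client.query('ROLLBACK')
    throw err
  } finally {
    client.release()
  }
  // Note: the job is already gone from the table at this point
  if (job) await processJob(job.data)
}

// Poll every X seconds (here X = 5)
setInterval(() => pollForJob().catch(console.error), 5000)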
Voilà! Problem solved, right? Well, first of all, you are querying and waiting every X seconds. That’s not ideal, but it could be okay in basic use cases. More importantly, what happens if the worker crashes while processing the job? The job was deleted the moment it was pulled from the table, and we cannot recover it… this (along with other things) is nicely solved by the libraries and services built for this purpose, so you don’t have to reinvent the wheel.

Reasons to use a queue service

One great thing about queue systems is how they handle error scenarios. When a worker receives a job, the job is not deleted from the queue; instead it is “locked”, or invisible to the rest of the workers, until one of two things happens: either the worker deletes it after the work is done, or a timeout (which you can configure) expires. So, if a worker crashes, the timeout kicks in and the job goes back to the queue to be consumed by other workers. When everything is fine, the worker simply deletes the job once the data is processed.
That is great if the problem was in the worker (the machine was shut down, it ran out of resources, etc.), but what if the problem is in the code that processes the jobs, and every time the queue sends a job to a worker, the worker crashes?
Then we are in an infinite loop of failures, right? Nope: distributed queues usually have a configuration option to set a maximum number of retries. If the maximum number of retries is reached, then depending on the queue you can configure different things. A typical setup is moving those jobs to a “failure queue” (often called a dead-letter queue) for manual inspection, or having dedicated workers consume them just to report the errors.
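As a concrete example, kue, one of the libraries shown later in this article, lets you cap retries when creating a job; a minimal sketch with illustrative names and data:
const kue = require('kue')
const queue = kue.createQueue()

// Retry this job up to 5 times before it is marked as permanently failed
queue.create('send_welcome_email', { userId: 1234 })
  .attempts(5)
  .save()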
Not only are distributed queue implementations great for handling these errors, but they also use different mechanisms to send jobs to workers as soon as possible. Some implementations use sockets, others use HTTP long polling, and others might use other mechanisms. This is an implementation detail, but I want to highlight that it is not trivial to implement, so you are better off using existing, battle-tested implementations rather than rolling your own.

What to put in the job data

Many times I find myself wondering what to put in the job data. The answer depends on your use case, but it always boils down to two principles:
  • Don’t put too much. The amount of data you can put in the job data is limited. Check the queuing system you are using for the exact limits. Usually, it’s big enough that you won’t reach them, but sometimes we are tempted to put too much. For example, if you need to process a big CSV file, you cannot put the file itself in the queue. You’ll need to upload it first to a storage service and then create a job with a URL to the file and any additional information you need, such as the user that uploaded it (see the sketch after this list).
  • Don’t put too little. If you have immutable data (e.g. a createdAt date) or data that rarely changes (e.g. usernames), you can put it in your job data. The job should be processed in a matter of seconds or minutes, so it is usually ok to include data that might change, like a user name, as long as it is not critical for it to be updated to the second. This way you can save queries to the database, or even remove the need for queries completely. However, if there is information that affects how the data is processed, you should query it inside the job processor.
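Putting both principles together, a hypothetical payload for the CSV example could look like this (all field names are illustrative):
const jobDataProcessCsv = {
  // A pointer to the file, which was uploaded to a storage service beforehand
  fileUrl: 'https://storage.example.com/uploads/data.csv',
  // Immutable or rarely-changing data is safe to embed
  uploadedBy: 1234,
  uploadedAt: '2019-05-01T10:00:00Z',
  userName: 'John Smith' // ok if it is not critical to be up to date
  // Anything that affects how rows are processed (e.g. the user's current
  // import settings) is better queried inside the job processor
}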

Make your jobs small and fast to process


If you need to process big sets of data, divide them into smaller pieces. If you have to process a big CSV file, first, divide it into chunks of a certain number of rows and create a job per chunk. There are a few benefits of doing it this way:
  • The data will be processed faster because it can be processed in parallel
  • You make better use of your resources. It’s better to have N workers doing smaller jobs than having one worker doing heavy processing while the rest are idle or underused
  • It’s also faster and more efficient to retry a small job that has failed as opposed to a big job that has failed
If you need an aggregated result from all of those small chunks, you can put all of the intermediate results in a database, and when they are all done you can trigger a new job in another queue that aggregates the result. This is map/reduce in essence: “map” is the step that divides a large job into smaller jobs, and “reduce” is the step that aggregates the results of those smaller jobs.
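Here is a minimal sketch of the “map” step for the CSV case, assuming a hypothetical enqueue() helper that stands in for whatever queue client you use:
const CHUNK_SIZE = 1000 // rows per job; an illustrative number

function enqueueCsvChunkJobs(fileUrl, totalRows) {
  for (let start = 0; start < totalRows; start += CHUNK_SIZE) {
    // Each job carries a pointer to the file and the row range it owns,
    // not the rows themselves (see the previous section)
    enqueue('process_csv_chunk', {
      fileUrl,
      startRow: start,
      endRow: Math.min(start + CHUNK_SIZE, totalRows)
    })
  }
}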
If you cannot divide your data beforehand, you should still do the processing in small jobs. For example, if you need to use an external API that uses cursors for paginating results, calculating all of the cursors beforehand is impractical. Instead, you can process one page of results per job, and once the job is processed you take the cursor to the next page and create a new job with that cursor, so the next job will process the next page, and so on.
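A sketch of that chaining pattern, where fetchPage(), handleResults() and enqueue() are hypothetical helpers:
async function processPageJob(jobData) {
  const page = await fetchPage(jobData.cursor) // call the external API
  await handleResults(page.results)            // do the actual work for this page
  if (page.nextCursor) {
    // Instead of looping here, hand the next page over to a new job
    enqueue('process_page', { cursor: page.nextCursor })
  }
}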

Delayed jobs

Another interesting feature of distributed queues is that you can usually delay jobs. There’s normally a limit on this so you cannot delay a job for two years, but there are some use cases where this is useful. Some examples include:
  • You want to send a welcome email to a user that signed up, but at a later time rather than immediately. Just create a delayed job that sends the email (sketched after this list)
  • When processing a job you hit a rate limit from an API. You will probably be told when the rate limit resets, so you can put the job back in the queue, delayed by that specific amount of time
  • In general, if you want to trigger something at a specific time in the future, such as scheduling a backup, a notification, a reminder, etc…
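For the welcome-email example, kue (the library used in the code section below) supports this through a .delay() call; a minimal sketch with illustrative data:
const kue = require('kue')
const queue = kue.createQueue()

// Deliver the job to a worker roughly one hour from now
queue.create('send_welcome_email', { userId: 1234, email: 'user@example.com' })
  .delay(60 * 60 * 1000) // delay in milliseconds
  .save()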

Job priority

Most queue implementations do not guarantee the order of execution of the jobs, so don’t rely on that. However, they usually implement some way of prioritizing some jobs over others. This depends highly on the implementation, so take a look at the docs of the system you are using to see how you can achieve it if you need to.
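For instance, kue (used in the examples below) exposes a .priority() method that accepts named priority levels; a minimal sketch:
const kue = require('kue')
const queue = kue.createQueue()

// kue accepts named priorities: 'low', 'normal', 'medium', 'high', 'critical'
queue.create('generate_invoice', { userId: 1234 })
  .priority('high')
  .save()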

Show me the code

Let’s look at some examples. Even though all queuing systems have similar features, there is no common API for them, so we are going to see a few different examples.

The kue library

Kue is a nice library developed by Automattic (the company behind WordPress.com) that implements a queuing system on top of Redis. Redis is an in-memory database that can be persisted, and it is often already used in your application for things like session storage. For this reason, choosing this library can be a no-brainer. Besides, even if you are not using Redis yet, a few cloud providers allow you to spin up a managed Redis server easily (e.g. Heroku or AWS). Finally, another benefit of using kue is that your stack is 100% open source, so you don’t fall into any vendor lock-in.
If you need to handle a lot of work and you still want an open source solution, then I would choose RabbitMQ. I haven’t chosen it for the examples in this article because Redis is usually easier to set up and more common. However, RabbitMQ has been designed specifically for these use cases, so by design it’s technically superior.
Let’s see how to create and consume jobs using kue.
Create the queue and put a job on it:
const kue = require('kue')
const queue = kue.createQueue()
const jobData = { hello: 'world' } // whatever your job needs
const job = queue.create('queue_name', jobData).save(err => {
  if (!err) console.log(job.id)
})
Consume jobs from the queue:
const kue = require('kue')
const queue = kue.createQueue()
queue.process('queue_name', (job, done) => {
  // Do something with job.data
  done() // This deletes the job from the queue; done(err) would mark it as failed instead
})

Azure using its Service Bus

Microsoft Azure offers two queue services: Storage queues and Service Bus. There’s a great comparison of the two in the Azure documentation. I’ve chosen Service Bus because it guarantees that a job is delivered to at most one worker.
Let’s see how to create and consume jobs using Service Bus.

Create the queue and put a job on it

With Microsoft Azure we can create the queue programmatically with the createTopicIfNotExists method. Once it is created, we can start sending messages:
const azure = require('azure')
const serviceBusService = azure.createServiceBusService()
const jobData = { hello: 'world' }
serviceBusService.createTopicIfNotExists('queue_name', err => {
  if (err) return console.log(err)
  serviceBusService.sendTopicMessage('queue_name', jobData, err => {
    if (err) console.log(err)
  })
})

Consume jobs from the queue

Some implementations, like this one, require you to create a subscription. Check out the Azure docs for more information on this topic:
const azure = require('azure')
const serviceBusService = azure.createServiceBusService()
serviceBusService.createSubscription('queue_name', 'subscription_name', err => {
  if (err) return console.log(err)
  // If the `isPeekLock` option is not set to true, the message will be deleted when peeked
  serviceBusService.receiveSubscriptionMessage('queue_name', 'subscription_name', { isPeekLock: true }, (err, message) => {
    if (err) return console.log(err)
    // Do something with `message` and then delete it
    serviceBusService.deleteMessage(message, err => {
      if (err) return console.log(err)
    })
  })
})

Amazon, using its SQS service

The Amazon distributed queue service is called Simple Queue Service (SQS). It can be used directly, but it is also possible to combine it with other AWS services for interesting workflows. For example, you can configure an S3 bucket to automatically send jobs to an SQS queue when a new file (object) is stored. This can be useful, for example, to easily process files (videos, images, CSVs, …).
Let’s see how we can programmatically add and consume jobs on a queue.
Create the queue and put a job on it:
const AWS = require('aws-sdk')
AWS.config.update({region: 'REGION'})
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
const queueParams = {
  QueueName: 'queue_name',
  Attributes: {
    'DelaySeconds': '60',
    'MessageRetentionPeriod': '86400'
  }
}
sqs.createQueue(queueParams, (err, data) => {
  if (err) return console.log(err)
  console.log('queue url', data.QueueUrl)
  const jobParams = {
    MessageBody: JSON.stringify({ hello: 'world' }),
    QueueUrl: data.QueueUrl
  }
  sqs.sendMessage(jobParams, (err, data) => {
    if (err) return console.log(err)
  })
})
Consume jobs from the queue:
const AWS = require('aws-sdk')
AWS.config.update({region: 'REGION'})
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})
const queueURL = 'SQS_QUEUE_URL' // Obtained when the queue was created
const params = {
  AttributeNames: [
    'SentTimestamp'
  ],
  MaxNumberOfMessages: 1, // receive only one message at a time
  MessageAttributeNames: [
    'All'
  ],
  QueueUrl: queueURL,
  VisibilityTimeout: 20, // seconds the message stays hidden from other consumers
  WaitTimeSeconds: 0 // set > 0 to use long polling instead of returning immediately
}
sqs.receiveMessage(params, (err, data) => {
  if (err) return console.log(err)
  if (data.Messages) {
    // Do something with the message and then delete it
    const message = data.Messages[0]
    
    const deleteParams = {
      QueueUrl: queueURL,
      ReceiptHandle: message.ReceiptHandle
    }
    sqs.deleteMessage(deleteParams, (err, data) => {
      if (err) return console.log(err)
    })
  }
})
Check the Node.js docs on SQS for more information.

Google Cloud, using its pub/sub service

Google Cloud, like Azure, also requires you to create subscriptions (see the docs for more information). In fact, you need to create the subscription before sending messages to the topic/queue, or they will not be available.
The documentation suggests creating both the topic and the subscription from the command line:
gcloud pubsub topics create queue_name
and
gcloud pubsub subscriptions create subscription_name --topic queue_name
Nevertheless, you can also create them programmatically (sketched below). Now let’s see how to insert and consume jobs, assuming that we have already created the queue (topic) and the subscription.
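For completeness, here is a minimal sketch of the programmatic route; both calls return promises, so this must run inside an async function:
const { PubSub } = require('@google-cloud/pubsub')
const pubsub = new PubSub()
// Both calls resolve to arrays whose first element is the created resource
const [topic] = await pubsub.createTopic('queue_name')
const [subscription] = await topic.createSubscription('subscription_name')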
Create the queue and put a job on it:
const { PubSub } = require('@google-cloud/pubsub')
const pubsub = new PubSub()
const jobData = Buffer.from(JSON.stringify({ hello: 'world' }))
// publish() returns a promise, so this also needs to run inside an async function
const messageId = await pubsub.topic('queue_name').publish(jobData)

Consume jobs from the queue

Google Cloud Pub/Sub guarantees that a message/job is delivered at least once for every subscription, but the message could be delivered more than once (as always, check the documentation for more information):
const { PubSub } = require('@google-cloud/pubsub')
const pubsub = new PubSub()
const subscription = pubsub.subscription('subscription_name')
subscription.on('message', message => {
  const { data } = message
  // Do something and then
  message.ack() // This deletes the message from the queue
})

Conclusions

Distributed queues are a great way of scaling your application for a few reasons:
  • They allow you to divide your application into logical pieces that can be scaled individually
  • They have solid mechanisms to handle errors gracefully
  • They provide other interesting features such as delayed jobs and prioritization
  • There are many services with similar functionalities and also open source libraries that you can use without worrying about vendor lock-in
