How to Build a Simple Message Queue in Node.js and RabbitMQ

The producer and the consumer do not interact with each other directly; each one interacts only with the message queue. This decouples the producer from the consumer, so the two do not need to be running, or talking to the queue, at the same time.
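The decoupling described above can be sketched without any broker at all. The snippet below is only an illustration: InMemoryQueue and the message texts are hypothetical names made up for this sketch, and a plain array stands in for RabbitMQ.

```javascript
// Illustrative in-memory stand-in for a message queue (not RabbitMQ itself).
class InMemoryQueue {
  constructor() {
    this.messages = [];
  }

  // Producer side: append a message and return immediately.
  enqueue(message) {
    this.messages.push(message);
  }

  // Consumer side: take the oldest message, or undefined if the queue is empty.
  dequeue() {
    return this.messages.shift();
  }

  get size() {
    return this.messages.length;
  }
}

const demoQueue = new InMemoryQueue();

// The producer runs first and finishes without knowing who will read the messages.
demoQueue.enqueue('order created');
demoQueue.enqueue('order paid');

// The consumer runs later and drains whatever is waiting, oldest message first.
const received = [];
while (demoQueue.size > 0) {
  received.push(demoQueue.dequeue());
}
console.log(received); // [ 'order created', 'order paid' ]
```

Neither side calls the other; both only touch the queue. RabbitMQ plays the same role, but as a separate server process that keeps the messages between the two programs.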

Initializing Node.js Application

Initialize the application using the following commands:

mkdir node-queue 

cd node-queue

npm init -y

touch index.js

Install amqplib, the AMQP client library that the Node.js application will use to communicate with RabbitMQ.

npm install --save amqplib

Installing RabbitMQ

To demonstrate this example, I downloaded and installed RabbitMQ locally. Once you have installed it, create a queue called ‘node_queue’.

Implementing Producer

We will now implement a producer in the Node.js application, which will put a message on the queue.

const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error, connection) {
  if (error) {
    throw error;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }

    let queue = 'node_queue';
    let msg = 'Test message';

    channel.assertQueue(queue, {
      durable: true
    });
    channel.sendToQueue(queue, Buffer.from(msg), {
      persistent: true
    });
    console.log("Sent '%s'", msg);
  });
  // Wait briefly so the message can be sent before the connection is closed
  setTimeout(function() {
    connection.close();
    process.exit(0);
  }, 500);
});

In the snippet above, we import the callback API of amqplib and establish a connection to the RabbitMQ server using amqp.connect(). We then create a channel, which is where the API for sending messages to the queue resides.
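amqplib also ships a promise-based API (require('amqplib') instead of require('amqplib/callback_api')), which lets the same steps be written with async/await. The following is a minimal sketch, not the article's original code: sendMessage is a hypothetical helper, and passing the client module in as the amqpClient parameter is an assumption made here so that the function can be tried out without a running broker.

```javascript
// Sketch of the producer using amqplib's promise API.
// `amqpClient` is expected to be the module returned by require('amqplib');
// it is passed in as a parameter (an assumption of this sketch).
async function sendMessage(amqpClient, url, queue, text) {
  const connection = await amqpClient.connect(url);
  try {
    const channel = await connection.createChannel();
    // Same options as the callback version: durable queue, persistent message.
    await channel.assertQueue(queue, { durable: true });
    channel.sendToQueue(queue, Buffer.from(text), { persistent: true });
    // Close the channel, then the connection, instead of waiting on a fixed timer.
    await channel.close();
  } finally {
    await connection.close();
  }
}
```

With RabbitMQ running locally, a call could look like sendMessage(require('amqplib'), 'amqp://localhost', 'node_queue', 'Test message'). Closing the channel and connection explicitly takes the place of the 500 ms setTimeout used in the callback version.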

channel.assertQueue() asserts a queue into existence: if the queue does not exist, it is created on the server. Declaring the queue as durable and the message as persistent is meant to let both survive a RabbitMQ restart. The message is sent to the server as a byte array (a Node.js Buffer), and the connection is closed after a short delay.
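Because the message body travels as a Buffer, structured data has to be serialized before sending and parsed again after receiving. One common approach is JSON, sketched below; encodeMessage, decodeMessage and the sample payload are hypothetical names used only for illustration.

```javascript
// Encode a JavaScript object as the Buffer that sendToQueue() expects.
function encodeMessage(payload) {
  return Buffer.from(JSON.stringify(payload), 'utf8');
}

// Decode the Buffer a consumer finds in msg.content back into an object.
function decodeMessage(content) {
  return JSON.parse(content.toString('utf8'));
}

// Hypothetical payload, used only for illustration.
const order = { id: 42, status: 'created' };
const body = encodeMessage(order);

console.log(Buffer.isBuffer(body)); // true
console.log(decodeMessage(body));   // { id: 42, status: 'created' }
```

In the producer, the result of encodeMessage() would be passed to channel.sendToQueue() in place of Buffer.from(msg); in a consumer, decodeMessage(msg.content) would recover the object.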

Now, start the Node.js app by running the command: node index.js.

Navigate to the RabbitMQ dashboard. We will see that one message is ready on the queue.

Shows that one message is available on queue

Implementing Consumer

Let us implement a consumer which will consume the message placed on the queue.

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var queue = 'node_queue';

    channel.assertQueue(queue, {
      durable: true
    });
    channel.prefetch(1);

    console.log("Waiting for messages in %s", queue);
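    channel.consume(queue, function(msg) {
      // msg is null if RabbitMQ cancels the consumer
      if (msg === null) {
        return;
      }

      console.log("Received '%s'", msg.content.toString());

      // Acknowledge the message after a one-second delay
      setTimeout(function() {
        channel.ack(msg);
      }, 1000);
    }, {
      noAck: false
    });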
  });
});

Establishing the connection to the RabbitMQ server works the same way as in the producer. We establish the connection and declare the queue from which we will consume messages.

We also assert the queue here because the consumer might start before the producer, so we want to ensure that the queue exists before the consumer tries to consume messages from it.

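channel.consume() registers a callback that is invoked for each message RabbitMQ delivers from the queue. Because of channel.prefetch(1), the server delivers at most one unacknowledged message to this consumer at a time, and channel.ack() tells the server that the message has been handled and can be removed from the queue.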
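In a real consumer, processing a message can fail, and then the message should not be acknowledged as if it had succeeded. The sketch below shows one way to wrap the callback passed to channel.consume(); createMessageHandler and work are hypothetical names, and rejecting without requeueing is just one possible policy, chosen here for illustration.

```javascript
// Build a callback suitable for channel.consume(queue, callback).
// `work` is a hypothetical function that processes the message text.
function createMessageHandler(channel, work) {
  return async function onMessage(msg) {
    // amqplib passes null when the broker cancels the consumer.
    if (msg === null) {
      return;
    }
    try {
      await work(msg.content.toString());
      // Success: the broker may now remove the message from the queue.
      channel.ack(msg);
    } catch (err) {
      // Failure: reject this message only (allUpTo = false) and do not requeue it.
      channel.nack(msg, false, false);
    }
  };
}
```

It would be used as channel.consume(queue, createMessageHandler(channel, async (text) => console.log(text))). With requeue set to false, a rejected message is discarded, or dead-lettered if the queue has a dead-letter exchange configured; passing true as the last argument of nack() would instead put it back on the queue for another attempt.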

Restart the application and navigate to the dashboard. We will see that no messages are left to be consumed.

Shows that no message is waiting to be consumed

In this article, I have implemented both producer and consumer in the same application. Note that they can be configured separately depending on the requirement.

The complete example can be found on this GitHub repository.
