
Getting started with Cloud Pub/Sub in Node

Introduction

In recent years, event-driven architecture (EDA) has been on the rise due to the popularity of microservices. A microservice application is made up of small units that are deployed independently and communicate with one another to form a full-fledged application. Keeping communication between these services resilient can be a hassle because of their individuality and modularity. This is why message brokers are needed.

A message broker, simply put, is an intermediary software that relays messages from one or more services to other services. An example of such software is Cloud Pub/Sub.

Cloud Pub/Sub is a fully managed message broker by Google that relays information from one or more services to other subscribing services using the publisher-subscriber model. Pub/Sub makes it easy for teams to set up communication channels between services without having to worry about individual service uptime, retry policies, and idempotency.

In this article, we will be taking a look at how we can implement inter-service communications using Cloud Pub/Sub in Node.js.

Prerequisites

This tutorial uses the following:

  1. Basic knowledge of Node.js
  2. Yarn or NPM installed (we’ll be using yarn)
  3. Git installed
  4. Ngrok installed
  5. A Google account
  6. A development machine configured to run Node.js

Installation

Run the following commands in any terminal of choice to initialize the project directory:

$ git clone -b boilerplate http://github.com/enkaypeter/cloud-pub-sub-tut/
$ cd cloud-pub-sub-tut && yarn

Packages installed:

  • express — A lightweight Node.js web framework for spinning up RESTful APIs. We will use this to handle routing in our backend API
  • nodemon — This package will help us automatically restart our development server when we make code changes/edits
  • body-parser — A middleware to parse incoming request inputs into our req.body object
  • morgan — HTTP request logger middleware for Node.js. This will help us debug our API while in development
  • helmet — A collection of middleware that helps secure our Express-based server by setting HTTP headers that conform to security best practices
  • cors — This package will help enable cross-origin resource sharing on our server
  • dotenv — This package will enable us to have access to the environment defined in a .env file from our Node application via the process.env object
  • @google-cloud/pubsub — This is the Node.js client for Cloud Pub/Sub. We will be using this to publish messages and subscribe to topics defined in our Pub/Sub console

Setting up a service account for Pub/Sub on GCP

To initialize Pub/Sub in our Node app, we will need to get a service account configured with Pub/Sub access on our GCP (Google Cloud Platform) console. We will do so in the following steps:

  • Log in to the Google Cloud Console and select a project or follow the prompt to create one if it’s your first time
  • Navigate to the Pub/Sub Section to enable the API for your project
  • Head over to the Service Accounts section to select a project and create a service account like so:

Enter a service account name e.g pub-sub-key and click on create.

  • Grant the Pub/Sub Admin role to the service account:

Click on Continue to grant access to the service account.

  • Click on Create Key, select the JSON option, and download the JSON object. We will be needing this file inside our Node project directory to authenticate the pub/sub client

Definition of terms

In this section, we will define a couple of terms that will be used throughout this article. We will also take a look at a typical message flow from the publisher to subscriber.

  • Message — This is the data entity which is relayed to subscribing services on a particular topic
  • Topic — As with every conversation, there is a theme of communication. A topic is a theme which represents a feed of messages
  • Subscription — This is a coalition of subscriber entities which receives published messages on a particular topic
  • Subscriber — This is an entity which is set on course to receive and acknowledge messages from a particular topic either by push or pull
  • Publisher — This is an entity which creates and broadcasts a message to subscribing services on a topic

To properly understand how a message is passed from a publisher to a subscriber, we will take a look at a flow diagram from the Pub/Sub documentation:

pub sub flow

  1. The publisher application (e.g an orders service) sends a message (orders object) to a topic (orders_topic)
  2. Pub/Sub ensures messages are retained in storage until the services subscribing to the said topic (orders_topic) acknowledge message receipt
  3. Pub/Sub then fans out the message to all subscriptions attached to the orders_topic
  4. A subscriber then receives the message from Pub/Sub. This is done either by Pub/Sub pushing to an endpoint configured for the subscriber or by the subscriber pulling the said message from Pub/Sub
  5. The subscriber sends an acknowledgment to Pub/Sub for each message received, and Pub/Sub then removes the acknowledged message from the subscription's message queue
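The steps above can be sketched with a toy in-memory model. This is not the Pub/Sub client: InMemoryTopic and its methods are hypothetical names made up for illustration, and a real broker stores messages durably and delivers them over the network. The sketch only shows the shape of the flow: publish, retain, fan out, receive, acknowledge.

```javascript
// Toy in-memory model of the flow above. NOT the Pub/Sub client;
// InMemoryTopic and its methods are hypothetical names for illustration.
class InMemoryTopic {
  constructor(name) {
    this.name = name;
    this.subscriptions = new Map(); // subscription name -> { handler, pending }
    this.nextId = 1;
  }

  // Register a subscription whose handler receives (message, ack).
  subscribe(subscriptionName, handler) {
    this.subscriptions.set(subscriptionName, { handler, pending: new Map() });
  }

  // Steps 1-4: retain the message per subscription, then fan it out.
  publish(data) {
    const id = String(this.nextId++);
    for (const sub of this.subscriptions.values()) {
      sub.pending.set(id, data); // retained until acknowledged
      const ack = () => sub.pending.delete(id); // step 5
      sub.handler({ id, data }, ack);
    }
    return id;
  }

  // Number of messages a subscription has not acknowledged yet.
  pendingCount(subscriptionName) {
    return this.subscriptions.get(subscriptionName).pending.size;
  }
}

const topic = new InMemoryTopic("orders_topic");
const received = [];

// The delivery subscriber acknowledges; the notification subscriber does not.
topic.subscribe("delivery_sub", (message, ack) => {
  received.push(`delivery:${message.data.orderId}`);
  ack();
});
topic.subscribe("notification_sub", (message) => {
  received.push(`notification:${message.data.orderId}`);
});

const messageId = topic.publish({ orderId: 42 });
console.log(messageId, received);
console.log(topic.pendingCount("delivery_sub"), topic.pendingCount("notification_sub"));
```

Note that the unacknowledged message stays pending for notification_sub only. Each subscription tracks acknowledgments separately, so one slow service does not hold up the others.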

Problem domain and proposed solution

In this section, we will state our problem domain and proposed solution (proof of concept) which we will be building together as we go further.

Problem domain

Take, for instance, an e-commerce solution with multiple services: an order service, a delivery service, and a notification service. The order service receives the user’s order, processes it, and sends it to the delivery service. The delivery service then processes it and, from time to time, notifies the user (through the notification service) of the delivery status of the package.

Implementing this architecture with regular HTTP calls between the services leads to anomalous system behavior when edge cases such as service downtime or the addition of more services are introduced.

Proposed solution

Google Pub/Sub being a message broker can receive this message from the order service, and act as a relay between the order service and the other services (delivery, notification). Pub/Sub has message retry policies that help curb missing orders due to service downtime/failure. It also makes it possible for teams to add more services to the application stack and still keep the entire system in rhythm by making it possible for new and existing services to subscribe to one or more topics.
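One consequence of retries is worth sketching: by default Pub/Sub delivers each message at least once, so a subscriber may see the same message again if an acknowledgment is lost or late. Below is a minimal sketch of a handler that skips duplicates by message id. The makeIdempotent helper is a hypothetical name, and the in-memory Set is an assumption for illustration only; a real service would likely keep processed ids in a persistent store.

```javascript
// Hypothetical helper: wraps a handler so each message id is processed once.
// The Set lives in memory only, so it is lost when the process restarts.
function makeIdempotent(handler, seenIds = new Set()) {
  return (message) => {
    if (seenIds.has(message.id)) {
      return false; // duplicate delivery, skip the side effect
    }
    seenIds.add(message.id);
    handler(message);
    return true;
  };
}

const shipped = [];
const handleOrder = makeIdempotent((message) => {
  shipped.push(message.data.orderId);
});

// The same message arrives twice, as it could after a retry.
const first = handleOrder({ id: "m-1", data: { orderId: 7 } });
const redelivered = handleOrder({ id: "m-1", data: { orderId: 7 } });
console.log(first, redelivered, shipped);
```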

Building the publisher and subscriber

In this section, we will build our publisher and subscriber (push and pull) logic, which is spread across three entry point files: orders.js (publisher), delivery-sub.js (subscriber A), and notification-sub.js (subscriber B). These three files represent the three services in our microservice architecture. Each service has its own routes and controllers, but all of them share the same pub-sub repository. The pub-sub repository houses reusable functions for publishing messages and receiving published messages. A pictorial representation of the boilerplate branch can be seen below:

project directory structure

Pub/Sub repository

This is where we define all the functions that’ll enable us to carry out all our Pub/Sub related tasks e.g publishing a message and listening for push or pull subscriptions:

// src/repositories/pub-sub-repo.js

module.exports = {
    publishMessage: async (pubSubClient, topicName, payload) => {
        const dataBuffer = Buffer.from(JSON.stringify(payload));
        const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
        console.log(`Message ${messageId} published.`);
        return messageId;
    }
 ...
};

The snippet above shows the publishMessage function, which accepts three parameters: pubSubClient, topicName, and payload. On execution, it serializes the JSON payload into a buffer (the message format Pub/Sub expects) and publishes it to the specified topic:

// src/repositories/pub-sub-repo.js

module.exports = {
  ...

  listenForPullMessages: (pubSubClient, subscriptionName,  timeout) => {
      const subscription = pubSubClient.subscription(subscriptionName);
      let messageCount = 0;
      const messageHandler = message => {
          console.log(`Received message ${message.id}:`);
          console.log(`\tData: ${message.data}`);
          // attributes is an object, so stringify it to avoid printing "[object Object]"
          console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
          messageCount += 1;
          message.ack();
      };
      subscription.on('message', messageHandler);
      setTimeout(() => {
          subscription.removeListener('message', messageHandler);
          console.log(`${messageCount} message(s) received.`);
      }, timeout * 1000);
  }

  ...

};

The snippet above shows a subscriber function that pulls messages published to a subscription tied to a topic. On execution, it listens for messages fanned out from the publisher for timeout * 1000 ms (with timeout = 60, the listener stays attached for 60 seconds, which equals 1 minute).
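The timed-listener pattern can be tried without a GCP project. In the sketch below, a plain Node.js EventEmitter stands in for the subscription object, since the snippet above only relies on its on and removeListener methods. The listenFor helper and the fake message are hypothetical names for illustration. Unlike the repository function, this version also returns a stop function so the listener can be detached before the timeout.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical helper mirroring listenForPullMessages, with an EventEmitter
// standing in for the Pub/Sub subscription object.
function listenFor(subscription, timeoutSeconds) {
  let messageCount = 0;
  const messageHandler = (message) => {
    messageCount += 1;
    message.ack();
  };
  subscription.on("message", messageHandler);

  // Detach the handler and cancel the pending timer.
  const stop = () => {
    clearTimeout(timer);
    subscription.removeListener("message", messageHandler);
  };
  const timer = setTimeout(stop, timeoutSeconds * 1000);

  return { stop, count: () => messageCount };
}

const fakeSubscription = new EventEmitter();
const listener = listenFor(fakeSubscription, 60);

let acked = 0;
const fakeMessage = { id: "1", ack: () => { acked += 1; } };

fakeSubscription.emit("message", fakeMessage);
fakeSubscription.emit("message", fakeMessage);
listener.stop(); // stop early instead of waiting the full 60 seconds
fakeSubscription.emit("message", fakeMessage); // ignored: no listener attached

console.log(listener.count(), acked);
```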

// src/repositories/pub-sub-repo.js

module.exports = {
  ...

  listenForPushMessages: (payload) => {
    const message = Buffer.from(payload, 'base64').toString(
        'utf-8'
    );
    let parsedMessage = JSON.parse(message);
    console.log(parsedMessage);
    return parsedMessage;
  }

  ...

}

The snippet above accepts a message from a configured subscriber endpoint and parses the buffer into JSON format for consumption by the individual subscribers.
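To make the decoding step concrete, here is a small sketch that builds a sample push request body and decodes it. Pub/Sub push requests wrap the payload in a message object whose data field is base64-encoded, which is why the controller reads req.body.message.data. The decodePushBody helper, the sample order, and the validation it performs are assumptions added for illustration; they are not part of the tutorial's repository.

```javascript
// Hypothetical helper: validates a push request body before decoding it.
function decodePushBody(body) {
  if (!body || !body.message || typeof body.message.data !== "string") {
    throw new Error("Push body is missing message.data");
  }
  const json = Buffer.from(body.message.data, "base64").toString("utf-8");
  return JSON.parse(json);
}

// Sample body: the published JSON arrives base64-encoded in message.data.
const order = { orderId: 42, item: "book" };
const sampleBody = {
  message: {
    data: Buffer.from(JSON.stringify(order)).toString("base64"),
    messageId: "1",
  },
  subscription: "projects/PROJECT_ID/subscriptions/delivery_sub",
};

const decoded = decodePushBody(sampleBody);
console.log(decoded);

// A malformed body is rejected instead of crashing inside JSON.parse.
let errorMessage = null;
try {
  decodePushBody({});
} catch (error) {
  errorMessage = error.message;
}
console.log(errorMessage);
```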

Building the publisher

The base logic for our publisher lies in the src/controllers/orders-controller.js file. This acts as the orders service, which accepts orders from users, processes each order, and then sends a message to the concerned services (delivery and notifications), notifying them of a new order:

// src/controllers/orders-controller.js

const { PubSub } = require("@google-cloud/pubsub");
const pubsubRepository = require("../repositories/pub-sub-repo");

const pubSubClient = new PubSub();
const topicName = "orders_topic";
const { publishMessage } = pubsubRepository;

module.exports = {
    ...

    createOrders: async (req, res) => {
        let ordersObj = req.body;
        let messageId = await publishMessage(pubSubClient, topicName, ordersObj);
        return res.status(200).json({
            success: true,
            message: `Message ${messageId} published :)`
        })
    }

};

The snippet above shows the createOrders method which accepts the orders request body and publishes the object to the orders_topic.

Building the subscribers

There are two subscribers that represent two standalone services: delivery and notifications. We will build only the delivery service in this section because the same steps can be repeated for the notifications service:

// src/controllers/delivery-controller.js

const { PubSub  } = require("@google-cloud/pubsub");
const pubSubClient = new PubSub();
const subscriptionName = "delivery_sub";
const timeout = 60;
const pubsubRepository = require("../repositories/pub-sub-repo");
const { listenForPullMessages, listenForPushMessages } = pubsubRepository;
module.exports = {
    ...

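    pullDelivery: (req, res) => {
        try {
            // Starts listening in the background for `timeout` seconds
            listenForPullMessages(pubSubClient, subscriptionName, timeout);
            // Respond right away so the HTTP request does not hang
            return res.status(200).json({
                success: true,
                message: `Listening for messages for ${timeout} seconds :)`
            })
        } catch (error) {
            return res.status(500).json({
                success: false,
                message: "Couldn't receive orders object :(",
                data: error
            })
        }

    },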
    pushDelivery: async (req, res) => {
        try {
            let messageResponse = await listenForPushMessages(req.body.message.data);
            return res.status(200).json({
                success: true,
                message: "Message received successfully :)",
                data: messageResponse
            })

        } catch (error) {
            return res.status(500).json({
                success: false,
                message: "Couldn't receive orders object :(",
                data: error
            })                        
        }
    }
};

The pullDelivery function executes the listenForPullMessages function from the pubsubRepository, which accepts three arguments: the pubSubClient, the name of the subscription (the notifications service will be called notifications_sub), and a timeout of 60 seconds.

The pushDelivery function, since it’s a webhook, passes the message received from Pub/Sub to the listenForPushMessages function for deserialization into JSON.

Connecting the dots

In this section, we’ll head over to our Google Cloud Console to create a topic and subscriptions. We will also learn how to run our three services for a proof of concept.

To create a topic we will navigate to the Pub/Sub section on the cloud console and create the orders_topic like so:

create orders topic

After you click on create topic, you’ll be routed to the newly created orders_topic page where you’ll create a subscription as seen below:

order topic page

HTTPS_URL represents the host URL where our delivery service is hosted. Pub/Sub requires all push endpoints to be served over HTTPS. For our notifications service, we will repeat the step above, replacing Subscription ID with “notification_sub” and Endpoint URL with {{HTTPS_URL}}/api/delivery/push.

To get a {{HTTPS_URL}}, we will be exposing our subscribers with ngrok in the next section.

Running our services

To demonstrate three microservices, we created three entry points (orders.js, notifications-sub.js, and delivery-sub.js) in our project directory, as opposed to having just one app.js file.

These files have already been created and bootstrapped in our project repository. Below is how our orders service is defined:

// src/orders.js

const express = require('express');
const morgan = require('morgan');
const bodyParser = require('body-parser');
require('dotenv').config();
const app = express();
const ordersRoute = require('./routes/orders');
const helmet = require('helmet');
const cors = require('cors');
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());
const { MAIN_PORT, NODE_ENV } = process.env;
// Pick the log format by environment: concise in development, detailed in production
app.use(morgan(NODE_ENV !== "production" ? 'dev' : 'combined'));
app.use(helmet());
app.use(cors());
app.use('/api/orders', ordersRoute);


app.listen(MAIN_PORT);
if (NODE_ENV !== "production" ) {
    console.log(`Orders service is running at http://localhost:${MAIN_PORT}`);
}

If you’re still on the boilerplate branch, switch to the master branch by running the command below:

$ git fetch origin master && git checkout master

Before we run our application, we need to create our .env file and copy in our service account key into our project directory. The .env file should look something like this:

MAIN_PORT=8001
PORT_1=8002
PORT_2=8003

GCP_PROJ_ID=PROJECT_ID
GOOGLE_APPLICATION_CREDENTIALS=FILENAME.json

Here, PROJECT_ID is the GCP project ID and FILENAME is the name of the service account key file, both created in the service account setup section.
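A missing variable in the .env file tends to surface later as a confusing authentication or port error, so it can help to check the values at startup. The sketch below is one possible way to do that; loadConfig is a hypothetical helper that is not part of the repository, and the returned field names are assumptions chosen for illustration.

```javascript
// Hypothetical helper: reads the variables named in the .env file above
// and fails fast when one of them is missing.
function loadConfig(env) {
  const required = ["MAIN_PORT", "GCP_PROJ_ID", "GOOGLE_APPLICATION_CREDENTIALS"];
  const missing = required.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return {
    port: Number(env.MAIN_PORT),
    projectId: env.GCP_PROJ_ID,
    keyFilename: env.GOOGLE_APPLICATION_CREDENTIALS,
  };
}

// In the app this would be loadConfig(process.env) after require('dotenv').config().
const config = loadConfig({
  MAIN_PORT: "8001",
  GCP_PROJ_ID: "PROJECT_ID",
  GOOGLE_APPLICATION_CREDENTIALS: "FILENAME.json",
});
console.log(config);

// An incomplete environment is reported with the names of the missing keys.
let configError = null;
try {
  loadConfig({ MAIN_PORT: "8001" });
} catch (error) {
  configError = error.message;
}
console.log(configError);
```

Google's client libraries read GOOGLE_APPLICATION_CREDENTIALS from the environment on their own, which is why the controllers can call new PubSub() without arguments once the .env file is loaded.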

Now that this is out of the way, let’s set up six terminals to run our three services concurrently. On a Mac using iTerm2, you can create six terminals by splitting a single window into horizontal halves using CMD+SHIFT+D, then splitting each horizontal half into three panes vertically by using CMD+D twice on each half. If all goes well, your terminal should look like the image below:

iTerm2 to create six terminals

Next up we will run our services locally in the upper half of the terminal by running the following commands on each section like so:

//upper terminal 1 (order service)
$ yarn start:main
//upper terminal 2 (delivery service)
$ yarn start:delivery
//upper terminal 3 (notification service)
$ yarn start:notification

Then in the lower half, we will provision public URLs for our localhost servers using ngrok by running the following commands in each section like so:

//lower terminal 1 (order service)
$ ngrok http 8001
//lower terminal 2 (delivery service)
$ ngrok http 8002
//lower terminal 3 (notification service)
$ ngrok http 8003

Running the commands in the snippet above should give you a terminal that looks like the image below:

six terminals after ngrok commands are run

A sample order request and response on the orders service can be seen in the image below:

Conclusion

In this tutorial, we’ve learned what Cloud Pub/Sub is and how to build a simple case to demonstrate its usage in a service-oriented/microservice architecture. Should you want to get more information on the Cloud Pub/Sub, you can visit the official documentation. For more Pub/Sub related content, you can check out the Pub/Sub Made Easy series on YouTube.

The source code for this tutorial is available on GitHub as well. Feel free to clone it, fork it, or submit an issue.
