In this article, we will see how to implement the saga pattern in Node.js microservices.
First, what is the saga pattern in microservices, and what kind of problem does it solve?
Saga Pattern
Let's try to understand the saga pattern with an example. Consider an application like Asana, where a project contains tasks and a task contains subtasks.
The project details live in the project service, and the tasks and subtasks live in the task service, with each service having its own database.
What happens to the task and subtask data when a user deletes the project? How can you maintain data consistency across all the services?
That is exactly the problem the saga pattern solves: keeping data consistent across different services.
Saga Pattern Types
There are two methods used to implement the saga pattern:
Orchestration-based Saga
In an orchestration-based saga, an orchestrator service coordinates the communication (command/reply) between the services.
In this way, it helps maintain data consistency across services.
Choreography-Based Saga
In this method, there is no central orchestrator. Each service publishes its own events and listens for replies from the other services, and on every reply it updates its own database so the data stays consistent.
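To make the difference concrete, here is a rough, illustrative sketch using Node's built-in EventEmitter as a stand-in for a real message broker. The event names and handlers here are hypothetical and are not part of the example that follows:

```js
// Illustrative choreography-style flow: no orchestrator, each service reacts
// to the other services' events directly. EventEmitter stands in for Kafka here.
const EventEmitter = require("events")

const bus = new EventEmitter()

// The order service listens for the payment service's reply events
// and updates its own database accordingly.
bus.on("PAYMENT_COMPLETED", event => {
  console.log(`order ${event.orderId}: mark as PAID in the order database`)
})

bus.on("PAYMENT_FAILED", event => {
  // Compensating action: undo the local change made earlier in the saga
  console.log(`order ${event.orderId}: mark as CANCELLED in the order database`)
})

// The payment service publishes its result; nobody coordinates this exchange.
bus.emit("PAYMENT_COMPLETED", { orderId: "order-1" })
```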
Implementation in Node.js Microservices
Next, let's walk through an example of Node.js microservices where data consistency is crucial.
The complete source code can be found here.
Note: this code only demonstrates how the saga pattern works in Node.js microservices; it doesn't contain all the business logic for the services. Feel free to complete the services if you are interested (PRs are always welcome).
In this example, we have an e-commerce application. It contains an order service, a payment service, and a stock service.
Whenever a user places an order, we need to implement the complete flow of ordering, payment, and delivery of the items, which involves the order service, the payment service, and the stock service.
Here, data consistency plays a crucial role. Let's see how to implement it using the orchestration-based saga pattern.
Structure
- KafkaBroker - contains all the Kafka producer, consumer, and routes logic. All the services use this to publish and receive events from Kafka.
- orchestatorService - contains the logic that implements the orchestration part of the saga pattern.
- orderService - handles all the order business logic.
- paymentService - handles all the payment business logic.
We will be using kafka-node for Kafka communication in Node.js. If you prefer kafkajs, feel free to use that instead.
KafkaBroker
If you are new to Kafka, read this article to get a good grasp of it.
Create a directory kafkaHandler inside kafkaBroker. Here, we are going to write the producer and consumer logic for Kafka.
After that, create a file called Producer.js and add the following code:
```js
const Kafka = require("kafka-node")

const Producer = Kafka.Producer
const client = new Kafka.KafkaClient()

// For creating Topics.
// Only admins were able to create topics
const admin = new Kafka.Admin(client)

let producer

let producerReady

const bindListeners = function bindListeners() {
  producerReady = new Promise((resolve, reject) => {
    producer.on("ready", () => {
      console.log("producer ready")
      resolve(producer)
    })

    producer.on("error", err => {
      console.log("producer err", err)
      reject(err)
    })
  })
}

const initializeProducer = () => {
  producer = new Producer(client)

  bindListeners()
}

/*
 * A Higher level producer which sends a message to a particular topic
 */
const ProducerService = function ProducerService() {
  initializeProducer()
}

/*
 * Sends a message from the kafka instance
 **/
ProducerService.prototype.produce = function produce(
  topic,
  messages,
  partition = 0
) {
  // Returns data if producer success
  return producerReady.then(producer => {
    const payload = [{ topic, messages, partition }]
    return new Promise((resolve, reject) => {
      producer.send(payload, function(err, data) {
        if (err) {
          console.log("Error while producing data in this service")
          reject(err)
        }
        resolve(data)
      })
    })
  })
}

ProducerService.prototype.createTopic = function createTopic(topics) {
  return producerReady.then(producer => {
    return new Promise((resolve, reject) => {
      producer.createTopics(topics, (err, res) => {
        if (err) {
          console.log("Error while creating a topic")
          reject(err)
        }

        console.log("Topics created successfully")
        resolve(res)
      })
    })
  })
}

module.exports = ProducerService
```
Here, we have a few methods on the producer (a short usage sketch follows this list):
- Initializing the producer by binding the on ready and on error callback functions.
- A produce method that takes a topic and a message and sends the message to the specified topic.
- A createTopic method that creates a topic if it doesn't already exist.
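For instance, a service could use this producer like so. This is a hypothetical snippet, assuming it runs from inside the kafkaBroker directory and that Kafka is running locally:

```js
// Hypothetical usage of the ProducerService above
const ProducerService = require("./kafkaHandler/Producer")

const producer = new ProducerService()

// Publish a JSON-encoded event to the ORDER_SERVICE topic
producer
  .produce("ORDER_SERVICE", JSON.stringify({ type: "ORDER_CREATED" }))
  .then(data => console.log("published", data))
  .catch(err => console.log("publish failed", err))
```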
Next, create a file called Consumer.js and add the following code:
```js
const kafkaNode = require("kafka-node")

const client = new kafkaNode.KafkaClient()
const offset = new kafkaNode.Offset(client)

const Consumer = kafkaNode.Consumer

let consumer

let consumerReady

var defaultOptions = {
  encoding: "utf8", // default is utf8, use 'buffer' for binary data
  fromOffset: -1, // default,
  autoCommit: true,
}

const bindEventListeners = function bindEventListeners(options, topic) {
  consumerReady = new Promise((resolve, reject) => {
    try {
      consumer = new Consumer(client, [], options)
      consumer.on("error", err => {
        console.log(`Error occurred on consumer group ${topic}`)
      })
      resolve(consumer)
    } catch (e) {
      reject(e)
    }
  })
}

const initializeConsumer = function initializeConsumer(defaultTopic) {
  const options = defaultOptions

  bindEventListeners(options, defaultTopic)
}

const ConsumerService = function ConsumerService(defaultTopic) {
  console.log("initializing consumer ")
  initializeConsumer(defaultTopic)
}

ConsumerService.prototype.addTopics = function addTopics(topicArray) {
  return new Promise((resolve, reject) => {
    consumerReady
      .then(consumer => {
        console.log("adding topics ", topicArray)
        consumer.addTopics(topicArray, function(err, added) {
          console.log("topics added ", err, added)
          resolve(added)
        })
      })
      .catch(e => {
        console.log("error while adding topics ", e)
      })
  })
}

ConsumerService.prototype.consume = function consume(cb) {
  consumerReady
    .then(consumer => {
      console.log("consumer ready")
      consumer.on("message", message => {
        // console.log('received message ', message);
        cb(message)
      })
    })
    .catch(e => {
      console.log("error while consuming", e)
    })
}

module.exports = ConsumerService
```
Here, we have a few methods on the consumer (a usage sketch follows this list):
- Initializing the consumer and binding the on error callback function.
- An addTopics method that adds the topics the consumer should consume.
- A consume method that receives messages from Kafka and passes them to a callback.
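Similarly, a hypothetical consumer usage could look like this (again assuming it runs from inside the kafkaBroker directory):

```js
// Hypothetical usage of the ConsumerService above
const ConsumerService = require("./kafkaHandler/Consumer")

const consumer = new ConsumerService()

consumer.addTopics(["ORDER_SERVICE"]).then(() => {
  consumer.consume(message => {
    // message.value holds the raw string payload published by the producer
    console.log("received", message.value)
  })
})
```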
After that, create a file called kafkaBootstrap.js and add the following code:
```js
const kafka = require("kafka-node")

const Producer = require("../kafkaBroker/kafkaHandler/Producer")

const producer = new Producer()

const topics = [
  { topic: "ORDER_SERVICE", partitions: 1, replicationFactor: 1 },
  { topic: "PAYMENT_SERVICE", partitions: 1, replicationFactor: 1 },
  { topic: "STOCK_SERVICE", partitions: 1, replicationFactor: 1 },
  { topic: "ORCHESTATOR_SERVICE", partitions: 1, replicationFactor: 1 },
]

producer
  .createTopic(topics)
  .then(res => {})
  .catch(err => {
    console.log(`Error ${err}`)
  })
```
Here, we create the topics if they don't already exist. Run this code once, before starting the services, to create the topics.
Order Service
- Controller - handles the requests and the business logic.
- eventHandler - handles all the Kafka messages and maps them to the business logic.
- Model - contains all the database models (a sketch of the order model follows this list).
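The order model itself is not shown in this article; a minimal sketch, assumed from the fields used in createOrder.js below, could look like this:

```js
// Model/orderModel.js: an assumed sketch based on the fields used in createOrder.js
const mongoose = require("mongoose")

const orderSchema = new mongoose.Schema({
  name: String,
  itemCount: Number,
  transactionId: String, // correlates the order with its saga transaction
  status: String, // e.g. "PENDING" until the saga completes
})

module.exports = mongoose.model("Order", orderSchema)
```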
After that, create a file app.js and add the following code:
```js
const express = require("express")
const bodyParser = require("body-parser")
const mongoose = require("mongoose")

const Consumer = require("../../kafkaBroker/kafkaHandler/Consumer")
const eventHandler = require("./eventHandler")
const CreateOrder = require("./Controller/createOrder")
const app = express()

app.use(bodyParser.json())
app.use(bodyParser.urlencoded({ extended: false }))

mongoose
  .connect("mongodb://localhost:27017/orderdb", {
    useNewUrlParser: true,
    useUnifiedTopology: true,
  })
  .then(data => {
    app.post("/createorder", CreateOrder)

    const PORT = 3000

    app.listen(PORT, () => {
      console.log("server is running on port 3000")
    })

    const consumer = new Consumer()

    consumer.addTopics(["ORDER_SERVICE", "SERVICE_REPLY"]).then(() => {
      consumer.consume(message => {
        console.log("consumed message", message)
        eventHandler(JSON.parse(message.value))
      })
    })
  })
  .catch(err => {
    console.log(`Error in Mongo Connection ${err}`)
  })
```
Here, we set up a MongoDB connection and add the topics for the order service's Kafka consumer.
Once a message is consumed, the eventHandler takes it and performs the corresponding business logic.
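The order service's eventHandler isn't shown in this article. A minimal sketch, assuming it simply switches on the message type and updates the order status (the ORDER_COMPLETED type below is purely illustrative), could look like this:

```js
// orderService/eventHandler.js: an illustrative sketch, not the repository's exact code
const OrderModel = require("./Model/orderModel")

module.exports = async message => {
  switch (message.type) {
    // Hypothetical reply event marking the saga as finished for this order
    case "ORDER_COMPLETED":
      await OrderModel.updateOne(
        { transactionId: message.payload.transactionId },
        { status: "COMPLETED" }
      )
      break
    default:
      console.log(`Unhandled order event: ${message.type}`)
  }
}
```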
Next, create a file createOrder.js in Controller and add the following code:
```js
const uuidv1 = require("uuid/v1")

const OrderModel = require("../Model/orderModel")
const Producer = require("../../../kafkaBroker/kafkaHandler/routes")

const CreateOrder = async (req, res) => {
  try {
    const name = req.body.name
    const itemCount = req.body.itemCount
    const amount = req.body.amount

    const order = await new OrderModel({
      name: name,
      itemCount: itemCount,
      transactionId: uuidv1(),
      status: "PENDING",
    })

    await order.save()

    res.send(order)

    Producer({
      topic: "ORDER_CREATION_TRANSACTIONS",
      type: "ORDER_CREATED",
      payload: {
        data: {
          id: order._id,
          transactionId: order.transactionId,
          amount: amount,
        },
      },
    })
  } catch (e) {
    console.log(e)
  }
}

module.exports = CreateOrder
```
The controller takes the request and inserts the order into the database. Once that is done, it sends the data to the Kafka producer, initiating the order creation transaction.
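One piece we haven't shown is the routes module that the services import as Producer. A minimal sketch, assuming it simply wraps the ProducerService and maps each logical saga topic to the Kafka topic of the service that handles it (the mapping below is an assumption, not necessarily how the repository implements it):

```js
// kafkaHandler/routes.js: an assumed sketch, not the repository's exact implementation
const ProducerService = require("./Producer")

const producer = new ProducerService()

// Map logical saga topics to the Kafka topic of the service that handles them (assumed)
const topicMap = {
  ORDER_CREATION_TRANSACTIONS: "ORCHESTATOR_SERVICE",
  EXECUTE_PAYMENT: "PAYMENT_SERVICE",
}

// Publish the whole message (logical topic, type, and payload) as JSON so that
// consumers can JSON.parse(message.value) and dispatch on topic/type.
module.exports = message => {
  const kafkaTopic = topicMap[message.topic]
  if (!kafkaTopic) {
    console.log(`No Kafka topic mapped for ${message.topic}`)
    return Promise.resolve()
  }
  return producer.produce(kafkaTopic, JSON.stringify(message))
}
```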
Orchestrator Service
As mentioned, the main purpose of the orchestrator service is to orchestrate the commands and replies.
Everything here is modeled as a transaction. For every transaction, the orchestrator propagates the state across the different services.
First, create a file bootstrap.js and add the following code:
```js
const Consumer = require("../../kafkaBroker/kafkaHandler/Consumer")
const Transactions = require("./Transactions")

try {
  const consumer = new Consumer()

  consumer.addTopics(["ORCHESTATOR_SERVICE"]).then(() => {
    consumer.consume(message => {
      console.log("consumed message", message)
      Transactions(JSON.parse(message.value))
    })
  })

  console.log("Orchestator Started successfully")
} catch (e) {
  console.log(`Orchestrator Error ${e}`)
}
```
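bootstrap.js requires a Transactions module that isn't shown in this article. A minimal sketch, assuming it just dispatches each message to the right transaction handler based on its logical topic, could look like this:

```js
// Transactions/index.js: an assumed sketch, not the repository's exact code
const orderCreationTransactions = require("./orderCreationTransactions")

module.exports = message => {
  switch (message.topic) {
    case "ORDER_CREATION_TRANSACTIONS":
      orderCreationTransactions(message)
      break
    default:
      console.log(`No transaction handler for topic: ${message.topic}`)
  }
}
```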
Then create a file orderCreationTransactions.js and add the following code:
```js
const Producer = require("../../../kafkaBroker/kafkaHandler/routes")

module.exports = message => {
  switch (message.type) {
    case "ORDER_CREATED":
      Producer({
        topic: "EXECUTE_PAYMENT",
        payload: {
          data: message.payload.data,
        },
      })
      break
    case "PAYMENT_COMPLETED_STATE":
      Producer({
        topic: "", // left empty: the next step of the saga is not implemented in this demo
        payload: {
          data: message.payload.data,
        },
      })
      break
    default:
      break
  }
}
```
Once it receives a state, the orchestrator directs the transaction to the appropriate service. The PAYMENT_COMPLETED_STATE branch above is intentionally incomplete; finishing the rest of the flow (for example, notifying the stock service) is left as an exercise.
Payment Service
Once the payment service receives a command from the orchestrator service, it runs its business logic and reports the status back to the orchestrator.
Based on that status, the orchestrator performs the further actions on the respective services.
Add the following code in app.js:
```js
const Consumer = require("../../kafkaBroker/kafkaHandler/Consumer")
const eventHandler = require("./eventHandler")

try {
  const consumer = new Consumer()

  consumer.addTopics(["PAYMENT_SERVICE"]).then(() => {
    consumer.consume(message => {
      console.log("consumed message", message)
      eventHandler(JSON.parse(message.value))
    })
  })

  console.log("Payment service Started Successfully")
} catch (e) {
  console.log(`Payment Service Error ${e}`)
}
```
It adds the topics, and when a message is received, it passes the message to the eventHandler.
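The payment service's eventHandler isn't shown either. A minimal sketch, assuming it dispatches on the logical topic and that executePayment.js lives in a Controller directory (both assumptions), could look like this:

```js
// paymentService/eventHandler.js: an assumed sketch, not the repository's exact code
const executePayment = require("./Controller/executePayment")

module.exports = message => {
  switch (message.topic) {
    case "EXECUTE_PAYMENT":
      executePayment(message.payload.data)
      break
    default:
      console.log(`Unhandled payment event: ${message.topic}`)
  }
}
```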
Then add the following code in executePayment.js:
```js
const Producer = require("../../../kafkaBroker/kafkaHandler/routes")

module.exports = data => {
  /** Database Layer Logic Comes Here */
  try {
    console.log("data", data)
    Producer({
      topic: "ORDER_CREATION_TRANSACTIONS",
      type: "PAYMENT_COMPLETED_STATE",
      payload: {
        transactionId: data.transactionId,
      },
    })
  } catch (e) {
    console.log(e)
  }
}
```
Summary
In conclusion, maintaining microservices that implement the saga pattern can be a bit complex, but it is worth it for the data consistency problems the pattern solves.