Delegating Work Using NodeJS and AMQP

 This past weekend I popped over to Goa to chill, take my mind off work, and get in some good food, fresh air, and general beach-side relaxo. I’d not been here a day when I started thinking about message queues and simple task delegation, so I decided to write a small library, which I have called amqp-delegate, that simplifies the creation and use of AMQP remote workers to the point of triviality.

npm install amqp-delegate

An example:

worker.js

const { makeWorker } = require('amqp-delegate')

// waits for 10ms then adds two numbers.
const task = (a, b) =>
  new Promise(resolve =>
    setTimeout(() =>
      resolve(a + b), 10))

const worker = makeWorker({
  name: 'adder',
  task
})

worker.start()

delegate.js

const { makeDelegator } = require('amqp-delegate')

const delegator = makeDelegator()

delegator
  .start()
  .then(() => delegator.invoke('adder', 10, 15))
  .then(result => {
    console.log('result', result)
  })
  .catch(err => {
    console.error('caught', err)
  })

You can run as many workers as you like, and because their tasks are asynchronous they can do things like scrape web pages or interact with databases. Whichever worker with the requested name picks up a job first will do it.

How does it work?

It’s just a simple implementation of the standard remote procedure call pattern, but with the details of the message-queue interactions wrapped up in a simple high-level interface so you don’t have to see or understand them.

Workers

When you first make a worker you give it a name and a task to perform. The name can be any string, and the task can be any pure asynchronous function that accepts simple parameters and returns a simple result. By simple I mean values that can be marshalled into a JSON string and converted to a buffer, buffers being AMQP’s preferred message content format.
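As a rough illustration of what "simple" means here, the round trip from parameters to buffer and back can be sketched in a few lines. The helper names `encode` and `decode` are hypothetical and are not part of amqp-delegate's public interface; the library's own marshalling code may well differ.

```javascript
// Hypothetical helpers, for illustration only.

// Turn any JSON-serialisable value into a Buffer suitable for a message body.
const encode = value => Buffer.from(JSON.stringify(value))

// Turn a message body back into the original value.
const decode = buffer => JSON.parse(buffer.toString())

// The parameters of a call such as invoke('adder', 10, 15) travel as one array.
const body = encode([10, 15])
const params = decode(body)

console.log(Buffer.isBuffer(body), params)
```

Anything JSON cannot represent, such as a function or `undefined`, would not survive this round trip, and a `Date` would come back as a string, which is why tasks should stick to plain values.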

When you start a worker it connects to your AMQP server, then starts listening to a queue with the same name as the worker. When it hears a message on that queue it grabs it, racing any other workers with the same name, decodes the message content into an array of parameters, and passes those parameters to the worker’s task.

When the task is complete the worker marshals the result into JSON, turns it into a buffer, and sends it back to the caller on the incoming request’s replyTo queue, supplying the incoming request’s correlationId. This way the delegator knows that this message is the reply to its request and not some other random message on the same queue.

* makeWorker({ url, name, task, onError, onClose }) // => worker

* async worker.start()

* async worker.stop()

Delegators

When you start a delegator you don’t need to tell it much of anything. It just connects to the AMQP server and waits for you to invoke remote workers by name. When you call delegator.invoke you give it the name of the worker you want to invoke, and the params you want the worker to use when performing its task. Then you just await the result.

The delegator marshals the parameters into JSON and then into a buffer, creates a unique correlationId, sends the request to the queue named after the worker, and listens on its replyTo queue for the message with the matching correlationId.

* makeDelegator({ url, onError, onClose }) // => delegator

* async delegator.start()

* async delegator.invoke(name, ...params) // => result

* async delegator.stop()

Error handling

Both a worker and a delegator can be supplied onClose and onError hooks that your code can use to handle error conditions gracefully. Other than that you can use the standard try / catch wrappers around your awaits, or, if you are using promises, use the standard catch handler.

