Delegating Work Using NodeJS and AMQP

This past weekend I popped over to Goa to chill, take my mind off work, and get in some good food, fresh air, and general beach-side relaxo. I’d not been here a day when I started thinking about message queues and simple task delegation, so I decided to write a small library, which I have called amqp-delegate, that simplifies the creation and use of AMQP remote workers to the point of triviality.

npm install amqp-delegate

An example:

worker.js

const { makeWorker } = require('amqp-delegate')

// waits for 10ms then adds two numbers.
const task = (a, b) =>
  new Promise(resolve =>
    setTimeout(() => resolve(a + b), 10))

const worker = makeWorker({
  name: 'adder',
  task
})

worker.start()

delegate.js

const { makeDelegator } = require('amqp-delegate')

const delegator = makeDelegator()

// connect, ask the 'adder' worker to add 10 and 15, then log the result.
delegator
  .start()
  .then(() => delegator.invoke('adder', 10, 15))
  .then(result => {
    console.log('result', result)
  })
  .catch(err => {
    console.error('caught', err)
  })

You can run as many of the workers as you like, and, because they are all asynchronous, they can do things like scrape web pages, or interact with databases, or whatever you like. The first worker with the matching name to pick up the job will do it.

How does it work?

It’s just a simple implementation of the standard remote procedure call pattern, but with the details of the message-queue interactions wrapped up in a simple high-level interface so you don’t have to see or understand them.

Workers

When you first make a worker you give it a name and a task to perform. The name can be any string, and the task can be any pure asynchronous function that accepts simple parameters and returns a simple result. By simple I mean values that can be marshalled into a JSON string and converted to a buffer, buffers being AMQP’s preferred message content format.
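To make "simple" concrete, here is a minimal sketch of that marshalling round trip. The encode and decode helpers are hypothetical names I've picked for illustration, not functions the library exports, and the library's own encoding may differ in detail.

```javascript
// Hypothetical helpers: value -> JSON string -> Buffer, and back again.
const encode = value => Buffer.from(JSON.stringify(value))
const decode = buffer => JSON.parse(buffer.toString())

// Plain numbers, strings, arrays and objects survive the trip unchanged.
const params = [10, 15]
const roundTripped = decode(encode(params))
console.log(roundTripped) // [ 10, 15 ]

// Values JSON cannot represent do not: the Date comes back as a string
// and the function is dropped entirely.
const lossy = decode(encode({ when: new Date(0), fn: () => 1 }))
console.log(typeof lossy.when, 'fn' in lossy) // string false
```

So if a task needs a Date or a class instance, it is safer to pass something JSON-friendly (an ISO string, a plain object) and rebuild the richer value inside the task.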

When you start a worker it connects to your AMQP server, then starts listening to a queue with the same name as the worker. When it hears a message on that queue it grabs it, racing any other workers with the same name, decodes the message content into an array of parameters, and passes those parameters to the worker’s task.

When the task is complete it marshals the result into JSON, turns it into a buffer, and sends it back to the caller using the incoming request’s replyTo queue, supplying the incoming request’s correlationId. This way the delegator knows that this message is the reply to its request and not some other random message on the same queue.

* makeWorker({ url, name, task, onError, onClose }) // => worker

* async worker.start()

* async worker.stop()
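The worker's side of the exchange can be sketched without a broker at all. This is an in-memory illustration of the steps described above, not the library's source: the sent array stands in for the channel, and handleRequest, sendToQueue and the message shape (content plus properties.replyTo and properties.correlationId) are assumptions made for the example.

```javascript
// Stand-in for the channel: records what would be published, and where.
const sent = []
const sendToQueue = (queue, content, options) =>
  sent.push({ queue, content, options })

// Hypothetical handler: decode the params, run the task, reply to the caller.
const handleRequest = async (task, message) => {
  const params = JSON.parse(message.content.toString())
  const result = await task(...params)
  const { replyTo, correlationId } = message.properties
  // Echo the correlationId back so the caller can match reply to request.
  sendToQueue(replyTo, Buffer.from(JSON.stringify(result)), { correlationId })
}

// The same task as in worker.js: waits 10ms then adds two numbers.
const task = (a, b) =>
  new Promise(resolve => setTimeout(() => resolve(a + b), 10))

// A made-up incoming request, as a delegator might have sent it.
const request = {
  content: Buffer.from(JSON.stringify([10, 15])),
  properties: { replyTo: 'reply-queue', correlationId: 'abc-123' }
}

const handled = handleRequest(task, request).then(() => {
  const [reply] = sent
  console.log(
    reply.queue,
    reply.options.correlationId,
    JSON.parse(reply.content.toString())
  ) // reply-queue abc-123 25
  return reply
})
```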

Delegators

When you start a delegator you don’t need to tell it much of anything. It just connects to the AMQP server and waits for you to invoke remote workers by name. When you call delegator.invoke you give it the name of the worker you want to invoke, and the params you want the worker to use when performing its task. Then you just await the result.

The delegator marshals the parameters into JSON and then into a buffer, creates a unique correlationId, sends the request to the queue named after the worker, and starts listening on its replyTo queue for the message with the matching correlationId.

* makeDelegator({ url, onError, onClose }) // => delegator

* async delegator.start()

* async delegator.invoke(name, ...params) // => result

* async delegator.stop()
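The correlationId matching on the delegator's side can be sketched the same way. Again this is an in-memory illustration under my own assumptions rather than the library's implementation: fakePublish stands in for both the broker and an 'adder' worker, and pending, onReply and invoke are hypothetical names.

```javascript
const { randomUUID } = require('crypto')

// Requests still waiting for an answer, keyed by correlationId.
const pending = new Map()

// Called for every message that arrives on the reply queue.
const onReply = message => {
  const { correlationId } = message.properties
  const resolve = pending.get(correlationId)
  if (!resolve) return // not a reply to one of our requests, so ignore it
  pending.delete(correlationId)
  resolve(JSON.parse(message.content.toString()))
}

// Stand-in for the broker plus an 'adder' worker. It ignores the queue name,
// delivers an unrelated message first, then the real reply.
const fakePublish = (name, content, properties) => {
  const [a, b] = JSON.parse(content.toString())
  setTimeout(() => {
    onReply({
      content: Buffer.from(JSON.stringify('noise')),
      properties: { correlationId: 'someone-else' }
    })
    onReply({ content: Buffer.from(JSON.stringify(a + b)), properties })
  }, 10)
}

// Hypothetical invoke: register the correlationId, then publish the request.
const invoke = (name, ...params) =>
  new Promise(resolve => {
    const correlationId = randomUUID()
    pending.set(correlationId, resolve)
    fakePublish(name, Buffer.from(JSON.stringify(params)), {
      correlationId,
      replyTo: 'reply-queue'
    })
  })

const answer = invoke('adder', 10, 15)
answer.then(result => console.log('result', result)) // result 25
```

The unrelated message is dropped because its correlationId was never registered, which is the whole point of the id.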

Error handling

Both a worker and a delegator can be supplied onClose and onError hooks that your code can use to handle error conditions gracefully. Other than that you can use the standard try / catch wrappers around your awaits, or, if you are using promises, use the standard catch handler.
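For the async / await flavour, the shape looks something like this. To keep the sketch runnable without a broker I've swapped in a stub object with the same method names as a delegator, whose invoke always rejects; the error message is made up, and the arguments passed to the onError and onClose hooks are not shown because the text above doesn't specify them.

```javascript
// Stub standing in for a real delegator (an assumption, for illustration).
const delegator = {
  start: async () => {},
  invoke: async name => {
    throw new Error(`no worker answered for ${name}`)
  },
  stop: async () => {}
}

const run = async () => {
  try {
    await delegator.start()
    return await delegator.invoke('adder', 10, 15)
  } catch (err) {
    // A failure in start or invoke lands here.
    console.error('caught', err.message)
    return null
  } finally {
    // Close the connection whether or not the call succeeded.
    await delegator.stop()
  }
}

const outcome = run()
```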

