.Title

Having fun with nodejs child processes

When using nodejs child processes you can enable an IPC channel (Inter-process communication). This is great because it gives you the ability to exchange informations between sub processes.

With nodejs we often talk about micro-services, and I think that most of the time those micro-services can be represented in micro-processes, especially those with an asynchronous behaviour.

A few weeks ago I’ve studied node internals and IPC and I’ve tried to ease this process implementation with relieve. While doing this, I’ve learned that:

# Example: Emailing from a child process

We often do mailing when building a website. A mail can take some time before it has been sent. It can also fail, and try to mail again a few seconds after.

Normally, you’ll :

That said, we don’t want the HTTP server to wait until the mail has been sent before giving a response. Later, if an error occurs, the response could be stored in a database, sent in real time through websockets, or even sent through a new mail.

To do so, we can :

The sub process will handle the formatting, mailing and the notification We will handle the sub process with relieve.

Let’s start with a mailing Task:

//tasks/mail.js
'use strict'
const nodemailer = require('nodemailer')
const transporter = nodemailer.createTransport()
const markdown = require('nodemailer-markdown').markdown
transporter.use('compile', markdown({useEmbeddedImages: true}))

function Mail() { this.channel = null }

/**
* This sets the channel the server sent us when the task starts
*/
Mail.prototype.setChannel = c => this.channel = c

Mail.prototype.sendMail = (from, to, subject, body) => {
  transporter.sendMail({
    from: from,
    to: to,
    subject: subject,
      markdown: body
  }, function(error, info) {
    if(error){
      return channel.send('error', {message: error.message, code: error.code})
    }

    channel.send('info', {message: 'Mail sent'})
  })
}

module.exports = Mail

I’m using the famous nodemailer to send the email, then I notify something through the channel. The channel is an IPCEE instance which is automagically set by the CallableTask (see below).

Now, we can bind this to our HTTP server which handles the request. To keep this readable I’m skipping the server configuration.

We assume it’s a contact page, where the recipient is ourselves. Here’s the route (using express and express-validator):

//router.js
'use strict'
app.post('/contact', function(req, res) {
    req.checkBody('emitter', 'Invalid emitter email')
      .notEmpty().isEmail()
    req.checkBody('body', 'Invalid body (must be at least 30 characters long)')
      .notEmpty().isLength([30])

    let errors = req.validationErrors();
    if (errors) {
      return res.status(400).send('There have been validation errors: ' + util.inspect(errors)) 
    }

    if(!req.body.subject)
      req.body.subject = 'No subject'

    //here we have to send the mail

    res.status(202)
})

Sending the mail will be done through the previous task. To do so, I’ll set up a worker with this task:

//worker.js
'use strict'
const relieve = require('relieve')
const CallableTask = relieve.tasks.CallableTask
const Worker = relieve.workers.CloudWorker

let worker = new Worker()
let task = new CallableTask('tasks/mail.js', {restart: true})
task.name = 'mail'
worker.add(task)

//listening on the worker will listen on every tasks it holds
worker.on('info', data => console.log(data))
worker.on('error', data => console.error(data))

module.exports = worker

As you’d notice, I use the CloudWorker and a CallableTask. The CallableTask allows me to call a method of my Task (here Mail.sendMail). There are two ways to call a method from a CallableTask:

The CloudWorker is a Worker that is directed by a WeightStrategy. This means that if I add 4 tasks (ie 4 child processes), and that I call worker.call('myMethod', args...), it’ll take the one that is the less busy.

Note that I set the restart: true option so that if my Mail task fails, it’ll be restarted by the worker.

Let’s add a few mailing processes to the worker just for fun:

//worker.js
'use strict'
const relieve = require('relieve')
const CallableTask = relieve.tasks.CallableTask
const Worker = relieve.workers.CloudWorker

let worker = new Worker()
let i = 0
let len = 4
for (; i < len; i++) {
  let task = new CallableTask('tasks/mail.js', {restart: true})
  task.name = 'mail'+i
  worker.add(task)
}

worker.on('info', data => console.log(data))
worker.on('error', data => console.error(data))

module.exports = worker

Now, put the router and the worker together:

//router.js
'use strict'
const worker = require('./worker.js')

app.post('/contact', function(req, res) {
    req.checkBody('emitter', 'Invalid emitter email')
      .notEmpty().isEmail()
    req.checkBody('body', 'Invalid body (must be at least 30 characters long)')
      .notEmpty().isLength([30])

    let errors = req.validationErrors();
    if (errors) {
      return res.status(400).send('There have been validation errors: ' + util.inspect(errors)) 
    }

    if(!req.body.subject)
      req.body.subject = 'No subject'

    worker.call('sendMail', req.body.emitter, 'me@example.com', req.body.subject, req.body.body)

    res.status(202)
})

# Sending data from the child process to the client

In the previous example, Worker is listening on info and error events. When an event is emitted from the Task, the Worker can then transfer the data to the client (using a Socket for example).

What’s nice with IPC is that it allows to transfer javascript instances on top of default strings. This gives the ability to send anet.Socket to our child process! By doing this, the Task (child) will be able to send a result to the client without using the Worker (master).

Here’s an example:

//worker.js
'use strict'
var relieve = require('../../index.js')
var CallableTask = relieve.tasks.CallableTask
var Worker = relieve.workers.CloudWorker
var worker = new Worker()

let i = 0
let len = 4
for (; i < len; i++) {
  let task = new CallableTask(__dirname + '/task.js', {restart: true})
  task.name = 'task'+i
  worker.add(task)
}

worker.run()

var server = net.createServer()

server.on('connection', function(socket) {
  worker.send('socket', socket)
  .then(function(task) {
    task.call('doHeavyStuff', 1e6)
  })
})

server.listen(function() {
  console.log('server listening on %j', server.address());  
})

And now we can handle sockets:

//task.js
'use strict'
var socks = []

module.exports = {
  setChannel: function(channel) {
    channel.on('socket', function(socket) {
      socks.push(socket)
    })
  },
  doHeavyStuff: function(num) {
    let sock = socks.shift()
    //doing heavy stuff
    sock.write(`Some answer`)
    sock.end()
  }
}

On connection on the server, the socket is sent to the Task. Once the message is delivered the worker asks the Task to doHeavyStuff. When the Task is done, it writes an answer directly to the socket and closes it.

# A Word

It was really fun to write relieve. I’m using it in a real life use case to handle plugins. Say you have an application that needs to compress/decompress data in/from a directory. Typically, you would have two modules, two tasks. Your server does not need to handle the long process of compressing, it can just dedicate this to another service (which can be a module), and get the answer when it’s done. If the services crashes it can be handled by the worker. The opposite will only work when {detached: true} (see here) is set.

The forcefulness of ipc is really amazing when you see it in action! You can find more use cases on the git repository!