Having fun with nodejs child processes
When using nodejs child processes, you can enable an IPC (inter-process communication) channel. This is great because it lets processes exchange information with each other.
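Here is a minimal sketch using only the built-in `child_process` module (not relieve). Passing `'ipc'` in `stdio` opens the channel, which is what `fork()` does for you. The child code is passed with `node -e` only so that the example fits in one file, and `askChild` is a hypothetical name.

```javascript
'use strict'
const { spawn } = require('child_process')

// Child code, run with `node -e` so this sketch fits in a single file.
// It doubles every value it receives and sends the result back.
const childSource = `
process.on('message', msg => {
  process.send({ result: msg.value * 2 })
})
`

function askChild(value) {
  return new Promise((resolve, reject) => {
    // The 'ipc' stdio entry opens the IPC channel (fork() does the same)
    const child = spawn(process.execPath, ['-e', childSource], {
      stdio: ['inherit', 'inherit', 'inherit', 'ipc']
    })
    child.once('error', reject)
    child.once('message', msg => {
      // Closing the channel lets the child exit
      child.disconnect()
      resolve(msg.result)
    })
    child.send({ value })
  })
}

askChild(21).then(result => console.log('child answered', result))
```

The parent and the child both get a `send()` method and a `'message'` event; everything else (naming events, matching answers to requests) is up to you.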
With nodejs we often talk about micro-services, and I think that most of the time those micro-services can be represented as micro-processes, especially those with an asynchronous behaviour.
A few weeks ago I studied node internals and IPC, and I tried to make this kind of implementation easier with relieve. While doing this, I learned that:
- forking is slow, so it is more efficient to launch processes once and keep interacting with them
- the default IPC implementation lacks simplicity (that’s why IPCEE exists)
- when the master dies, if forks are detached, it’s not possible to re-attach the child (the IPC channel uses streams, so it should be possible somehow)
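To illustrate the "lacks simplicity" point: the raw channel only gives you one `'message'` event, so you end up building named events on top of it. The sketch below shows that idea with a hypothetical `wrapChannel` helper and an `[eventName, ...args]` message convention; it is not IPCEE's real API.

```javascript
'use strict'
const { EventEmitter } = require('events')

// Hypothetical helper illustrating the idea behind IPCEE (not its real API):
// messages shaped like [eventName, ...args] become ordinary events.
function wrapChannel(proc) {
  const events = new EventEmitter()
  proc.on('message', msg => {
    // Ignore anything that does not follow the [eventName, ...args] shape
    if (Array.isArray(msg) && typeof msg[0] === 'string') {
      // As with any EventEmitter, an unhandled 'error' event throws
      events.emit(...msg)
    }
  })
  return {
    on: (event, listener) => { events.on(event, listener) },
    send: (event, ...args) => proc.send([event, ...args])
  }
}

// Usage in a child started with an IPC channel (process.send exists):
// const channel = wrapChannel(process)
// channel.on('sendMail', (to, subject, body) => { /* ... */ })
// channel.send('info', { message: 'Mail sent' })
```

Any object with `on('message')` and `send()` works, which also makes the wrapper easy to test without spawning a process.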
# Example: Emailing from a child process
We often send mails when building a website. A mail can take some time to be sent. Sending can also fail, in which case we may want to try again a few seconds later.
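Retrying "a few seconds later" can be sketched as a small promise helper. `retry` and `sendOnce` are hypothetical names; `sendOnce` would typically wrap nodemailer's callback-based `transporter.sendMail(options, callback)` in a promise.

```javascript
'use strict'

// Hypothetical helper: run `sendOnce` up to `attempts` times, waiting
// `delayMs` between two attempts, and rethrow the last error if all fail.
async function retry(sendOnce, attempts = 3, delayMs = 3000) {
  let lastError
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await sendOnce(attempt)
    } catch (err) {
      lastError = err
      if (attempt < attempts) {
        await new Promise(resolve => setTimeout(resolve, delayMs))
      }
    }
  }
  throw lastError
}

// Usage sketch: wrap a callback-based sender in a promise, e.g.
// retry(() => new Promise((resolve, reject) =>
//   transporter.sendMail(options, (err, info) => (err ? reject(err) : resolve(info)))))
```

Doing this in the HTTP handler would keep the request open for the whole retry window, which is one more reason to move the mailing out of the request cycle.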
Normally, you’ll:
- handle the request (get the message, subject and recipient)
- validate the sender/recipient email addresses, the message length, etc.
- format from markdown to html
- send the mail
- give a 200 response to the client once everything went well
That said, we don’t want the HTTP server to wait until the mail has been sent before responding. If an error occurs later, it could be stored in a database, sent in real time through websockets, or even sent through a new mail.
Instead, we can:
- handle the request (get the message, subject and recipient)
- validate the sender/recipient email addresses, the message length, etc.
- send the data to a sub-process that handles the mailing part
- give a 202 response (202 Accepted: the request was accepted but is still being processed)
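Here is a minimal sketch of that flow using only Node’s built-in `http` module (no express, no relieve). `enqueueMail` and its 500 ms delay are made-up stand-ins for the mailing sub-process; the point is that the 202 goes out before the job is done.

```javascript
'use strict'
const http = require('http')

// Hypothetical stand-in for the mailing sub-process: the job is only
// marked as done 500 ms later, long after the HTTP response was sent.
const jobs = []
function enqueueMail(job) {
  jobs.push(job)
  setTimeout(() => { job.done = true }, 500)
}

const server = http.createServer((req, res) => {
  if (req.method !== 'POST' || req.url !== '/contact') {
    res.writeHead(404)
    res.end()
    return
  }
  let raw = ''
  req.setEncoding('utf8')
  req.on('data', chunk => { raw += chunk })
  req.on('end', () => {
    let body
    try {
      body = JSON.parse(raw)
    } catch (err) {
      res.writeHead(400)
      res.end('Invalid JSON')
      return
    }
    // Hand the work off, then answer immediately without waiting for it
    enqueueMail(body)
    res.writeHead(202, { 'Content-Type': 'text/plain' })
    res.end('Accepted')
  })
})
```

Start it with `server.listen(3000)` and POST a JSON body to `/contact`: the answer comes back right away, while the job finishes later.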
The sub-process will handle the formatting, the mailing and the notification. We will manage the sub-process with relieve.
Let’s start with a mailing Task:
```javascript
// tasks/mail.js
'use strict'
const nodemailer = require('nodemailer')
const markdown = require('nodemailer-markdown').markdown

// Transport options omitted: configure your SMTP settings here
const transporter = nodemailer.createTransport()
transporter.use('compile', markdown({useEmbeddedImages: true}))

function Mail() { this.channel = null }

/**
 * This sets the channel the server sent us when the task starts.
 * Regular functions (not arrows) so that `this` is the Mail instance.
 */
Mail.prototype.setChannel = function(c) { this.channel = c }

Mail.prototype.sendMail = function(from, to, subject, body) {
  transporter.sendMail({
    from: from,
    to: to,
    subject: subject,
    markdown: body
  }, (error, info) => {
    // Arrow callback: `this` still refers to the Mail instance
    if (error) {
      return this.channel.send('error', {message: error.message, code: error.code})
    }
    this.channel.send('info', {message: 'Mail sent'})
  })
}

module.exports = Mail
```
I’m using the well-known nodemailer to send the email, then I report the result through the channel. The channel is an IPCEE instance which is automagically set by the CallableTask (see below).
Now we can bind this to the HTTP server that handles the request. To keep this readable, I’m skipping the server configuration.
We assume it’s a contact page where we are the recipient. Here’s the route (using express and express-validator):
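```javascript
// router.js
'use strict'
const util = require('util')

// `app` is the Express application (configuration skipped), with a body
// parser and the (legacy) express-validator middleware installed
app.post('/contact', function(req, res) {
  req.checkBody('emitter', 'Invalid emitter email')
    .notEmpty().isEmail()
  req.checkBody('body', 'Invalid body (must be at least 30 characters long)')
    .notEmpty().isLength({min: 30})

  const errors = req.validationErrors()
  if (errors) {
    return res.status(400).send('There have been validation errors: ' + util.inspect(errors))
  }

  if (!req.body.subject) {
    req.body.subject = 'No subject'
  }

  // here we have to send the mail

  // res.status() only sets the code: the response still has to be sent
  res.sendStatus(202)
})
```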
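For readers not using express-validator, the same rules can be written as a plain function. This is only a sketch: `validateContact` is a hypothetical name, and the email regex is a deliberately loose approximation, not what `isEmail()` actually checks.

```javascript
'use strict'

// Loose email check: something@something.something, no spaces
const EMAIL_RE = /^[^\s@]+@[^\s@]+\.[^\s@]+$/

// Hypothetical plain-function version of the route's validation rules
function validateContact(input) {
  const errors = []
  if (!input.emitter || !EMAIL_RE.test(input.emitter)) {
    errors.push({ param: 'emitter', msg: 'Invalid emitter email' })
  }
  if (!input.body || input.body.length < 30) {
    errors.push({ param: 'body', msg: 'Invalid body (must be at least 30 characters long)' })
  }
  // Same defaulting as the route: a missing subject becomes 'No subject'
  const subject = input.subject || 'No subject'
  return { errors, value: { emitter: input.emitter, body: input.body, subject } }
}
```

Returning the errors instead of throwing keeps the 400 response decision in the route, as in the express version.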
Sending the mail will be done through the previous task. To do so, I’ll set up a worker with this task:
```javascript
// worker.js
'use strict'
const path = require('path')
const relieve = require('relieve')
const CallableTask = relieve.tasks.CallableTask
const Worker = relieve.workers.CloudWorker

const worker = new Worker()
// Absolute path, so the task is found whatever the current directory is
const task = new CallableTask(path.join(__dirname, 'tasks/mail.js'), {restart: true})
task.name = 'mail'
worker.add(task)

// listening on the worker will listen on every task it holds
worker.on('info', data => console.log(data))
worker.on('error', data => console.error(data))

// start the task(s)
worker.run()

module.exports = worker
```
As you may have noticed, I use the CloudWorker and a CallableTask. The CallableTask allows me to call a method of my Task (here `Mail.sendMail`). There are two ways to call a method from a CallableTask:
- `task.get('myMethod', args...)` returns a promise that resolves when the method resolves
- `task.call('myMethod', args...)` returns a promise that resolves when the message is propagated. It won’t wait for an actual answer.
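To make the difference concrete, here is a plain `child_process` sketch of the two semantics. It is not relieve’s implementation: the `call`/`get` functions and the `{ id, method, args }` message shape are assumptions made for illustration. `call()` resolves from the `send()` callback, while `get()` waits for a reply carrying the same id.

```javascript
'use strict'
const { spawn } = require('child_process')

// Child: runs the requested method; replies only when the message has an id.
const childSource = `
const methods = { double: n => n * 2 }
process.on('message', ({ id, method, args }) => {
  const result = methods[method](...args)
  if (id !== undefined) process.send({ id, result })
})
`

const child = spawn(process.execPath, ['-e', childSource], {
  stdio: ['inherit', 'inherit', 'inherit', 'ipc']
})

// Replies are matched to their request through the id
let nextId = 0
const pending = new Map()
child.on('message', ({ id, result }) => {
  const resolve = pending.get(id)
  if (resolve) {
    pending.delete(id)
    resolve(result)
  }
})

// call(): resolves once the message has been written to the channel
function call(method, ...args) {
  return new Promise((resolve, reject) => {
    child.send({ method, args }, err => (err ? reject(err) : resolve()))
  })
}

// get(): resolves with the value computed by the child
function get(method, ...args) {
  const id = nextId++
  return new Promise((resolve, reject) => {
    pending.set(id, resolve)
    child.send({ id, method, args }, err => {
      if (err) {
        pending.delete(id)
        reject(err)
      }
    })
  })
}

// Usage: compare both, then close the channel so both processes can exit
const demo = Promise.all([call('double', 1), get('double', 21)]).then(([sent, answer]) => {
  child.disconnect()
  console.log('call resolved with', sent)   // undefined: no answer was awaited
  console.log('get resolved with', answer)  // the child's result
  return [sent, answer]
})
```

`call` is the right choice for fire-and-forget work such as `sendMail`, where the result comes back later as an `info` or `error` event.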
The CloudWorker is a Worker directed by a WeightStrategy. This means that if I add 4 tasks (i.e. 4 child processes) and call `worker.call('myMethod', args...)`, it picks the least busy one.
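relieve’s WeightStrategy is not shown here, so the sketch below only illustrates the "least busy" idea. Measuring busyness as a counter of in-flight calls (`pending`) is an assumption, and `pickLeastBusy`/`dispatch` are hypothetical names.

```javascript
'use strict'

// Each task carries a `pending` counter of calls still running (assumed
// weight). The task with the lowest counter wins; ties go to the first one.
function pickLeastBusy(tasks) {
  if (tasks.length === 0) throw new Error('No task available')
  return tasks.reduce((best, task) => (task.pending < best.pending ? task : best))
}

// Run `run(task)` on the least busy task, raising its weight while it works
async function dispatch(tasks, run) {
  const task = pickLeastBusy(tasks)
  task.pending++
  try {
    return await run(task)
  } finally {
    task.pending--
  }
}
```

With four mailing tasks, a burst of calls spreads across them instead of piling up on the first process.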
Note that I set the `restart: true` option so that if my Mail task fails, the worker restarts it.
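relieve does this for you. For comparison, a bare-bones version of the same idea with `child_process` might look like the sketch below; `supervise` is a hypothetical helper, and the script is run with `node -e` only to keep the example in one file.

```javascript
'use strict'
const { spawn } = require('child_process')

// Start `script` (run with `node -e`) and start it again whenever it exits
// with anything but code 0, at most `maxRestarts` times. `supervise` is a
// hypothetical helper, not relieve's API.
function supervise(script, maxRestarts, onGiveUp) {
  let restarts = 0
  function start() {
    const child = spawn(process.execPath, ['-e', script], { stdio: 'inherit' })
    child.on('exit', code => {
      if (code === 0) return // clean exit: nothing to restart
      if (restarts < maxRestarts) {
        restarts++
        console.log(`child exited with ${code}, restart #${restarts}`)
        start()
      } else {
        onGiveUp(restarts)
      }
    })
  }
  start()
}

// Usage: a "task" that always crashes is restarted twice, then abandoned
supervise('process.exit(1)', 2, restarts => console.log(`gave up after ${restarts} restarts`))
```

Capping the restarts avoids an endless crash loop when the failure is permanent (a wrong SMTP configuration, for example).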
Let’s add a few mailing processes to the worker just for fun:
```javascript
// worker.js
'use strict'
const path = require('path')
const relieve = require('relieve')
const CallableTask = relieve.tasks.CallableTask
const Worker = relieve.workers.CloudWorker

const worker = new Worker()

// 4 mailing processes: the CloudWorker picks the least busy one on each call
for (let i = 0; i < 4; i++) {
  const task = new CallableTask(path.join(__dirname, 'tasks/mail.js'), {restart: true})
  task.name = 'mail' + i
  worker.add(task)
}

worker.on('info', data => console.log(data))
worker.on('error', data => console.error(data))

// start the tasks
worker.run()

module.exports = worker
```
Now, put the router and the worker together:
```javascript
// router.js
'use strict'
const util = require('util')
const worker = require('./worker.js')

// `app` is the Express application (configuration skipped)
app.post('/contact', function(req, res) {
  req.checkBody('emitter', 'Invalid emitter email')
    .notEmpty().isEmail()
  req.checkBody('body', 'Invalid body (must be at least 30 characters long)')
    .notEmpty().isLength({min: 30})

  const errors = req.validationErrors()
  if (errors) {
    return res.status(400).send('There have been validation errors: ' + util.inspect(errors))
  }

  if (!req.body.subject) {
    req.body.subject = 'No subject'
  }

  // Hand the mail to the least busy task; we don't wait for it to be sent
  worker.call('sendMail', req.body.emitter, 'me@example.com', req.body.subject, req.body.body)
    .catch(err => console.error(err))

  // 202 Accepted: the mail is still being processed
  res.sendStatus(202)
})
```
# Sending data from the child process to the client
In the previous example, the Worker listens to `info` and `error` events. When the Task emits an event, the Worker can then forward the data to the client (using a socket, for example).
What’s nice with IPC is that messages are not limited to strings: objects are serialized for you (as JSON by default), and some handles can be sent along with a message.
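"Serialized" matters: what arrives is a copy, not the original instance. The sketch below (plain `child_process`, child run with `node -e`, `roundTrip` is a hypothetical name) echoes an object through a child with the default `'json'` serialization and with Node’s `'advanced'` mode, which uses the V8 serializer.

```javascript
'use strict'
const { spawn } = require('child_process')

// Child: echo every message back unchanged, so we can see what survived
// the round trip through the IPC channel.
const echoSource = `process.on('message', msg => process.send(msg))`

function roundTrip(value, serialization) {
  return new Promise((resolve, reject) => {
    const child = spawn(process.execPath, ['-e', echoSource], {
      stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
      serialization // 'json' (the default) or 'advanced'
    })
    child.once('error', reject)
    child.once('message', msg => {
      child.disconnect()
      resolve(msg)
    })
    child.send(value)
  })
}

const sample = { when: new Date(0), tags: new Map([['a', 1]]) }
const results = Promise.all([roundTrip(sample, 'json'), roundTrip(sample, 'advanced')])
results.then(([json, advanced]) => {
  console.log('json:', json)          // the Date is a string, the Map is {}
  console.log('advanced:', advanced)  // Date and Map are preserved
})
```

Even in advanced mode, class instances lose their prototype and methods, so it is safer to send plain data and rebuild objects on the other side.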
IPC can even pass a `net.Socket` to our child process! By doing this, the Task (child) can send a result to the client without going through the Worker (master).
Here’s an example:
```javascript
// worker.js
'use strict'
const net = require('net')
const path = require('path')
const relieve = require('relieve')
const CallableTask = relieve.tasks.CallableTask
const Worker = relieve.workers.CloudWorker

const worker = new Worker()

for (let i = 0; i < 4; i++) {
  const task = new CallableTask(path.join(__dirname, 'task.js'), {restart: true})
  task.name = 'task' + i
  worker.add(task)
}

worker.run()

// pauseOnConnect: the master must not start reading data meant for the child
const server = net.createServer({pauseOnConnect: true})

server.on('connection', function(socket) {
  // send the socket to a task, then ask that same task to do the work
  worker.send('socket', socket)
  .then(function(task) {
    task.call('doHeavyStuff', 1e6)
  })
})

server.listen(function() {
  console.log('server listening on %j', server.address())
})
```
And now we can handle sockets:
```javascript
// task.js
'use strict'
// sockets received from the worker, waiting for an answer
const socks = []

module.exports = {
  setChannel: function(channel) {
    channel.on('socket', function(socket) {
      socks.push(socket)
    })
  },
  doHeavyStuff: function(num) {
    const sock = socks.shift()
    // doing heavy stuff
    sock.write('Some answer')
    sock.end()
  }
}
```
When a client connects to the server, the socket is sent to a Task. Once the message is delivered, the worker asks that Task to `doHeavyStuff`.
When the Task is done, it writes an answer directly to the socket and closes it.
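For comparison, the same handle passing can be done with Node’s built-in `child_process` alone: `subprocess.send(message, sendHandle)` delivers the socket as the second argument of the child’s `'message'` listener. This is a sketch, not what relieve does internally; the child code is run with `node -e` to keep it in one file.

```javascript
'use strict'
const net = require('net')
const { spawn } = require('child_process')

// Child: receives the socket handle with the message and answers the
// client itself; the parent never writes to this connection.
const childSource = `
process.on('message', (msg, socket) => {
  if (msg === 'socket' && socket) socket.end('Some answer')
})
`

const child = spawn(process.execPath, ['-e', childSource], {
  stdio: ['inherit', 'inherit', 'inherit', 'ipc']
})

// pauseOnConnect: the parent must not start reading data meant for the child
const server = net.createServer({ pauseOnConnect: true }, socket => {
  // By default the parent's copy of the socket is closed once it is sent
  child.send('socket', socket)
})

server.listen(0, () => {
  console.log('server listening on port', server.address().port)
})
```

Connecting with any TCP client (for example `nc localhost <port>`) prints the child’s answer, and the connection is closed by the child, not by the server process.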
# A final word
It was really fun to write relieve. I’m using it in a real-life project to handle plugins.
Say you have an application that needs to compress/decompress data in/from a directory. Typically, you would have two modules, two tasks. Your server does not need to handle the long compression process: it can delegate it to another service (which can be a module) and get the answer when it’s done. If the service crashes, the worker can handle it. The opposite case, where the worker dies and the service keeps running, only works when `{detached: true}` (see here) is set.
The power of IPC is really amazing when you see it in action! You can find more use cases in the relieve git repository!