Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 607aa3a

Browse filesBrowse files
bnoordhuisrvagg
authored andcommitted
child_process: add callback parameter to .send()
Add an optional callback parameter to `ChildProcess.prototype.send()` that is invoked when the message has been sent. Juggle the control channel's reference count so that in-flight messages keep the event loop (and therefore the process) alive until they have been sent. `ChildProcess.prototype.send()` and `process.send()` used to operate synchronously but became asynchronous in commit libuv/libuv@393c1c5 ("unix: set non-block mode in uv_{pipe,tcp,udp}_open"), which landed in io.js in commit 07bd05b ("deps: update libuv to 1.2.1"). Fixes: #760 PR-URL: #2620 Reviewed-By: trevnorris - Trevor Norris <trev.norris@gmail.com> Reviewed-By: jasnell - James M Snell <jasnell@gmail.com>
1 parent 599d4f5 commit 607aa3a
Copy full SHA for 607aa3a

File tree

Expand file treeCollapse file tree

5 files changed

+111
-36
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+111
-36
lines changed
Open diff view settings
Collapse file

‎doc/api/child_process.markdown‎

Copy file name to clipboardExpand all lines: doc/api/child_process.markdown
+14-9Lines changed: 14 additions & 9 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,15 @@ to a process.
214214

215215
See `kill(2)`
216216

217-
### child.send(message[, sendHandle])
217+
### child.send(message[, sendHandle][, callback])
218218

219219
* `message` {Object}
220220
* `sendHandle` {Handle object}
221+
* `callback` {Function}
222+
* Return: Boolean
221223

222224
When using `child_process.fork()` you can write to the child using
223-
`child.send(message, [sendHandle])` and messages are received by
225+
`child.send(message[, sendHandle][, callback])` and messages are received by
224226
a `'message'` event on the child.
225227

226228
For example:
@@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this:
246248
In the child the `process` object will have a `send()` method, and `process`
247249
will emit objects each time it receives a message on its channel.
248250

249-
Please note that the `send()` method on both the parent and child are
250-
synchronous - sending large chunks of data is not advised (pipes can be used
251-
instead, see
252-
[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).
253-
254251
There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
255252
containing a `NODE_` prefix in its `cmd` property will not be emitted in
256253
the `message` event, since they are internal messages used by Node.js core.
@@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
261258
socket object to another process. The child will receive the object as its
262259
second argument to the `message` event.
263260

264-
Emits an `'error'` event if the message cannot be sent, for example because
265-
the child process has already exited.
261+
The `callback` option is a function that is invoked after the message is
262+
sent but before the target may have received it. It is called with a single
263+
argument: `null` on success, or an `Error` object on failure.
264+
265+
`child.send()` emits an `'error'` event if no callback was given and the message
266+
cannot be sent, for example because the child process has already exited.
267+
268+
Returns `true` under normal circumstances or `false` when the backlog of
269+
unsent messages exceeds a threshold that makes it unwise to send more.
270+
Use the callback mechanism to implement flow control.
266271

267272
#### Example: sending server object
268273

Collapse file

‎doc/api/cluster.markdown‎

Copy file name to clipboardExpand all lines: doc/api/cluster.markdown
+3-1Lines changed: 3 additions & 1 deletion
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value.
426426
// kill worker
427427
worker.kill();
428428

429-
### worker.send(message[, sendHandle])
429+
### worker.send(message[, sendHandle][, callback])
430430

431431
* `message` {Object}
432432
* `sendHandle` {Handle object}
433+
* `callback` {Function}
434+
* Return: Boolean
433435

434436
Send a message to a worker or master, optionally with a handle.
435437

Collapse file

‎lib/child_process.js‎

Copy file name to clipboardExpand all lines: lib/child_process.js
+3-7Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,12 @@ exports._forkChild = function(fd) {
4848
var p = new Pipe(true);
4949
p.open(fd);
5050
p.unref();
51-
setupChannel(process, p);
52-
53-
var refs = 0;
51+
const control = setupChannel(process, p);
5452
process.on('newListener', function(name) {
55-
if (name !== 'message' && name !== 'disconnect') return;
56-
if (++refs === 1) p.ref();
53+
if (name === 'message' || name === 'disconnect') control.ref();
5754
});
5855
process.on('removeListener', function(name) {
59-
if (name !== 'message' && name !== 'disconnect') return;
60-
if (--refs === 0) p.unref();
56+
if (name === 'message' || name === 'disconnect') control.unref();
6157
});
6258
};
6359

Collapse file

‎lib/internal/child_process.js‎

Copy file name to clipboardExpand all lines: lib/internal/child_process.js
+72-19Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,25 @@ function setupChannel(target, channel) {
397397
target._channel = channel;
398398
target._handleQueue = null;
399399

400+
const control = new class extends EventEmitter {
401+
constructor() {
402+
super();
403+
this.channel = channel;
404+
this.refs = 0;
405+
}
406+
ref() {
407+
if (++this.refs === 1) {
408+
this.channel.ref();
409+
}
410+
}
411+
unref() {
412+
if (--this.refs === 0) {
413+
this.channel.unref();
414+
this.emit('unref');
415+
}
416+
}
417+
};
418+
400419
var decoder = new StringDecoder('utf8');
401420
var jsonBuffer = '';
402421
channel.buffering = false;
@@ -446,7 +465,7 @@ function setupChannel(target, channel) {
446465
target._handleQueue = null;
447466

448467
queue.forEach(function(args) {
449-
target._send(args.message, args.handle, false);
468+
target._send(args.message, args.handle, false, args.callback);
450469
});
451470

452471
// Process a pending disconnect (if any).
@@ -478,14 +497,24 @@ function setupChannel(target, channel) {
478497
});
479498
});
480499

481-
target.send = function(message, handle) {
482-
if (!this.connected)
483-
this.emit('error', new Error('channel closed'));
484-
else
485-
this._send(message, handle, false);
500+
target.send = function(message, handle, callback) {
501+
if (typeof handle === 'function') {
502+
callback = handle;
503+
handle = undefined;
504+
}
505+
if (this.connected) {
506+
this._send(message, handle, false, callback);
507+
return;
508+
}
509+
const ex = new Error('channel closed');
510+
if (typeof callback === 'function') {
511+
process.nextTick(callback, ex);
512+
} else {
513+
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
514+
}
486515
};
487516

488-
target._send = function(message, handle, swallowErrors) {
517+
target._send = function(message, handle, swallowErrors, callback) {
489518
assert(this.connected || this._channel);
490519

491520
if (message === undefined)
@@ -516,7 +545,11 @@ function setupChannel(target, channel) {
516545

517546
// Queue-up message and handle if we haven't received ACK yet.
518547
if (this._handleQueue) {
519-
this._handleQueue.push({ message: message.msg, handle: handle });
548+
this._handleQueue.push({
549+
callback: callback,
550+
handle: handle,
551+
message: message.msg,
552+
});
520553
return;
521554
}
522555

@@ -538,24 +571,43 @@ function setupChannel(target, channel) {
538571
} else if (this._handleQueue &&
539572
!(message && message.cmd === 'NODE_HANDLE_ACK')) {
540573
// Queue request anyway to avoid out-of-order messages.
541-
this._handleQueue.push({ message: message, handle: null });
574+
this._handleQueue.push({
575+
callback: callback,
576+
handle: null,
577+
message: message,
578+
});
542579
return;
543580
}
544581

545582
var req = new WriteWrap();
546-
req.oncomplete = nop;
583+
req.async = false;
584+
547585
var string = JSON.stringify(message) + '\n';
548586
var err = channel.writeUtf8String(req, string, handle);
549587

550-
if (err) {
551-
if (!swallowErrors)
552-
this.emit('error', errnoException(err, 'write'));
553-
} else if (handle && !this._handleQueue) {
554-
this._handleQueue = [];
555-
}
556-
557-
if (obj && obj.postSend) {
558-
req.oncomplete = obj.postSend.bind(null, handle);
588+
if (err === 0) {
589+
if (handle && !this._handleQueue)
590+
this._handleQueue = [];
591+
req.oncomplete = function() {
592+
if (this.async === true)
593+
control.unref();
594+
if (obj && obj.postSend)
595+
obj.postSend(handle);
596+
if (typeof callback === 'function')
597+
callback(null);
598+
};
599+
if (req.async === true) {
600+
control.ref();
601+
} else {
602+
process.nextTick(function() { req.oncomplete(); });
603+
}
604+
} else if (!swallowErrors) {
605+
const ex = errnoException(err, 'write');
606+
if (typeof callback === 'function') {
607+
process.nextTick(callback, ex);
608+
} else {
609+
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
610+
}
559611
}
560612

561613
/* If the master is > 2 read() calls behind, please stop sending. */
@@ -616,6 +668,7 @@ function setupChannel(target, channel) {
616668
};
617669

618670
channel.readStart();
671+
return control;
619672
}
620673

621674

Collapse file
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const fork = require('child_process').fork;
5+
6+
if (process.argv[2] === 'child') {
7+
process.send('ok', common.mustCall(function(err) {
8+
assert.strictEqual(err, null);
9+
}));
10+
} else {
11+
const child = fork(process.argv[1], ['child']);
12+
child.on('message', common.mustCall(function(message) {
13+
assert.strictEqual(message, 'ok');
14+
}));
15+
child.on('exit', common.mustCall(function(exitCode, signalCode) {
16+
assert.strictEqual(exitCode, 0);
17+
assert.strictEqual(signalCode, null);
18+
}));
19+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.