Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit dfc0ef5

Browse filesBrowse files
mscdexBridgeAR
authored andcommitted
net: allow reading data into a static buffer
Co-Authored-By: Anna Henningsen <anna@addaleax.net> PR-URL: #25436 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
1 parent 2fafd63 commit dfc0ef5
Copy full SHA for dfc0ef5

File tree

Expand file treeCollapse file tree

7 files changed

+474
-65
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

7 files changed

+474
-65
lines changed
Open diff view settings
Collapse file

‎benchmark/net/net-s2c.js‎

Copy file name to clipboardExpand all lines: benchmark/net/net-s2c.js
+46-11Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,48 +5,84 @@ const common = require('../common.js');
55
const PORT = common.PORT;
66

77
const bench = common.createBenchmark(main, {
8-
len: [64, 102400, 1024 * 1024 * 16],
8+
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
99
type: ['utf', 'asc', 'buf'],
10+
recvbuflen: [0, 64 * 1024, 1024 * 1024],
11+
recvbufgenfn: ['true', 'false'],
1012
dur: [5]
1113
});
1214

1315
var chunk;
1416
var encoding;
17+
var recvbuf;
18+
var received = 0;
19+
20+
function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
21+
if (isFinite(recvbuflen) && recvbuflen > 0)
22+
recvbuf = Buffer.alloc(recvbuflen);
1523

16-
function main({ dur, len, type }) {
1724
switch (type) {
1825
case 'buf':
19-
chunk = Buffer.alloc(len, 'x');
26+
chunk = Buffer.alloc(sendchunklen, 'x');
2027
break;
2128
case 'utf':
2229
encoding = 'utf8';
23-
chunk = 'ü'.repeat(len / 2);
30+
chunk = 'ü'.repeat(sendchunklen / 2);
2431
break;
2532
case 'asc':
2633
encoding = 'ascii';
27-
chunk = 'x'.repeat(len);
34+
chunk = 'x'.repeat(sendchunklen);
2835
break;
2936
default:
3037
throw new Error(`invalid type: ${type}`);
3138
}
3239

3340
const reader = new Reader();
34-
const writer = new Writer();
41+
var writer;
42+
var socketOpts;
43+
if (recvbuf === undefined) {
44+
writer = new Writer();
45+
socketOpts = { port: PORT };
46+
} else {
47+
let buffer = recvbuf;
48+
if (recvbufgenfn === 'true') {
49+
let bufidx = -1;
50+
const bufpool = [
51+
recvbuf,
52+
Buffer.from(recvbuf),
53+
Buffer.from(recvbuf),
54+
];
55+
buffer = () => {
56+
bufidx = (bufidx + 1) % bufpool.length;
57+
return bufpool[bufidx];
58+
};
59+
}
60+
socketOpts = {
61+
port: PORT,
62+
onread: {
63+
buffer,
64+
callback: function(nread, buf) {
65+
received += nread;
66+
}
67+
}
68+
};
69+
}
3570

3671
// The actual benchmark.
3772
const server = net.createServer((socket) => {
3873
reader.pipe(socket);
3974
});
4075

4176
server.listen(PORT, () => {
42-
const socket = net.connect(PORT);
77+
const socket = net.connect(socketOpts);
4378
socket.on('connect', () => {
4479
bench.start();
4580

46-
socket.pipe(writer);
81+
if (recvbuf === undefined)
82+
socket.pipe(writer);
4783

4884
setTimeout(() => {
49-
const bytes = writer.received;
85+
const bytes = received;
5086
const gbits = (bytes * 8) / (1024 * 1024 * 1024);
5187
bench.end(gbits);
5288
process.exit(0);
@@ -58,12 +94,11 @@ function main({ dur, len, type }) {
5894
const net = require('net');
5995

6096
function Writer() {
61-
this.received = 0;
6297
this.writable = true;
6398
}
6499

65100
Writer.prototype.write = function(chunk, encoding, cb) {
66-
this.received += chunk.length;
101+
received += chunk.length;
67102

68103
if (typeof encoding === 'function')
69104
encoding();
Collapse file

‎doc/api/net.md‎

Copy file name to clipboardExpand all lines: doc/api/net.md
+36Lines changed: 36 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**.
593593
<!-- YAML
594594
added: v0.1.90
595595
changes:
596+
- version: REPLACEME
597+
pr-url: https://github.com/nodejs/node/pull/25436
598+
description: Added `onread` option.
596599
- version: v6.0.0
597600
pr-url: https://github.com/nodejs/node/pull/6021
598601
description: The `hints` option defaults to `0` in all cases now.
@@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are:
629632
See [Identifying paths for IPC connections][]. If provided, the TCP-specific
630633
options above are ignored.
631634

635+
For both types, available `options` include:
636+
637+
* `onread` {Object} - If specified, incoming data is stored in a single `buffer`
638+
and passed to the supplied `callback` when data arrives on the socket.
639+
Note: this will cause the streaming functionality to not provide any data,
640+
however events like `'error'`, `'end'`, and `'close'` will still be emitted
641+
as normal and methods like `pause()` and `resume()` will also behave as
642+
expected.
643+
* `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to
644+
use for storing incoming data or a function that returns such.
645+
* `callback` {Function} This function is called for every chunk of incoming
646+
data. Two arguments are passed to it: the number of bytes written to
647+
`buffer` and a reference to `buffer`. Return `false` from this function to
648+
implicitly `pause()` the socket. This function will be executed in the
649+
global context.
650+
651+
Following is an example of a client using the `onread` option:
652+
653+
```js
654+
const net = require('net');
655+
net.connect({
656+
port: 80,
657+
onread: {
658+
// Reuses a 4KiB Buffer for every read from the socket
659+
buffer: Buffer.alloc(4 * 1024),
660+
callback: function(nread, buf) {
661+
// Received data is available in `buf` from 0 to `nread`
662+
console.log(buf.toString('utf8', 0, nread));
663+
}
664+
}
665+
});
666+
```
667+
632668
#### socket.connect(path[, connectListener])
633669

634670
* `path` {string} Path the client should connect to. See
Collapse file

‎lib/internal/stream_base_commons.js‎

Copy file name to clipboardExpand all lines: lib/internal/stream_base_commons.js
+26-5Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
setUnrefTimeout,
2424
getTimerDuration
2525
} = require('internal/timers');
26+
const { isUint8Array } = require('internal/util/types');
2627
const { clearTimeout } = require('timers');
2728

2829
const kMaybeDestroy = Symbol('kMaybeDestroy');
@@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
3233
const kSession = Symbol('kSession');
3334

3435
const debug = require('internal/util/debuglog').debuglog('stream');
36+
const kBuffer = Symbol('kBuffer');
37+
const kBufferGen = Symbol('kBufferGen');
38+
const kBufferCb = Symbol('kBufferCb');
3539

3640
function handleWriteReq(req, data, encoding) {
3741
const { handle } = req;
@@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
161165
stream[kUpdateTimer]();
162166

163167
if (nread > 0 && !stream.destroyed) {
164-
const offset = streamBaseState[kArrayBufferOffset];
165-
const buf = new FastBuffer(arrayBuffer, offset, nread);
166-
if (!stream.push(buf)) {
168+
let ret;
169+
let result;
170+
const userBuf = stream[kBuffer];
171+
if (userBuf) {
172+
result = (stream[kBufferCb](nread, userBuf) !== false);
173+
const bufGen = stream[kBufferGen];
174+
if (bufGen !== null) {
175+
const nextBuf = bufGen();
176+
if (isUint8Array(nextBuf))
177+
stream[kBuffer] = ret = nextBuf;
178+
}
179+
} else {
180+
const offset = streamBaseState[kArrayBufferOffset];
181+
const buf = new FastBuffer(arrayBuffer, offset, nread);
182+
result = stream.push(buf);
183+
}
184+
if (!result) {
167185
handle.reading = false;
168186
if (!stream.destroyed) {
169187
const err = handle.readStop();
@@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
172190
}
173191
}
174192

175-
return;
193+
return ret;
176194
}
177195

178196
if (nread === 0) {
@@ -241,5 +259,8 @@ module.exports = {
241259
kUpdateTimer,
242260
kHandle,
243261
kSession,
244-
setStreamTimeout
262+
setStreamTimeout,
263+
kBuffer,
264+
kBufferCb,
265+
kBufferGen
245266
};

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.