Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 09ad64d

Browse filesBrowse files
committed
stream: add CompressionStream and DecompressionStream
Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: #39348 Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 25e2f17 commit 09ad64d
Copy full SHA for 09ad64d

File tree

Expand file treeCollapse file tree

4 files changed

+281
-0
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

4 files changed

+281
-0
lines changed
Open diff view settings
Collapse file

‎doc/api/webstreams.md‎

Copy file name to clipboardExpand all lines: doc/api/webstreams.md
+51Lines changed: 51 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -1217,5 +1217,56 @@ added: REPLACEME
12171217
12181218
* Type: {WritableStream}
12191219
1220+
### Class: `CompressionStream`
1221+
<!-- YAML
1222+
added: REPLACEME
1223+
-->
1224+
#### `new CompressionStream(format)`
1225+
<!-- YAML
1226+
added: REPLACEME
1227+
-->
1228+
1229+
* `format` {string} One of either `'deflate'` or `'gzip'`.
1230+
1231+
#### `compressionStream.readable`
1232+
<!-- YAML
1233+
added: REPLACEME
1234+
-->
1235+
1236+
* Type: {ReadableStream}
1237+
1238+
#### `compressionStream.writable`
1239+
<!-- YAML
1240+
added: REPLACEME
1241+
-->
1242+
1243+
* Type: {WritableStream}
1244+
1245+
### Class: `DecompressionStream`
1246+
<!-- YAML
1247+
added: REPLACEME
1248+
-->
1249+
1250+
#### `new DecompressionStream(format)`
1251+
<!-- YAML
1252+
added: REPLACEME
1253+
-->
1254+
1255+
* `format` {string} One of either `'deflate'` or `'gzip'`.
1256+
1257+
#### `decompressionStream.readable`
1258+
<!-- YAML
1259+
added: REPLACEME
1260+
-->
1261+
1262+
* Type: {ReadableStream}
1263+
1264+
#### `deccompressionStream.writable`
1265+
<!-- YAML
1266+
added: REPLACEME
1267+
-->
1268+
1269+
* Type: {WritableStream}
1270+
12201271
[Streams]: stream.md
12211272
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
Collapse file
+164Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
'use strict';
2+
3+
const {
4+
ObjectDefineProperties,
5+
Symbol,
6+
} = primordials;
7+
8+
const {
9+
codes: {
10+
ERR_INVALID_ARG_VALUE,
11+
ERR_INVALID_THIS,
12+
},
13+
} = require('internal/errors');
14+
15+
const {
16+
newReadableWritablePairFromDuplex,
17+
} = require('internal/webstreams/adapters');
18+
19+
const {
20+
customInspect,
21+
kEnumerableProperty,
22+
} = require('internal/webstreams/util');
23+
24+
const {
25+
customInspectSymbol: kInspect,
26+
} = require('internal/util');
27+
28+
let zlib;
29+
function lazyZlib() {
30+
zlib ??= require('zlib');
31+
return zlib;
32+
}
33+
34+
const kHandle = Symbol('kHandle');
35+
const kTransform = Symbol('kTransform');
36+
const kType = Symbol('kType');
37+
38+
/**
39+
* @typedef {import('./readablestream').ReadableStream} ReadableStream
40+
* @typedef {import('./writablestream').WritableStream} WritableStream
41+
*/
42+
43+
function isCompressionStream(value) {
44+
return typeof value?.[kHandle] === 'object' &&
45+
value?.[kType] === 'CompressionStream';
46+
}
47+
48+
function isDecompressionStream(value) {
49+
return typeof value?.[kHandle] === 'object' &&
50+
value?.[kType] === 'DecompressionStream';
51+
}
52+
53+
class CompressionStream {
54+
/**
55+
* @param {'deflate'|'gzip'} format
56+
*/
57+
constructor(format) {
58+
this[kType] = 'CompressionStream';
59+
switch (format) {
60+
case 'deflate':
61+
this[kHandle] = lazyZlib().createDeflate();
62+
break;
63+
case 'gzip':
64+
this[kHandle] = lazyZlib().createGzip();
65+
break;
66+
default:
67+
throw new ERR_INVALID_ARG_VALUE('format', format);
68+
}
69+
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
70+
}
71+
72+
/**
73+
* @readonly
74+
* @type {ReadableStream}
75+
*/
76+
get readable() {
77+
if (!isCompressionStream(this))
78+
throw new ERR_INVALID_THIS('CompressionStream');
79+
return this[kTransform].readable;
80+
}
81+
82+
/**
83+
* @readonly
84+
* @type {WritableStream}
85+
*/
86+
get writable() {
87+
if (!isCompressionStream(this))
88+
throw new ERR_INVALID_THIS('CompressionStream');
89+
return this[kTransform].writable;
90+
}
91+
92+
[kInspect](depth, options) {
93+
if (!isCompressionStream(this))
94+
throw new ERR_INVALID_THIS('CompressionStream');
95+
customInspect(depth, options, 'CompressionStream', {
96+
readable: this[kTransform].readable,
97+
writable: this[kTransform].writable,
98+
});
99+
}
100+
}
101+
102+
class DecompressionStream {
103+
/**
104+
* @param {'deflate'|'gzip'} format
105+
*/
106+
constructor(format) {
107+
this[kType] = 'DecompressionStream';
108+
switch (format) {
109+
case 'deflate':
110+
this[kHandle] = lazyZlib().createInflate();
111+
break;
112+
case 'gzip':
113+
this[kHandle] = lazyZlib().createGunzip();
114+
break;
115+
default:
116+
throw new ERR_INVALID_ARG_VALUE('format', format);
117+
}
118+
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
119+
}
120+
121+
/**
122+
* @readonly
123+
* @type {ReadableStream}
124+
*/
125+
get readable() {
126+
if (!isDecompressionStream(this))
127+
throw new ERR_INVALID_THIS('DecompressionStream');
128+
return this[kTransform].readable;
129+
}
130+
131+
/**
132+
* @readonly
133+
* @type {WritableStream}
134+
*/
135+
get writable() {
136+
if (!isDecompressionStream(this))
137+
throw new ERR_INVALID_THIS('DecompressionStream');
138+
return this[kTransform].writable;
139+
}
140+
141+
[kInspect](depth, options) {
142+
if (!isDecompressionStream(this))
143+
throw new ERR_INVALID_THIS('DecompressionStream');
144+
customInspect(depth, options, 'DecompressionStream', {
145+
readable: this[kTransform].readable,
146+
writable: this[kTransform].writable,
147+
});
148+
}
149+
}
150+
151+
ObjectDefineProperties(CompressionStream.prototype, {
152+
readable: kEnumerableProperty,
153+
writable: kEnumerableProperty,
154+
});
155+
156+
ObjectDefineProperties(DecompressionStream.prototype, {
157+
readable: kEnumerableProperty,
158+
writable: kEnumerableProperty,
159+
});
160+
161+
module.exports = {
162+
CompressionStream,
163+
DecompressionStream,
164+
};
Collapse file

‎lib/stream/web.js‎

Copy file name to clipboardExpand all lines: lib/stream/web.js
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ const {
3636
TextDecoderStream,
3737
} = require('internal/webstreams/encoding');
3838

39+
const {
40+
CompressionStream,
41+
DecompressionStream,
42+
} = require('internal/webstreams/compression');
43+
3944
module.exports = {
4045
ReadableStream,
4146
ReadableStreamDefaultReader,
@@ -52,4 +57,6 @@ module.exports = {
5257
CountQueuingStrategy,
5358
TextEncoderStream,
5459
TextDecoderStream,
60+
CompressionStream,
61+
DecompressionStream,
5562
};
Collapse file
+59Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Flags: --no-warnings
2+
'use strict';
3+
4+
const common = require('../common');
5+
6+
const {
7+
CompressionStream,
8+
DecompressionStream,
9+
} = require('stream/web');
10+
11+
const assert = require('assert');
12+
const dec = new TextDecoder();
13+
14+
async function test(format) {
15+
const gzip = new CompressionStream(format);
16+
const gunzip = new DecompressionStream(format);
17+
18+
gzip.readable.pipeTo(gunzip.writable).then(common.mustCall());
19+
20+
const reader = gunzip.readable.getReader();
21+
const writer = gzip.writable.getWriter();
22+
23+
await Promise.all([
24+
reader.read().then(({ value, done }) => {
25+
assert.strictEqual(dec.decode(value), 'hello');
26+
}),
27+
reader.read().then(({ done }) => assert(done)),
28+
writer.write('hello'),
29+
writer.close(),
30+
]);
31+
}
32+
33+
Promise.all(['gzip', 'deflate'].map((i) => test(i))).then(common.mustCall());
34+
35+
[1, 'hello', false, {}].forEach((i) => {
36+
assert.throws(() => new CompressionStream(i), {
37+
code: 'ERR_INVALID_ARG_VALUE',
38+
});
39+
assert.throws(() => new DecompressionStream(i), {
40+
code: 'ERR_INVALID_ARG_VALUE',
41+
});
42+
});
43+
44+
assert.throws(
45+
() => Reflect.get(CompressionStream.prototype, 'readable', {}), {
46+
code: 'ERR_INVALID_THIS',
47+
});
48+
assert.throws(
49+
() => Reflect.get(CompressionStream.prototype, 'writable', {}), {
50+
code: 'ERR_INVALID_THIS',
51+
});
52+
assert.throws(
53+
() => Reflect.get(DecompressionStream.prototype, 'readable', {}), {
54+
code: 'ERR_INVALID_THIS',
55+
});
56+
assert.throws(
57+
() => Reflect.get(DecompressionStream.prototype, 'writable', {}), {
58+
code: 'ERR_INVALID_THIS',
59+
});

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.