Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 588b00f

Browse filesBrowse files
islandryuaduh95
authored andcommitted
cluster: fix port reuse between cluster
Fixes: #60086 PR-URL: #60141 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent a87f7a5 commit 588b00f
Copy full SHA for 588b00f

4 files changed

+118-8Lines changed: 118 additions & 8 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎lib/internal/cluster/primary.js‎

Copy file name to clipboardExpand all lines: lib/internal/cluster/primary.js
+17-8Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,12 @@ function queryServer(worker, message) {
271271
return;
272272

273273
const key = `${message.address}:${message.port}:${message.addressType}:` +
274-
`${message.fd}:${message.index}`;
275-
let handle = handles.get(key);
274+
`${message.fd}` + (message.port === 0 ? `:${message.index}` : '');
275+
const cachedHandle = handles.get(key);
276+
let handle;
277+
if (cachedHandle && !cachedHandle.has(worker)) {
278+
handle = cachedHandle;
279+
}
276280

277281
if (handle === undefined) {
278282
let address = message.address;
@@ -298,25 +302,30 @@ function queryServer(worker, message) {
298302
handle = new RoundRobinHandle(key, address, message);
299303
}
300304

301-
handles.set(key, handle);
305+
if (!cachedHandle) {
306+
handles.set(key, handle);
307+
}
302308
}
303309

304310
handle.data ||= message.data;
305311

306312
// Set custom server data
307-
handle.add(worker, (errno, reply, handle) => {
313+
handle.add(worker, (errno, reply, serverHandle) => {
314+
if (!errno) {
315+
handles.set(key, handle); // Update in case it was replaced.
316+
}
308317
const { data } = handles.get(key);
309-
310-
if (errno)
311-
handles.delete(key); // Gives other workers a chance to retry.
318+
if (!cachedHandle && errno) {
319+
handles.delete(key);
320+
}
312321

313322
send(worker, {
314323
errno,
315324
key,
316325
ack: message.seq,
317326
data,
318327
...reply,
319-
}, handle);
328+
}, serverHandle);
320329
});
321330
}
322331

Collapse file

‎lib/internal/cluster/round_robin_handle.js‎

Copy file name to clipboardExpand all lines: lib/internal/cluster/round_robin_handle.js
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,7 @@ RoundRobinHandle.prototype.handoff = function(worker) {
137137
this.handoff(worker);
138138
});
139139
};
140+
141+
RoundRobinHandle.prototype.has = function(worker) {
142+
return this.all.has(worker.id);
143+
};
Collapse file

‎lib/internal/cluster/shared_handle.js‎

Copy file name to clipboardExpand all lines: lib/internal/cluster/shared_handle.js
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,7 @@ SharedHandle.prototype.remove = function(worker) {
4747
this.handle = null;
4848
return true;
4949
};
50+
51+
SharedHandle.prototype.has = function(worker) {
52+
return this.workers.has(worker.id);
53+
};
Collapse file
+93Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const cluster = require('cluster');
5+
const assert = require('assert');
6+
7+
const acts = {
8+
WORKER1_SERVER1_CLOSED: { cmd: 'WORKER1_SERVER1_CLOSED' },
9+
WORKER2_SERVER1_STARTED: { cmd: 'WORKER2_SERVER1_STARTED' },
10+
WORKER1_SERVER2_CLOSED: { cmd: 'WORKER1_SERVER2_CLOSED' },
11+
};
12+
13+
if (cluster.isMaster) {
14+
const currentHost = '::';
15+
const worker1 = cluster.fork({
16+
WORKER_ID: 'worker1',
17+
HOST: currentHost,
18+
});
19+
let worker2;
20+
worker1.on('error', common.mustNotCall());
21+
worker1.on('message', onMessage);
22+
23+
function createWorker2() {
24+
worker2 = cluster.fork({
25+
WORKER_ID: 'worker2',
26+
HOST: currentHost,
27+
});
28+
worker2.on('error', common.mustNotCall());
29+
worker2.on('message', onMessage);
30+
}
31+
32+
function onMessage(msg) {
33+
switch (msg.cmd) {
34+
case acts.WORKER1_SERVER1_CLOSED.cmd:
35+
createWorker2();
36+
break;
37+
case acts.WORKER2_SERVER1_STARTED.cmd:
38+
worker1.send(acts.WORKER2_SERVER1_STARTED);
39+
break;
40+
case acts.WORKER1_SERVER2_CLOSED.cmd:
41+
worker1.kill();
42+
worker2.kill();
43+
break;
44+
default:
45+
assert.fail(`Unexpected message ${msg.cmd}`);
46+
}
47+
}
48+
} else {
49+
const WORKER_ID = process.env.WORKER_ID;
50+
function createServer() {
51+
return new Promise((resolve, reject) => {
52+
const net = require('net');
53+
const PORT = 8000;
54+
const server = net
55+
.createServer((socket) => {
56+
socket.end(
57+
`Handled by worker ${process.env.WORKER_ID} (${process.pid})\n`
58+
);
59+
})
60+
.on('error', (e) => {
61+
reject(e);
62+
});
63+
64+
server.listen(
65+
{
66+
port: PORT,
67+
host: process.env.HOST,
68+
},
69+
() => resolve(server)
70+
);
71+
});
72+
}
73+
(async () => {
74+
const server1 = await createServer();
75+
if (WORKER_ID === 'worker2') {
76+
process.send(acts.WORKER2_SERVER1_STARTED);
77+
} else {
78+
await createServer().catch(common.mustCall());
79+
await new Promise((r) => server1.close(r));
80+
process.send(acts.WORKER1_SERVER1_CLOSED);
81+
82+
process.on('message', async (msg) => {
83+
if (msg.cmd === acts.WORKER2_SERVER1_STARTED.cmd) {
84+
const server2 = await createServer();
85+
await new Promise((r) => server2.close(r));
86+
process.send(acts.WORKER1_SERVER2_CLOSED);
87+
} else {
88+
assert.fail(`Unexpected message ${msg.cmd}`);
89+
}
90+
});
91+
}
92+
})().then(common.mustCall());
93+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.