Skip to content

Commit 869dc60

Browse files
committed
cluster: send connection to other server when worker drop it
1 parent dabda03 commit 869dc60

File tree

2 files changed

+67
-1
lines changed

2 files changed

+67
-1
lines changed

lib/internal/cluster/child.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,14 @@ function rr(message, { indexesKey, index }, cb) {
208208
function onconnection(message, handle) {
209209
const key = message.key;
210210
const server = handles.get(key);
211-
const accepted = server !== undefined;
211+
let accepted = server !== undefined;
212+
213+
if (accepted && server[owner_symbol]) {
214+
const self = server[owner_symbol];
215+
if (self.maxConnections && self._connections >= self.maxConnections) {
216+
accepted = false;
217+
}
218+
}
212219

213220
send({ ack: message.seq, accepted });
214221

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const net = require('net');
5+
const cluster = require('cluster');
6+
const tmpdir = require('../common/tmpdir');
7+
8+
let connectionCount = 0;
9+
let listenCount = 0;
10+
let worker1;
11+
let worker2;
12+
13+
function request(path) {
14+
for (let i = 0; i < 10; i++) {
15+
net.connect(path);
16+
}
17+
}
18+
19+
function handleMessage(message) {
20+
assert.match(message.action, /listen|connection/);
21+
if (message.action === 'listen') {
22+
if (++listenCount === 2) {
23+
request(common.PIPE);
24+
}
25+
} else if (message.action === 'connection') {
26+
if (++connectionCount === 10) {
27+
worker1.send({ action: 'disconnect' });
28+
worker2.send({ action: 'disconnect' });
29+
}
30+
}
31+
}
32+
33+
if (cluster.isPrimary) {
34+
cluster.schedulingPolicy = cluster.SCHED_RR;
35+
tmpdir.refresh();
36+
worker1 = cluster.fork({ maxConnections: 1, pipePath: common.PIPE });
37+
worker2 = cluster.fork({ maxConnections: 9, pipePath: common.PIPE });
38+
worker1.on('message', common.mustCall((message) => {
39+
handleMessage(message);
40+
}, 2));
41+
worker2.on('message', common.mustCall((message) => {
42+
handleMessage(message);
43+
}, 10));
44+
} else {
45+
const server = net.createServer(common.mustCall((socket) => {
46+
process.send({ action: 'connection' });
47+
}, +process.env.maxConnections));
48+
49+
server.listen(process.env.pipePath, common.mustCall(() => {
50+
process.send({ action: 'listen' });
51+
}));
52+
53+
server.maxConnections = +process.env.maxConnections;
54+
55+
process.on('message', common.mustCall((message) => {
56+
assert.strictEqual(message.action, 'disconnect');
57+
process.disconnect();
58+
}));
59+
}

0 commit comments

Comments
 (0)