Commit d9a017a2 authored by Robert Knight's avatar Robert Knight

Move WebSocket automatic reconnection to streamer service

Move the logic for automatically reconnecting to the WebSocket after an
unexpected disconnection from the `Socket` WebSocket wrapper class to
the `StreamerService` service. The Socket class is now responsible only
for notifying the caller of unexpected disconnections.

Letting StreamerService handle reconnection ensures that a fresh
WebSocket URL, with a current access token, is used on each attempt. The
previous approach would re-use the same URL on each attempt. This could
lead to an obsolete access token being used, with the result that the
WebSocket connection degraded to an unauthenticated one and not all
expected updates were received.

As part of this change, the logic for calculating the retry delay has been
simplified and no longer uses the `retry` package, hopefully making the
behavior easier to understand.
parent 44310d04
......@@ -55,6 +55,12 @@ export class StreamerService {
*/
this._configMessages = {};
/**
* Number of automatic reconnection attempts that have been made following
* an unexpected disconnection.
*/
this._connectionAttempts = 0;
this._reconnectSetUp = false;
}
......@@ -195,7 +201,24 @@ export class StreamerService {
}
const newSocket = new Socket(url);
newSocket.on('open', () => this._sendClientConfig(newSocket));
newSocket.on('open', () => {
this._connectionAttempts = 0;
this._sendClientConfig(newSocket);
});
newSocket.on('disconnect', () => {
++this._connectionAttempts;
if (this._connectionAttempts < 10) {
// Reconnect with a delay that doubles on each attempt.
// This reduces the stampede of requests if the WebSocket server has a
// problem.
const delay = 1000 * 2 ** this._connectionAttempts;
setTimeout(() => this._reconnect(), delay);
} else {
console.error(
'Gave up trying to reconnect to Hypothesis real time update service'
);
}
});
newSocket.on('error', err => this._handleSocketError(err));
newSocket.on('message', event => this._handleSocketMessage(event));
this._socket = newSocket;
......
......@@ -252,6 +252,18 @@ describe('StreamerService', () => {
});
describe('Automatic reconnection', () => {
let clock;
beforeEach(() => {
clock = null;
sinon.stub(console, 'error');
});
afterEach(() => {
clock?.restore();
console.error.restore();
});
it('should reconnect when user changes', () => {
let oldWebSocket;
createDefaultStreamer();
......@@ -269,6 +281,43 @@ describe('StreamerService', () => {
});
});
it('should reconnect after unexpected disconnection', async () => {
clock = sinon.useFakeTimers();
createDefaultStreamer();
await activeStreamer.connect();
fakeWebSocket.emit('disconnect');
// Wait for reconnection to happen.
clock.tick(3000);
await Promise.resolve();
assert.lengthOf(fakeWebSockets, 2);
});
it('should limit number of reconnection attempts after an unexpected disconnection', async () => {
clock = sinon.useFakeTimers();
createDefaultStreamer();
await activeStreamer.connect();
for (let i = 1; i < 11; i++) {
fakeWebSocket.emit('disconnect');
// This mirrors the delay calculation in the service itself.
const delay = 1000 * 2 ** i;
clock.tick(delay);
await Promise.resolve();
}
assert.lengthOf(fakeWebSockets, 10);
assert.calledWith(
console.error,
'Gave up trying to reconnect to Hypothesis real time update service'
);
});
it('should only set up auto-reconnect once', async () => {
createDefaultStreamer();
// This should register auto-reconnect
......@@ -282,7 +331,7 @@ describe('StreamerService', () => {
fakeStore.setState({});
await delay(1);
// Total number of web sockets blown through in this test should be 2
// Total number of web sockets created in this test should be 2
// 3+ would indicate `reconnect` fired more than once
assert.lengthOf(fakeWebSockets, 2);
});
......
......@@ -3,18 +3,14 @@ import {
CLOSE_NORMAL,
CLOSE_GOING_AWAY,
CLOSE_ABNORMAL,
RECONNECT_MIN_DELAY,
} from '../websocket';
describe('websocket wrapper', () => {
let fakeSocket;
let clock;
let connectionCount;
class FakeWebSocket {
constructor() {
++connectionCount;
this.close = sinon.stub();
this.send = sinon.stub();
fakeSocket = this; // eslint-disable-line consistent-this
......@@ -27,7 +23,6 @@ describe('websocket wrapper', () => {
beforeEach(() => {
globalThis.WebSocket = FakeWebSocket;
clock = sinon.useFakeTimers();
connectionCount = 0;
// Suppress warnings of WebSocket issues in tests for handling
// of abnormal disconnections
......@@ -43,78 +38,33 @@ describe('websocket wrapper', () => {
});
context('when the connection is closed by the browser or server', () => {
it('should reconnect after an abnormal disconnection', () => {
new Socket('ws://test:1234');
assert.ok(fakeSocket);
const initialSocket = fakeSocket;
fakeSocket.onopen({});
fakeSocket.onclose({ code: CLOSE_ABNORMAL });
clock.tick(2000);
assert.ok(fakeSocket);
assert.notEqual(fakeSocket, initialSocket);
});
it('should emit "disconnect" event after an abnormal disconnection', () => {
const onDisconnect = sinon.stub();
const socket = new Socket('ws://test:1234');
socket.on('disconnect', onDisconnect);
it('should reconnect if initial connection fails', () => {
new Socket('ws://test:1234');
assert.ok(fakeSocket);
const initialSocket = fakeSocket;
fakeSocket.onopen({});
fakeSocket.onclose({ code: CLOSE_ABNORMAL });
clock.tick(4000);
assert.ok(fakeSocket);
assert.notEqual(fakeSocket, initialSocket);
});
it('should send queued messages after a reconnect', () => {
// simulate WebSocket setup and initial connection
const socket = new Socket('ws://test:1234');
fakeSocket.onopen({});
// simulate abnormal disconnection
fakeSocket.onclose({ code: CLOSE_ABNORMAL });
// enqueue a message and check that it is sent after the WS reconnects
socket.send({ aKey: 'aValue' });
fakeSocket.onopen({});
assert.calledWith(fakeSocket.send, '{"aKey":"aValue"}');
assert.calledOnce(onDisconnect);
});
[CLOSE_NORMAL, CLOSE_GOING_AWAY].forEach(closeCode => {
it('should not reconnect after a normal disconnection', () => {
new Socket('ws://test:1234');
it('should not emit "disconnect" after a normal disconnection', () => {
const onDisconnect = sinon.stub();
const socket = new Socket('ws://test:1234');
socket.on('disconnect', onDisconnect);
assert.ok(fakeSocket);
const initialSocket = fakeSocket;
fakeSocket.onopen({});
fakeSocket.onclose({ code: closeCode });
clock.tick(4000);
assert.ok(fakeSocket);
assert.equal(fakeSocket, initialSocket);
assert.notCalled(onDisconnect);
});
});
it('should stop trying to reconnect after 10 retries', () => {
new Socket('ws://test:1234');
connectionCount = 0;
for (let attempt = 1; attempt <= 11; attempt++) {
fakeSocket.onclose({ code: CLOSE_ABNORMAL });
// The delay between retries is a random value between `minTimeout` and
// `minTimeout * (backoffFactor ** attempt)`. See docs for "retry" package.
const minTimeout = RECONNECT_MIN_DELAY;
const backoffFactor = 2; // Default exponential factor for "retry" package
const maxDelay = minTimeout * Math.pow(backoffFactor, attempt);
clock.tick(maxDelay);
}
assert.equal(connectionCount, 10);
assert.calledWith(
console.error,
'Reached max retries attempting to reconnect WebSocket'
);
});
});
it('should queue messages sent prior to connection', () => {
......
import retry from 'retry';
import { TinyEmitter } from 'tiny-emitter';
/**
* Operation created by `retry.operation`. See "retry" docs.
*
* @typedef RetryOperation
* @prop {(callback: () => void) => void} attempt
* @prop {(e: Error) => boolean} retry
*/
// Status codes indicating the reason why a WebSocket connection closed.
// See https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent and
// https://tools.ietf.org/html/rfc6455#section-7.4.
......@@ -23,17 +14,13 @@ export const CLOSE_ABNORMAL = 1006;
// There are other possible close status codes not listed here. They are all
// considered abnormal closures.
// Minimum delay, in ms, before reconnecting after an abnormal connection close.
export const RECONNECT_MIN_DELAY = 1000;
/**
* Socket is a minimal wrapper around WebSocket which provides:
*
* - Automatic reconnection in the event of an abnormal close
* - Serialization of JSON messages (see {@link send})
* - An EventEmitter API
* - Queuing of messages passed to send() whilst the socket is
* connecting
* - Uses the standard EventEmitter API for reporting open, close, error
* and message events.
*/
export class Socket extends TinyEmitter {
/**
......@@ -58,13 +45,6 @@ export class Socket extends TinyEmitter {
*/
let socket;
/**
* Pending connection or re-connection operation.
*
* @type {RetryOperation|null}
*/
let operation = null;
const sendMessages = () => {
while (messageQueue.length > 0) {
const messageString = JSON.stringify(messageQueue.shift());
......@@ -72,70 +52,32 @@ export class Socket extends TinyEmitter {
}
};
/**
* Handler for when the WebSocket disconnects "abnormally".
*
* This may be the result of a failure to connect, or an abnormal close after
* a previous successful connection.
*
* @param {Error} error
* @param {() => void} reconnect
*/
const onAbnormalClose = (error, reconnect) => {
// If we're already in a reconnection loop, trigger a retry...
if (operation) {
if (!operation.retry(error)) {
console.error(
'Reached max retries attempting to reconnect WebSocket'
);
}
return;
}
// ...otherwise reconnect the websocket after a short delay.
let delay = RECONNECT_MIN_DELAY;
delay += Math.floor(Math.random() * delay);
setTimeout(reconnect, delay);
};
/**
* Connect the WebSocket.
*/
const connect = () => {
operation = /** @type {RetryOperation} */ (
retry.operation({
minTimeout: RECONNECT_MIN_DELAY * 2,
// Don't retry forever -- fail permanently after 10 retries
retries: 10,
// Randomize retry times to minimize the thundering herd effect
randomize: true,
})
);
operation.attempt(() => {
socket = new WebSocket(url);
socket.onopen = event => {
operation = null;
sendMessages();
this.emit('open', event);
};
socket.onclose = event => {
if (event.code === CLOSE_NORMAL || event.code === CLOSE_GOING_AWAY) {
this.emit('close', event);
return;
}
const err = new Error(
`WebSocket closed abnormally, code: ${event.code}`
);
console.warn(err);
onAbnormalClose(err, connect);
};
socket.onerror = event => {
this.emit('error', event);
};
socket.onmessage = event => {
this.emit('message', event);
};
});
socket = new WebSocket(url);
socket.onopen = event => {
sendMessages();
this.emit('open', event);
};
socket.onclose = event => {
if (event.code === CLOSE_NORMAL || event.code === CLOSE_GOING_AWAY) {
this.emit('close', event);
return;
}
const err = new Error(
`WebSocket closed abnormally, code: ${event.code}`
);
console.warn(err);
this.emit('disconnect');
};
socket.onerror = event => {
this.emit('error', event);
};
socket.onmessage = event => {
this.emit('message', event);
};
};
/** Close the underlying WebSocket connection */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment