Commit 797e231e authored by Alejandro Celaya's avatar Alejandro Celaya Committed by Alejandro Celaya

Migrate StreamerService to TS

parent c564c77f
import { generateHexString } from '../../shared/random'; import { generateHexString } from '../../shared/random';
import { warnOnce } from '../../shared/warn-once'; import { warnOnce } from '../../shared/warn-once';
import type { SidebarStore } from '../store';
import { watch } from '../util/watch'; import { watch } from '../util/watch';
import { Socket } from '../websocket'; import { Socket } from '../websocket';
import type { APIRoutesService } from './api-routes';
import type { AuthService } from './auth';
import type { GroupsService } from './groups';
import type { SessionService } from './session';
/** /**
* `StreamerService` manages the WebSocket connection to the Hypothesis Real-Time * `StreamerService` manages the WebSocket connection to the Hypothesis Real-Time
...@@ -21,46 +26,54 @@ import { Socket } from '../websocket'; ...@@ -21,46 +26,54 @@ import { Socket } from '../websocket';
* @inject * @inject
*/ */
export class StreamerService { export class StreamerService {
/** private _auth: AuthService;
* @param {import('../store').SidebarStore} store private _groups: GroupsService;
* @param {import('./api-routes').APIRoutesService} apiRoutes private _session: SessionService;
* @param {import('./auth').AuthService} auth private _store: SidebarStore;
* @param {import('./groups').GroupsService} groups private _websocketURL: Promise<string>;
* @param {import('./session').SessionService} session private _socket: Socket | null;
*/ private _reconnectSetUp: boolean;
constructor(store, apiRoutes, auth, groups, session) {
this._auth = auth;
this._groups = groups;
this._session = session;
this._store = store;
this._websocketURL = apiRoutes.links().then(links => links.websocket);
/** The randomly generated session ID */
this.clientId = generateHexString(32);
/** @type {Socket|null} */
this._socket = null;
/** /**
* Flag that controls whether to apply updates immediately or defer them * Flag that controls whether to apply updates immediately or defer them
* until "manually" applied via `applyPendingUpdates` * until "manually" applied via `applyPendingUpdates`
*/ */
this._updateImmediately = true; private _updateImmediately: boolean;
/** /**
* Client configuration messages, to be sent each time a new connection is * Client configuration messages, to be sent each time a new connection is
* established. * established.
*
* @type {Record<string, object>}
*/ */
this._configMessages = {}; private _configMessages: Record<string, object>;
/** /**
* Number of automatic reconnection attempts that have been made following * Number of automatic reconnection attempts that have been made following
* an unexpected disconnection. * an unexpected disconnection.
*/ */
this._reconnectionAttempts = 0; private _reconnectionAttempts: number;
/** The randomly generated session ID */
clientId: string;
constructor(
store: SidebarStore,
apiRoutes: APIRoutesService,
auth: AuthService,
groups: GroupsService,
session: SessionService
) {
this._auth = auth;
this._groups = groups;
this._session = session;
this._store = store;
this._websocketURL = apiRoutes.links().then(links => links.websocket);
this.clientId = generateHexString(32);
this._socket = null;
this._updateImmediately = true;
this._configMessages = {};
this._reconnectionAttempts = 0;
this._reconnectSetUp = false; this._reconnectSetUp = false;
} }
...@@ -84,11 +97,7 @@ export class StreamerService { ...@@ -84,11 +97,7 @@ export class StreamerService {
this._store.clearPendingUpdates(); this._store.clearPendingUpdates();
} }
/** private _handleSocketError(websocketURL: string, event: ErrorEvent) {
* @param {string} websocketURL
* @param {ErrorEvent} event
*/
_handleSocketError(websocketURL, event) {
warnOnce('Error connecting to H push notification service:', event); warnOnce('Error connecting to H push notification service:', event);
// In development, warn if the connection failure might be due to // In development, warn if the connection failure might be due to
...@@ -107,8 +116,7 @@ export class StreamerService { ...@@ -107,8 +116,7 @@ export class StreamerService {
} }
} }
/** @param {MessageEvent} event */ private _handleSocketMessage(event: MessageEvent) {
_handleSocketMessage(event) {
const message = JSON.parse(event.data); const message = JSON.parse(event.data);
if (!message) { if (!message) {
return; return;
...@@ -151,8 +159,7 @@ export class StreamerService { ...@@ -151,8 +159,7 @@ export class StreamerService {
} }
} }
/** @param {Socket} socket */ private _sendClientConfig(socket: Socket) {
_sendClientConfig(socket) {
Object.keys(this._configMessages).forEach(key => { Object.keys(this._configMessages).forEach(key => {
if (this._configMessages[key]) { if (this._configMessages[key]) {
socket.send(this._configMessages[key]); socket.send(this._configMessages[key]);
...@@ -164,11 +171,8 @@ export class StreamerService { ...@@ -164,11 +171,8 @@ export class StreamerService {
* Send a configuration message to the push notification service. * Send a configuration message to the push notification service.
* Each message is associated with a key, which is used to re-send * Each message is associated with a key, which is used to re-send
* configuration data to the server in the event of a reconnection. * configuration data to the server in the event of a reconnection.
*
* @param {string} key
* @param {object} configMessage
*/ */
setConfig(key, configMessage) { setConfig(key: string, configMessage: object) {
this._configMessages[key] = configMessage; this._configMessages[key] = configMessage;
if (this._socket?.isConnected()) { if (this._socket?.isConnected()) {
this._socket.send(configMessage); this._socket.send(configMessage);
...@@ -222,14 +226,10 @@ export class StreamerService { ...@@ -222,14 +226,10 @@ export class StreamerService {
); );
} }
}); });
newSocket.on( newSocket.on('error', (event: ErrorEvent) =>
'error',
/** @param {ErrorEvent} event */ event =>
this._handleSocketError(websocketURL, event) this._handleSocketError(websocketURL, event)
); );
newSocket.on( newSocket.on('message', (event: MessageEvent) =>
'message',
/** @param {MessageEvent} event */ event =>
this._handleSocketMessage(event) this._handleSocketMessage(event)
); );
this._socket = newSocket; this._socket = newSocket;
...@@ -254,12 +254,12 @@ export class StreamerService { ...@@ -254,12 +254,12 @@ export class StreamerService {
* *
* If the service has already connected this does nothing. * If the service has already connected this does nothing.
* *
* @param {object} [options] * @param [options.applyUpdatesImmediately] - true if pending updates should be applied immediately
* @param {boolean} [options.applyUpdatesImmediately] - true if pending updates should be applied immediately * @return Promise which resolves once the WebSocket connection process has started.
* @return {Promise<void>} Promise which resolves once the WebSocket connection
* process has started.
*/ */
async connect(options = {}) { async connect(
options: { applyUpdatesImmediately?: boolean } = {}
): Promise<void> {
this._updateImmediately = options.applyUpdatesImmediately ?? true; this._updateImmediately = options.applyUpdatesImmediately ?? true;
// Setup reconnection when user changes, as auth token will have changed. // Setup reconnection when user changes, as auth token will have changed.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment