From cfe9b8b7447ea11bcdc0b29790bed8d539a1cad3 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Mon, 7 Sep 2020 23:12:22 +0800 Subject: [PATCH] feat: add idle timeout and fix subscription race (#12093) --- web3.js/package-lock.json | 8 +-- web3.js/package.json | 2 +- web3.js/src/connection.js | 30 +++++++++-- web3.js/test/__mocks__/rpc-websockets.js | 61 ++++++--------------- web3.js/test/websocket.test.js | 68 ++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 54 deletions(-) create mode 100644 web3.js/test/websocket.test.js diff --git a/web3.js/package-lock.json b/web3.js/package-lock.json index 4539944b91..3853cb2345 100644 --- a/web3.js/package-lock.json +++ b/web3.js/package-lock.json @@ -20756,15 +20756,15 @@ } }, "rpc-websockets": { - "version": "7.4.0", - "resolved": "https://registry.npmjs.org/rpc-websockets/-/rpc-websockets-7.4.0.tgz", - "integrity": "sha512-FCPhQyD2Lfg1rOm8SOVL/FiDw7/3e9MDVeLqSSb4KBWUWmcJb/WcWtjkIXm6ecarbNmHntVLx8KH3Exyj0hMDA==", + "version": "7.4.2", + "resolved": "https://registry.npmjs.org/rpc-websockets/-/rpc-websockets-7.4.2.tgz", + "integrity": "sha512-kUpYcnbEU/BeAxGTlfySZ/tp9FU+TLSgONbViyx6hQsIh8876uxggJWzVOCe+CztBvuCOAOd0BXyPlKfcflykw==", "requires": { "@babel/runtime": "^7.11.2", "assert-args": "^1.2.1", "bufferutil": "^4.0.1", "circular-json": "^0.5.9", - "eventemitter3": "^4.0.6", + "eventemitter3": "^4.0.7", "utf-8-validate": "^5.0.2", "uuid": "^8.3.0", "ws": "^7.3.1" diff --git a/web3.js/package.json b/web3.js/package.json index f730fe4470..f27c7d5f71 100644 --- a/web3.js/package.json +++ b/web3.js/package.json @@ -83,7 +83,7 @@ "mz": "^2.7.0", "node-fetch": "^2.2.0", "npm-run-all": "^4.1.5", - "rpc-websockets": "^7.4.0", + "rpc-websockets": "^7.4.2", "superstruct": "^0.8.3", "tweetnacl": "^1.0.0", "ws": "^7.0.0" diff --git a/web3.js/src/connection.js b/web3.js/src/connection.js index 09e8506000..bcf15146bb 100644 --- a/web3.js/src/connection.js +++ b/web3.js/src/connection.js @@ -1365,7 +1365,9 @@ export type ConfirmedSignatureInfo = { export class Connection { _rpcRequest: RpcRequest; _rpcWebSocket: RpcWebSocketClient; + _rpcWebSocketConnected: boolean = false; _rpcWebSocketHeartbeat: IntervalID | null = null; + _rpcWebSocketIdleTimeout: TimeoutID | null = null; _commitment: ?Commitment; _blockhashInfo: { @@ -2669,6 +2671,7 @@ export class Connection { * @private */ _wsOnOpen() { + this._rpcWebSocketConnected = true; this._rpcWebSocketHeartbeat = setInterval(() => { // Ping server every 5s to prevent idle timeouts this._rpcWebSocket.notify('ping').catch(() => {}); @@ -2686,9 +2689,17 @@ export class Connection { /** * @private */ - _wsOnClose() { + _wsOnClose(code: number) { clearInterval(this._rpcWebSocketHeartbeat); this._rpcWebSocketHeartbeat = null; + + if (code === 1000) { + // explicit close, check if any subscriptions have been made since close + this._updateSubscriptions(); + return; + } + + // implicit close, prepare subscriptions for auto-reconnect this._resetSubscriptions(); } @@ -2777,12 +2788,23 @@ export class Connection { signatureKeys.length === 0 && rootKeys.length === 0 ) { - this._rpcWebSocket.close(); + if (this._rpcWebSocketConnected) { + this._rpcWebSocketConnected = false; + this._rpcWebSocketIdleTimeout = setTimeout(() => { + this._rpcWebSocketIdleTimeout = null; + this._rpcWebSocket.close(); + }, 500); + } return; } - if (this._rpcWebSocketHeartbeat === null) { - this._resetSubscriptions(); + if (this._rpcWebSocketIdleTimeout !== null) { + clearTimeout(this._rpcWebSocketIdleTimeout); + this._rpcWebSocketIdleTimeout = null; + this._rpcWebSocketConnected = true; + } + + if (!this._rpcWebSocketConnected) { this._rpcWebSocket.connect(); return; } diff --git a/web3.js/test/__mocks__/rpc-websockets.js b/web3.js/test/__mocks__/rpc-websockets.js index 75a43e4b71..5619bcc1c5 100644 --- a/web3.js/test/__mocks__/rpc-websockets.js +++ b/web3.js/test/__mocks__/rpc-websockets.js @@ -1,10 +1,6 @@ // @flow -import { - Client as RpcWebSocketClient, - NodeWebSocketTypeOptions, - IWSClientAdditionalOptions, -} from 'rpc-websockets'; +import {Client as LiveClient} from 'rpc-websockets'; // Define TEST_LIVE in the environment to test against the real full node // identified by `url` instead of using the mock @@ -12,49 +8,24 @@ export const mockRpcEnabled = !process.env.TEST_LIVE; let mockNotice = true; -export class Client { - client: RpcWebSocketClient; - - constructor( - url: string, - options: NodeWebSocketTypeOptions & IWSClientAdditionalOptions, - ) { - //console.log('MockClient', url, options); - if (!mockRpcEnabled) { - if (mockNotice) { - console.log( - 'Note: rpc-websockets mock is disabled, testing live against', - url, - ); - mockNotice = false; - } - this.client = new RpcWebSocketClient(url, options); +class MockClient { + constructor(url: string) { + if (mockNotice) { + console.log( + 'Note: rpc-websockets mock is disabled, testing live against', + url, + ); + mockNotice = false; } } - connect() { - if (!mockRpcEnabled) { - return this.client.connect(); - } - } - - close() { - if (!mockRpcEnabled) { - return this.client.close(); - } - } - - on(event: string, callback: Function) { - if (!mockRpcEnabled) { - return this.client.on(event, callback); - } - //console.log('on', event); - } - - async call(method: string, params: Object): Promise { - if (!mockRpcEnabled) { - return await this.client.call(method, params); - } + connect() {} + close() {} + on() {} + call(): Promise { throw new Error('call unsupported'); } } + +const Client = mockRpcEnabled ? MockClient : LiveClient; +export {Client}; diff --git a/web3.js/test/websocket.test.js b/web3.js/test/websocket.test.js new file mode 100644 index 0000000000..39fac8fb32 --- /dev/null +++ b/web3.js/test/websocket.test.js @@ -0,0 +1,68 @@ +// @flow +import bs58 from 'bs58'; + +import {Connection} from '../src'; +import {url} from './url'; +import {mockRpcEnabled} from './__mocks__/node-fetch'; +import {sleep} from '../src/util/sleep'; + +describe('websocket', () => { + if (mockRpcEnabled) { + test('no-op', () => {}); + console.log('non-live test skipped'); + return; + } + + const connection = new Connection(url); + + test('connect and disconnect', async () => { + const testSignature = bs58.encode(Buffer.alloc(64)); + const id = connection.onSignature(testSignature, () => {}); + + // wait for websocket to connect + await sleep(100); + expect(connection._rpcWebSocketConnected).toBe(true); + expect(connection._rpcWebSocketHeartbeat).not.toBe(null); + + // test if socket is open + await connection._rpcWebSocket.notify('ping'); + + await connection.removeSignatureListener(id); + expect(connection._rpcWebSocketConnected).toBe(false); + expect(connection._rpcWebSocketHeartbeat).not.toBe(null); + expect(connection._rpcWebSocketIdleTimeout).not.toBe(null); + + // wait for websocket to disconnect + await sleep(1100); + expect(connection._rpcWebSocketConnected).toBe(false); + expect(connection._rpcWebSocketHeartbeat).toBe(null); + expect(connection._rpcWebSocketIdleTimeout).toBe(null); + + // test if socket is closed + await expect(connection._rpcWebSocket.notify('ping')).rejects.toThrow( + 'socket not ready', + ); + }); + + test('idle timeout', async () => { + const testSignature = bs58.encode(Buffer.alloc(64)); + const id = connection.onSignature(testSignature, () => {}); + + // wait for websocket to connect + await sleep(100); + expect(connection._rpcWebSocketIdleTimeout).toBe(null); + + await connection.removeSignatureListener(id); + expect(connection._rpcWebSocketIdleTimeout).not.toBe(null); + + const nextId = connection.onSignature(testSignature, () => {}); + expect(connection._rpcWebSocketIdleTimeout).toBe(null); + + await connection.removeSignatureListener(nextId); + expect(connection._rpcWebSocketIdleTimeout).not.toBe(null); + + // wait for websocket to disconnect + await sleep(1100); + expect(connection._rpcWebSocketIdleTimeout).toBe(null); + }); +});