feat: add idle timeout and fix subscription race (#12093)

This commit is contained in:
Justin Starry 2020-09-07 23:12:22 +08:00 committed by GitHub
parent ee646aa7a2
commit cfe9b8b744
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 115 additions and 54 deletions

View File

@ -20756,15 +20756,15 @@
} }
}, },
"rpc-websockets": { "rpc-websockets": {
"version": "7.4.0", "version": "7.4.2",
"resolved": "https://registry.npmjs.org/rpc-websockets/-/rpc-websockets-7.4.0.tgz", "resolved": "https://registry.npmjs.org/rpc-websockets/-/rpc-websockets-7.4.2.tgz",
"integrity": "sha512-FCPhQyD2Lfg1rOm8SOVL/FiDw7/3e9MDVeLqSSb4KBWUWmcJb/WcWtjkIXm6ecarbNmHntVLx8KH3Exyj0hMDA==", "integrity": "sha512-kUpYcnbEU/BeAxGTlfySZ/tp9FU+TLSgONbViyx6hQsIh8876uxggJWzVOCe+CztBvuCOAOd0BXyPlKfcflykw==",
"requires": { "requires": {
"@babel/runtime": "^7.11.2", "@babel/runtime": "^7.11.2",
"assert-args": "^1.2.1", "assert-args": "^1.2.1",
"bufferutil": "^4.0.1", "bufferutil": "^4.0.1",
"circular-json": "^0.5.9", "circular-json": "^0.5.9",
"eventemitter3": "^4.0.6", "eventemitter3": "^4.0.7",
"utf-8-validate": "^5.0.2", "utf-8-validate": "^5.0.2",
"uuid": "^8.3.0", "uuid": "^8.3.0",
"ws": "^7.3.1" "ws": "^7.3.1"

View File

@ -83,7 +83,7 @@
"mz": "^2.7.0", "mz": "^2.7.0",
"node-fetch": "^2.2.0", "node-fetch": "^2.2.0",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"rpc-websockets": "^7.4.0", "rpc-websockets": "^7.4.2",
"superstruct": "^0.8.3", "superstruct": "^0.8.3",
"tweetnacl": "^1.0.0", "tweetnacl": "^1.0.0",
"ws": "^7.0.0" "ws": "^7.0.0"

View File

@ -1365,7 +1365,9 @@ export type ConfirmedSignatureInfo = {
export class Connection { export class Connection {
_rpcRequest: RpcRequest; _rpcRequest: RpcRequest;
_rpcWebSocket: RpcWebSocketClient; _rpcWebSocket: RpcWebSocketClient;
_rpcWebSocketConnected: boolean = false;
_rpcWebSocketHeartbeat: IntervalID | null = null; _rpcWebSocketHeartbeat: IntervalID | null = null;
_rpcWebSocketIdleTimeout: TimeoutID | null = null;
_commitment: ?Commitment; _commitment: ?Commitment;
_blockhashInfo: { _blockhashInfo: {
@ -2669,6 +2671,7 @@ export class Connection {
* @private * @private
*/ */
_wsOnOpen() { _wsOnOpen() {
this._rpcWebSocketConnected = true;
this._rpcWebSocketHeartbeat = setInterval(() => { this._rpcWebSocketHeartbeat = setInterval(() => {
// Ping server every 5s to prevent idle timeouts // Ping server every 5s to prevent idle timeouts
this._rpcWebSocket.notify('ping').catch(() => {}); this._rpcWebSocket.notify('ping').catch(() => {});
@ -2686,9 +2689,17 @@ export class Connection {
/** /**
* @private * @private
*/ */
_wsOnClose() { _wsOnClose(code: number) {
clearInterval(this._rpcWebSocketHeartbeat); clearInterval(this._rpcWebSocketHeartbeat);
this._rpcWebSocketHeartbeat = null; this._rpcWebSocketHeartbeat = null;
if (code === 1000) {
// explicit close, check if any subscriptions have been made since close
this._updateSubscriptions();
return;
}
// implicit close, prepare subscriptions for auto-reconnect
this._resetSubscriptions(); this._resetSubscriptions();
} }
@ -2777,12 +2788,23 @@ export class Connection {
signatureKeys.length === 0 && signatureKeys.length === 0 &&
rootKeys.length === 0 rootKeys.length === 0
) { ) {
this._rpcWebSocket.close(); if (this._rpcWebSocketConnected) {
this._rpcWebSocketConnected = false;
this._rpcWebSocketIdleTimeout = setTimeout(() => {
this._rpcWebSocketIdleTimeout = null;
this._rpcWebSocket.close();
}, 500);
}
return; return;
} }
if (this._rpcWebSocketHeartbeat === null) { if (this._rpcWebSocketIdleTimeout !== null) {
this._resetSubscriptions(); clearTimeout(this._rpcWebSocketIdleTimeout);
this._rpcWebSocketIdleTimeout = null;
this._rpcWebSocketConnected = true;
}
if (!this._rpcWebSocketConnected) {
this._rpcWebSocket.connect(); this._rpcWebSocket.connect();
return; return;
} }

View File

@ -1,10 +1,6 @@
// @flow // @flow
import { import {Client as LiveClient} from 'rpc-websockets';
Client as RpcWebSocketClient,
NodeWebSocketTypeOptions,
IWSClientAdditionalOptions,
} from 'rpc-websockets';
// Define TEST_LIVE in the environment to test against the real full node // Define TEST_LIVE in the environment to test against the real full node
// identified by `url` instead of using the mock // identified by `url` instead of using the mock
@ -12,49 +8,24 @@ export const mockRpcEnabled = !process.env.TEST_LIVE;
let mockNotice = true; let mockNotice = true;
export class Client { class MockClient {
client: RpcWebSocketClient; constructor(url: string) {
if (mockNotice) {
constructor( console.log(
url: string, 'Note: rpc-websockets mock is disabled, testing live against',
options: NodeWebSocketTypeOptions & IWSClientAdditionalOptions, url,
) { );
//console.log('MockClient', url, options); mockNotice = false;
if (!mockRpcEnabled) {
if (mockNotice) {
console.log(
'Note: rpc-websockets mock is disabled, testing live against',
url,
);
mockNotice = false;
}
this.client = new RpcWebSocketClient(url, options);
} }
} }
connect() { connect() {}
if (!mockRpcEnabled) { close() {}
return this.client.connect(); on() {}
} call(): Promise<Object> {
}
close() {
if (!mockRpcEnabled) {
return this.client.close();
}
}
on(event: string, callback: Function) {
if (!mockRpcEnabled) {
return this.client.on(event, callback);
}
//console.log('on', event);
}
async call(method: string, params: Object): Promise<Object> {
if (!mockRpcEnabled) {
return await this.client.call(method, params);
}
throw new Error('call unsupported'); throw new Error('call unsupported');
} }
} }
const Client = mockRpcEnabled ? MockClient : LiveClient;
export {Client};

View File

@ -0,0 +1,68 @@
// @flow
import bs58 from 'bs58';
import {Connection} from '../src';
import {url} from './url';
import {mockRpcEnabled} from './__mocks__/node-fetch';
import {sleep} from '../src/util/sleep';
describe('websocket', () => {
if (mockRpcEnabled) {
test('no-op', () => {});
console.log('non-live test skipped');
return;
}
const connection = new Connection(url);
test('connect and disconnect', async () => {
const testSignature = bs58.encode(Buffer.alloc(64));
const id = connection.onSignature(testSignature, () => {});
// wait for websocket to connect
await sleep(100);
expect(connection._rpcWebSocketConnected).toBe(true);
expect(connection._rpcWebSocketHeartbeat).not.toBe(null);
// test if socket is open
await connection._rpcWebSocket.notify('ping');
await connection.removeSignatureListener(id);
expect(connection._rpcWebSocketConnected).toBe(false);
expect(connection._rpcWebSocketHeartbeat).not.toBe(null);
expect(connection._rpcWebSocketIdleTimeout).not.toBe(null);
// wait for websocket to disconnect
await sleep(1100);
expect(connection._rpcWebSocketConnected).toBe(false);
expect(connection._rpcWebSocketHeartbeat).toBe(null);
expect(connection._rpcWebSocketIdleTimeout).toBe(null);
// test if socket is closed
await expect(connection._rpcWebSocket.notify('ping')).rejects.toThrow(
'socket not ready',
);
});
test('idle timeout', async () => {
const testSignature = bs58.encode(Buffer.alloc(64));
const id = connection.onSignature(testSignature, () => {});
// wait for websocket to connect
await sleep(100);
expect(connection._rpcWebSocketIdleTimeout).toBe(null);
await connection.removeSignatureListener(id);
expect(connection._rpcWebSocketIdleTimeout).not.toBe(null);
const nextId = connection.onSignature(testSignature, () => {});
expect(connection._rpcWebSocketIdleTimeout).toBe(null);
await connection.removeSignatureListener(nextId);
expect(connection._rpcWebSocketIdleTimeout).not.toBe(null);
// wait for websocket to disconnect
await sleep(1100);
expect(connection._rpcWebSocketIdleTimeout).toBe(null);
});
});