fix: the one where we fix client subscriptions (#24473)

* chore: create a first-class type to distinguish client subscription ids from server subscription ids

* chore: add fast-stable-stringify as a dependency to web3.js

* fix: reimplement the subscription tracker as a state machine

* test: updated tests to recognize that signatureUnsubscribe happens synchronously

* chore: add sinon-chai so that we can make assertions on calling spies

* test: coverage for the full range of subscription state transitions

* fix: special case auto-disposing subscriptions like signatureSubscribe

* fix: document Subscription type (SQUASH THIS)

* fix: strict undefined checks (SQUASH THIS)

* fix: naming (SQUASH THIS)

* fix: move defaulting to source (SQUASH THIS)

* fix: build RPC args first, then produce the subscription topic hash (SQUASH THIS)

* fix: dispose handles no longer track whether they've been called (SQUASH THIS)

* fix: shore up the auto-disposing sub tests now that double-free doesn't fatal (SQUASH THIS)

* fix: write documentation explaining how and why to apply a default commitment (SQUASH THIS)

* fix: skip subscriptions that have been deleted when recursing (SQUASH THIS)

* fix: bail on async responses when the connection has been recycled (SQUASH THIS)

* fix: typo in comment (SQUASH THIS)

* chore: comment on why notification callbacks are ts-ignored

* chore: start all the new stuff out as private members of the Connection class
This commit is contained in:
Steven Luscher 2022-04-28 16:21:39 -07:00 committed by GitHub
parent b4ade0d48a
commit 2e617ba4fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 8768 additions and 6373 deletions

File diff suppressed because it is too large Load Diff

View File

@ -64,10 +64,12 @@
"bs58": "^4.0.1",
"buffer": "6.0.1",
"cross-fetch": "^3.1.4",
"fast-stable-stringify": "^1.0.0",
"jayson": "^3.4.4",
"js-sha3": "^0.8.0",
"rpc-websockets": "^7.4.2",
"secp256k1": "^4.0.2",
"sinon-chai": "^3.7.0",
"superstruct": "^0.14.2",
"tweetnacl": "^1.0.0"
},

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,925 @@
import chai from 'chai';
import {Client} from 'rpc-websockets';
import {stub, SinonStubbedInstance, SinonSpy, spy} from 'sinon';
import sinonChai from 'sinon-chai';
import {
AccountChangeCallback,
Commitment,
Connection,
LogsCallback,
ProgramAccountChangeCallback,
PublicKey,
RootChangeCallback,
SignatureResultCallback,
SlotChangeCallback,
SlotUpdateCallback,
} from '../src';
import {url} from './url';
chai.use(sinonChai);
const expect = chai.expect;
function stubRpcWebSocket(
connection: Connection,
): SinonStubbedInstance<Client> {
const socket = connection._rpcWebSocket;
let mockOpen = false;
stub(socket, 'connect').callsFake(() => {
if (!mockOpen) {
mockOpen = true;
connection._rpcWebSocket.emit('open');
}
});
stub(socket, 'close').callsFake(() => {
if (mockOpen) {
mockOpen = false;
connection._rpcWebSocket.emit('close');
}
});
stub(socket, 'call');
return socket as SinonStubbedInstance<Client>;
}
describe('Subscriptions', () => {
let connection: Connection;
let stubbedSocket: SinonStubbedInstance<Client>;
const subscriptionMethodsConfig = {
accountSubscribe: {
getExpectedAlternateParams: () => [
'C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK',
{
commitment: connection.commitment || 'finalized',
encoding: 'base64',
},
],
getExpectedParams: () => [
PublicKey.default.toBase58(),
{
commitment: connection.commitment || 'finalized',
encoding: 'base64',
},
],
setupAlternateListener(callback: AccountChangeCallback): number {
return connection.onAccountChange(
new PublicKey('C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK'),
callback,
);
},
setupListener(callback: AccountChangeCallback): number {
return connection.onAccountChange(PublicKey.default, callback);
},
setupListenerWithDefaultsOmitted(
callback: AccountChangeCallback,
): number {
return connection.onAccountChange(PublicKey.default, callback);
},
setupListenerWithDefaultableParamsSetToTheirDefaults(
callback: AccountChangeCallback,
): number {
return connection.onAccountChange(
PublicKey.default,
callback,
connection.commitment || 'finalized',
);
},
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('accountNotification', {
subscription: serverSubscriptionId,
result: {
context: {slot: 11},
value: {
data: Buffer.from(''),
executable: false,
lamports: 0,
owner: PublicKey.default.toBase58(),
rentEpoch: 0,
},
},
});
},
teardownListener(
...args: Parameters<typeof connection.removeAccountChangeListener>
) {
return connection.removeAccountChangeListener(...args);
},
},
logsSubscribe: {
getExpectedAlternateParams: () => [
{mentions: ['C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK']},
{commitment: connection.commitment || 'finalized'},
],
getExpectedParams: () => [
{mentions: [PublicKey.default.toBase58()]},
{commitment: connection.commitment || 'finalized'},
],
setupAlternateListener(callback: LogsCallback): number {
return connection.onLogs(
new PublicKey('C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK'),
callback,
);
},
setupListener(callback: LogsCallback): number {
return connection.onLogs(PublicKey.default, callback);
},
setupListenerWithDefaultsOmitted(callback: LogsCallback): number {
return connection.onLogs(PublicKey.default, callback);
},
setupListenerWithDefaultableParamsSetToTheirDefaults(
callback: LogsCallback,
): number {
return connection.onLogs(
PublicKey.default,
callback,
connection.commitment || 'finalized',
);
},
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('logsNotification', {
subscription: serverSubscriptionId,
result: {
context: {slot: 11},
value: {
err: null,
logs: [
'BPF program 83astBRguLMdt2h5U1Tpdq5tjFoJ6noeGwaY3mDLVcri success',
],
signature:
'5h6xBEauJ3PK6SWCZ1PGjBvj8vDdWG3KpwATGy1ARAXFSDwt8GFXM7W5Ncn16wmqokgpiKRLuS83KUxyZyv2sUYv',
},
},
});
},
teardownListener(
...args: Parameters<typeof connection.removeOnLogsListener>
) {
return connection.removeOnLogsListener(...args);
},
},
programSubscribe: {
getExpectedAlternateParams: () => [
'C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK',
{commitment: connection.commitment || 'finalized', encoding: 'base64'},
],
getExpectedParams: () => [
PublicKey.default.toBase58(),
{commitment: connection.commitment || 'finalized', encoding: 'base64'},
],
setupAlternateListener(callback: ProgramAccountChangeCallback): number {
return connection.onProgramAccountChange(
new PublicKey('C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK'),
callback,
);
},
setupListener(callback: ProgramAccountChangeCallback): number {
return connection.onProgramAccountChange(PublicKey.default, callback);
},
setupListenerWithDefaultsOmitted(
callback: ProgramAccountChangeCallback,
): number {
return connection.onProgramAccountChange(PublicKey.default, callback);
},
setupListenerWithDefaultableParamsSetToTheirDefaults(
callback: ProgramAccountChangeCallback,
): number {
return connection.onProgramAccountChange(
PublicKey.default,
callback,
connection.commitment || 'finalized',
);
},
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('programNotification', {
subscription: serverSubscriptionId,
result: {
context: {slot: 11},
value: {
pubkey: PublicKey.default.toBase58(),
account: {
data: Buffer.from(''),
executable: false,
lamports: 0,
owner: PublicKey.default.toBase58(),
rentEpoch: 0,
},
},
},
});
},
teardownListener(
...args: Parameters<
typeof connection.removeProgramAccountChangeListener
>
) {
return connection.removeProgramAccountChangeListener(...args);
},
},
rootSubscribe: {
getExpectedAlternateParams: () => [],
getExpectedParams: () => [],
setupAlternateListener: undefined,
setupListener(callback: RootChangeCallback): number {
return connection.onRootChange(callback);
},
setupListenerWithDefaultsOmitted: undefined,
setupListenerWithDefaultableParamsSetToTheirDefaults: undefined,
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('rootNotification', {
subscription: serverSubscriptionId,
result: 101,
});
},
teardownListener(
...args: Parameters<typeof connection.removeRootChangeListener>
) {
return connection.removeRootChangeListener(...args);
},
},
signatureSubscribe: {
getExpectedAlternateParams: () => [
'C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK',
{commitment: connection.commitment || 'finalized'},
],
getExpectedParams: () => [
PublicKey.default.toBase58(),
{commitment: connection.commitment || 'finalized'},
],
setupAlternateListener(callback: SignatureResultCallback): number {
return connection.onSignature(
new PublicKey(
'C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK',
).toBase58(),
callback,
);
},
setupListener(callback: SignatureResultCallback): number {
return connection.onSignature(PublicKey.default.toBase58(), callback);
},
setupListenerWithDefaultsOmitted(
callback: SignatureResultCallback,
): number {
return connection.onSignature(PublicKey.default.toBase58(), callback);
},
setupListenerWithDefaultableParamsSetToTheirDefaults(
callback: SignatureResultCallback,
): number {
return connection.onSignature(
PublicKey.default.toBase58(),
callback,
connection.commitment || 'finalized',
);
},
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('signatureNotification', {
subscription: serverSubscriptionId,
result: {
context: {slot: 11},
value: {err: null},
},
});
},
teardownListener(
...args: Parameters<typeof connection.removeSignatureListener>
) {
return connection.removeSignatureListener(...args);
},
},
slotSubscribe: {
getExpectedAlternateParams: () => [],
getExpectedParams: () => [],
setupAlternateListener: undefined,
setupListener(callback: SlotChangeCallback): number {
return connection.onSlotChange(callback);
},
setupListenerWithDefaultsOmitted: undefined,
setupListenerWithDefaultableParamsSetToTheirDefaults: undefined,
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('slotNotification', {
subscription: serverSubscriptionId,
result: {parent: 1, slot: 2, root: 0},
});
},
teardownListener(
...args: Parameters<typeof connection.removeSlotChangeListener>
) {
return connection.removeSlotChangeListener(...args);
},
},
slotsUpdatesSubscribe: {
getExpectedAlternateParams: () => [],
getExpectedParams: () => [],
setupAlternateListener: undefined,
setupListener(callback: SlotUpdateCallback): number {
return connection.onSlotUpdate(callback);
},
setupListenerWithDefaultsOmitted: undefined,
setupListenerWithDefaultableParamsSetToTheirDefaults: undefined,
publishNotificationForServerSubscriptionId(
socket: Client,
serverSubscriptionId: number,
) {
socket.emit('slotsUpdatesNotification', {
subscription: serverSubscriptionId,
result: {
type: 'root',
slot: 0,
timestamp: 322992000000,
},
});
},
teardownListener(
...args: Parameters<typeof connection.removeSlotUpdateListener>
) {
return connection.removeSlotUpdateListener(...args);
},
},
};
beforeEach(() => {
connection = new Connection(url);
stubbedSocket = stubRpcWebSocket(connection);
});
afterEach(() => {
stubbedSocket.close();
});
Object.entries(subscriptionMethodsConfig).forEach(
([
subscriptionMethod,
{
getExpectedAlternateParams,
getExpectedParams,
publishNotificationForServerSubscriptionId,
setupAlternateListener,
setupListener,
teardownListener,
},
]) => {
describe(`The \`${subscriptionMethod}\` RPC method`, () => {
describe('attaching the first notification listener', () => {
let clientSubscriptionId: number;
let listenerCallback: SinonSpy;
let acknowledgeSubscription = (
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_serverSubscriptionId: number,
) => {
expect.fail(
'Expected a function to have been assigned to `acknowledgeSubscription` in the test.',
);
};
let fatalSubscription = () => {
expect.fail(
'Expected a function to have been assigned to `fatalSubscription` in the test.',
);
};
const serverSubscriptionId = 0;
beforeEach(() => {
stubbedSocket.call
.withArgs(subscriptionMethod, getExpectedParams())
.callsFake(
() =>
// Defer the acknowledgement.
new Promise<number>((resolve, reject) => {
acknowledgeSubscription = resolve;
fatalSubscription = reject;
}),
);
listenerCallback = spy();
clientSubscriptionId = setupListener(listenerCallback);
});
it('results in a subscription request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
subscriptionMethod,
getExpectedParams(),
);
});
describe('then unsubscribing that listener before the subscription has been acknowledged by the server', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await teardownListener(clientSubscriptionId);
});
describe('once the subscription has been acknowledged by the server', () => {
beforeEach(async () => {
await acknowledgeSubscription(serverSubscriptionId);
});
it('results in the subscription being torn down immediately', () => {
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
subscriptionMethod.replace('Subscribe', 'Unsubscribe'),
[serverSubscriptionId],
);
});
});
});
describe('once the subscription has been acknowledged by the server', () => {
beforeEach(async () => {
await acknowledgeSubscription(serverSubscriptionId);
});
describe('when a notification is published', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
serverSubscriptionId,
);
});
it('fires the listener callback', () => {
expect(listenerCallback).to.have.been.calledOnce;
});
});
describe('then unsubscribing that listener', () => {
let acknowledgeUnsubscribe = () => {
expect.fail(
'Expected a function to have been assigned to `acknowledgeUnsubscribe` in the test',
);
};
let fatalUnsubscribe = () => {
expect.fail(
'Expected a function to have been assigned to `fatalUnsubscribe` in the test',
);
};
beforeEach(() => {
stubbedSocket.call.resetHistory();
stubbedSocket.call
.withArgs(
subscriptionMethod.replace('Subscribe', 'Unsubscribe'),
[serverSubscriptionId],
)
.callsFake(
() =>
// Defer the acknowledgement.
new Promise<void>((resolve, reject) => {
acknowledgeUnsubscribe = resolve;
fatalUnsubscribe = reject;
}),
);
teardownListener(clientSubscriptionId);
});
it('results in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
subscriptionMethod.replace('Subscribe', 'Unsubscribe'),
[serverSubscriptionId],
);
});
describe('if a new listener is added before the unsubscribe is acknowledged by the server', () => {
beforeEach(() => {
stubbedSocket.call.resetHistory();
setupListener(spy());
});
describe('once that unsubscribe is acknowledged by the server', () => {
beforeEach(async () => {
await acknowledgeUnsubscribe();
});
it('results in a new subscription request being made to the RPC', () => {
expect(
stubbedSocket.call,
).to.have.been.calledOnceWithExactly(
subscriptionMethod,
getExpectedParams(),
);
});
});
});
describe('when a notification is published before the unsubscribe is acknowledged by the server', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
serverSubscriptionId,
);
});
it('does not fire the listener callback', () => {
expect(listenerCallback).not.to.have.been.called;
});
});
describe('if that unsubscribe throws an exception', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await fatalUnsubscribe();
});
it('results in a retry unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
subscriptionMethod.replace('Subscribe', 'Unsubscribe'),
[serverSubscriptionId],
);
});
});
describe('then having the socket connection drop unexpectedly', () => {
beforeEach(() => {
stubbedSocket.emit('close');
});
describe('upon the socket connection reopening', () => {
let fatalPriorUnubscribe;
beforeEach(() => {
fatalPriorUnubscribe = fatalUnsubscribe;
stubbedSocket.call.resetHistory();
stubbedSocket.emit('open');
});
it('does not result in a new unsubscription request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
describe('then upon the prior unsubscribe fataling (eg. because its timeout triggers)', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await fatalPriorUnubscribe();
});
it('does not result in a new unsubscription request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
});
});
});
});
describe('attaching a second notification listener with the same params', () => {
let secondListenerCallback: SinonSpy;
beforeEach(() => {
stubbedSocket.call.resetHistory();
secondListenerCallback = spy();
setupListener(secondListenerCallback);
});
it('does not result in a second subscription request to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
describe('when a notification is published', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
serverSubscriptionId,
);
});
it("fires the first listener's callback", () => {
expect(listenerCallback).to.have.been.calledOnce;
});
it("fires the second listener's callback", () => {
expect(secondListenerCallback).to.have.been.calledOnce;
});
});
describe('then unsubscribing the first listener', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await teardownListener(clientSubscriptionId);
});
it('does not result in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
describe('when a notification is published', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
serverSubscriptionId,
);
});
it("does not fire the first listener's callback", () => {
expect(listenerCallback).not.to.have.been.called;
});
it("fires the second listener's callback", () => {
expect(secondListenerCallback).to.have.been.calledOnce;
});
});
});
});
if (setupAlternateListener) {
describe('attaching a second notification listener with different params', () => {
let alternateListenerCallback: SinonSpy;
const secondServerSubscriptionId = 1;
beforeEach(() => {
stubbedSocket.call
.withArgs(subscriptionMethod, getExpectedAlternateParams())
.resolves(secondServerSubscriptionId);
alternateListenerCallback = spy();
setupAlternateListener(alternateListenerCallback);
});
it('results in a second subscription request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledWithExactly(
subscriptionMethod,
getExpectedAlternateParams(),
);
});
describe('when a notification for the first subscription is published', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
serverSubscriptionId,
);
});
it("fires the first listener's callback", () => {
expect(listenerCallback).to.have.been.called;
});
it("does not fire the second listener's callback", () => {
expect(alternateListenerCallback).not.to.have.been.called;
});
});
describe('when a notification for the second subscription is published', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
secondServerSubscriptionId,
);
});
it("does not fire the first listener's callback", () => {
expect(listenerCallback).not.to.have.been.called;
});
it("fires the second listener's callback", () => {
expect(alternateListenerCallback).to.have.been.called;
});
});
});
}
});
describe('if that subscription throws an exception', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await fatalSubscription();
});
it('results in a retry subscription request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
subscriptionMethod,
getExpectedParams(),
);
});
});
describe('then having the socket connection drop unexpectedly', () => {
beforeEach(() => {
stubbedSocket.emit('close');
});
describe('then unsubscribing that listener', () => {
beforeEach(async () => {
await teardownListener(clientSubscriptionId);
});
describe('upon the socket connection reopening', () => {
beforeEach(() => {
stubbedSocket.call.resetHistory();
stubbedSocket.emit('open');
});
it('does not result in a new subscription request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
});
});
describe('upon the socket connection reopening', () => {
let fatalPriorSubscription;
beforeEach(() => {
fatalPriorSubscription = fatalSubscription;
stubbedSocket.call.resetHistory();
stubbedSocket.emit('open');
});
it('results in a new subscription request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
subscriptionMethod,
getExpectedParams(),
);
});
describe('then upon the prior subscription fataling (eg. because its timeout triggers)', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await fatalPriorSubscription();
});
it('does not result in a new subscription request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
describe('once the new subscription has been acknowledged by the server', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await acknowledgeSubscription(serverSubscriptionId);
});
describe('when a notification is published', () => {
beforeEach(() => {
publishNotificationForServerSubscriptionId(
stubbedSocket,
serverSubscriptionId,
);
});
it('fires the listener callback', () => {
expect(listenerCallback).to.have.been.calledOnce;
});
});
});
});
});
});
});
});
},
);
/**
* Special case.
* After a signature is processed, RPCs automatically dispose of the
* subscription on the server side. This test asserts that RPC
* unsubscribe request are only made before such a subscription has
* received a notification which it knows to be final and indicative
* that the RPC has auto-disposed the subscription.
*
* NOTE: There is a proposal to eliminate this special case, here:
* https://github.com/solana-labs/solana/issues/18892
*/
describe('auto-disposing subscriptions', () => {
let clientSubscriptionId: number;
const serverSubscriptionId = 0;
const testSignature = 'C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK';
const getExpectedParams = () => [
testSignature,
{commitment: connection.commitment || 'finalized'},
];
// This type of notification *is* indicative of auto-disposal.
const FINAL_NOTIFICATION_RESULT = {
context: {slot: 11},
value: {err: null},
};
// This type of notification is *not* indicative of auto-disposal.
const NON_FINAL_NOTIFICATION_RESULT = {
context: {slot: 11},
value: 'receivedSignature',
};
beforeEach(() => {
stubbedSocket.call
.withArgs('signatureSubscribe', getExpectedParams())
.resolves(serverSubscriptionId);
clientSubscriptionId = connection.onSignature(testSignature, spy());
});
describe('before an auto-disposing subscription has published any notification', () => {
describe('then unsubscribing the listener', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await connection.removeSignatureListener(clientSubscriptionId);
});
it('results in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledWith(
'signatureUnsubscribe',
[serverSubscriptionId],
);
});
});
});
describe('after an auto-disposing subscription has published a non-final notification', () => {
beforeEach(() => {
stubbedSocket.emit('signatureNotification', {
subscription: serverSubscriptionId,
result: NON_FINAL_NOTIFICATION_RESULT,
});
});
it('should not result in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.calledWith(
'signatureUnsubscribe',
[serverSubscriptionId],
);
});
describe('then unsubscribing the listener', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await connection.removeSignatureListener(clientSubscriptionId);
});
it('results in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledWith(
'signatureUnsubscribe',
[serverSubscriptionId],
);
});
});
});
describe('after an auto-disposing subscription has published its final notification', () => {
beforeEach(() => {
stubbedSocket.emit('signatureNotification', {
subscription: serverSubscriptionId,
result: FINAL_NOTIFICATION_RESULT,
});
});
it('should not result in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.calledWith(
'signatureUnsubscribe',
[serverSubscriptionId],
);
});
describe('then unsubscribing the listener', () => {
beforeEach(async () => {
stubbedSocket.call.resetHistory();
await connection.removeSignatureListener(clientSubscriptionId);
});
it('should not result in an unsubscribe request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
});
});
});
[
undefined, // Let `Connection` use the default commitment
'processed' as Commitment, // Override `Connection's` commitment
].forEach((maybeOverrideCommitment: Commitment | undefined) => {
describe(`given a Connection with ${
maybeOverrideCommitment
? `its commitment overriden to \`${maybeOverrideCommitment}\``
: 'an unspecified commitment override'
}`, () => {
Object.entries(subscriptionMethodsConfig).forEach(
([
subscriptionMethod,
{
getExpectedParams,
setupListenerWithDefaultableParamsSetToTheirDefaults,
setupListenerWithDefaultsOmitted,
},
]) => {
beforeEach(() => {
connection = new Connection(url, maybeOverrideCommitment);
stubbedSocket = stubRpcWebSocket(connection);
});
afterEach(() => {
stubbedSocket.close();
});
if (
setupListenerWithDefaultsOmitted &&
setupListenerWithDefaultableParamsSetToTheirDefaults
) {
describe('making a subscription with defaulted params omitted', () => {
beforeEach(() => {
setupListenerWithDefaultsOmitted(spy());
});
it('results in a subscription request being made to the RPC', () => {
expect(stubbedSocket.call).to.have.been.calledWithExactly(
subscriptionMethod,
getExpectedParams(),
);
});
describe('then making the same subscription with the defaultable params set to their defaults', () => {
beforeEach(() => {
stubbedSocket.call.resetHistory();
setupListenerWithDefaultableParamsSetToTheirDefaults(spy());
});
it('does not result in a subscription request being made to the RPC', () => {
expect(stubbedSocket.call).not.to.have.been.called;
});
});
});
}
},
);
});
});
describe('during state machine updates', () => {
beforeEach(() => {
stubbedSocket.connect.callsFake(() => {});
stubbedSocket.close.callsFake(() => {});
});
afterEach(() => {
stubbedSocket.emit('close');
});
/**
* This is a regression test for the case described here:
* https://github.com/solana-labs/solana/pull/24473#discussion_r858437090
*
* Essentially, you want to make sure that the state processor, as it recurses
* always processes the latest version of every subscription. Depending on how
* you craft the loop inside the processor, you can end up in this situation.
*
* 1A (pending subscription with zero callbacks; gets deleted then recurses)
* L 2B (pending subscription; transitions to subscribing and makes network call)
* 2A (old version of subscription 2; transitions again and makes 2nd network call)
*
* The fact that subscription 2 made two network calls is the bug there.
* What you want is this:
*
* 1A (pending subscription with zero callbacks; gets deleted then recurses)
* L 2B (pending subscription; transitions to subscribing and makes network call)
* 2A (now in the subscribing state; skipped by the processor)
*
* Below is a test that tries to replicate this exact scenario.
*/
it('the processor always operates over the most up-to-date state of a given subscription', () => {
// Add two subscriptions.
const clientSubscriptionIdA = connection.onAccountChange(
new PublicKey('C2jDL4pcwpE2pP5EryTGn842JJUJTcurPGZUquQjySxK'),
() => {},
);
connection.onAccountChange(
new PublicKey('27Y78XJXG9A13pnPajrB1VYU6EF8uNSoojPZBmhKsi8C'),
() => {},
);
// Then remove the first one before the connection opens.
connection.removeAccountChangeListener(clientSubscriptionIdA);
// Then open the connection.
stubbedSocket.emit('open');
// Despite recursion inside the state machine, ensure that the second
// subscription only makes *one* connection attempt.
expect(stubbedSocket.call).to.have.been.calledOnceWithExactly(
'accountSubscribe',
[
'27Y78XJXG9A13pnPajrB1VYU6EF8uNSoojPZBmhKsi8C',
{commitment: 'finalized', encoding: 'base64'},
],
);
});
});
});

View File

@ -202,6 +202,11 @@ const processTransaction = async ({
params: [signature, {commitment}],
result: {err: err || null},
});
await mockRpcMessage({
method: 'signatureUnsubscribe',
params: [1],
result: true,
});
return await connection.confirmTransaction(signature, commitment);
};
@ -228,6 +233,11 @@ const airdrop = async ({
params: [signature, {commitment: 'confirmed'}],
result: {err: null},
});
await mockRpcMessage({
method: 'signatureUnsubscribe',
params: [1],
result: true,
});
await connection.confirmTransaction(signature, 'confirmed');
return signature;

View File

@ -93,7 +93,12 @@ class MockClient {
];
expect(method).to.eq(mockRequest.method);
expect(params).to.eql(mockRequest.params);
if (method.endsWith('Unsubscribe')) {
expect(params.length).to.eq(1);
expect(params[0]).to.be.a('number');
} else {
expect(params).to.eql(mockRequest.params);
}
let id = ++this.subscriptionCounter;
const response = {

View File

@ -54,6 +54,9 @@ if (process.env.TEST_LIVE) {
expect(connection._rpcWebSocketIdleTimeout).not.to.eq(null);
const nextId = connection.onSignature(testSignature, () => {});
// wait for websocket to connect
await sleep(100);
expect(connection._rpcWebSocketIdleTimeout).to.eq(null);
await connection.removeSignatureListener(nextId);

File diff suppressed because it is too large Load Diff