fix: web3.js transaction confirmation now double-checks for already-confirmed txs (#28290)

* chore: create internal method for subscribing to subscription state changes

* add status pool

* fix tests

* more tests

* syntax fix

* variable rename

* fix test

* comment fix

* remove getSignatureStatuses pooling

* rename variable

* IIFE

* wait for subscription

* fix interval clear

* test: you can now pause the establishment of subscriptions in tests

* feat: implementation of signature status check after setting up signature subscription

Co-authored-by: steveluscher <me+github@steveluscher.com>
This commit is contained in:
Adrian Brzeziński 2022-11-28 20:55:56 +01:00 committed by GitHub
parent 9f370475d4
commit d16810ec84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 222 additions and 21 deletions

View File

@ -88,6 +88,10 @@ type ClientSubscriptionId = number;
/** @internal */ type ServerSubscriptionId = number; /** @internal */ type ServerSubscriptionId = number;
/** @internal */ type SubscriptionConfigHash = string; /** @internal */ type SubscriptionConfigHash = string;
/** @internal */ type SubscriptionDisposeFn = () => Promise<void>; /** @internal */ type SubscriptionDisposeFn = () => Promise<void>;
/** @internal */ type SubscriptionStateChangeCallback = (
nextState: StatefulSubscription['state'],
) => void;
/** @internal */ type SubscriptionStateChangeDisposeFn = () => void;
/** /**
* @internal * @internal
* Every subscription contains the args used to open the subscription with * Every subscription contains the args used to open the subscription with
@ -2715,6 +2719,16 @@ export class Connection {
| SubscriptionDisposeFn | SubscriptionDisposeFn
| undefined; | undefined;
} = {}; } = {};
/** @internal */ private _subscriptionHashByClientSubscriptionId: {
[clientSubscriptionId: ClientSubscriptionId]:
| SubscriptionConfigHash
| undefined;
} = {};
/** @internal */ private _subscriptionStateChangeCallbacksByHash: {
[hash: SubscriptionConfigHash]:
| Set<SubscriptionStateChangeCallback>
| undefined;
} = {};
/** @internal */ private _subscriptionCallbacksByServerSubscriptionId: { /** @internal */ private _subscriptionCallbacksByServerSubscriptionId: {
[serverSubscriptionId: ServerSubscriptionId]: [serverSubscriptionId: ServerSubscriptionId]:
| Set<SubscriptionConfig['callback']> | Set<SubscriptionConfig['callback']>
@ -3372,7 +3386,10 @@ export class Connection {
const subscriptionCommitment = commitment || this.commitment; const subscriptionCommitment = commitment || this.commitment;
let timeoutId; let timeoutId;
let subscriptionId; let signatureSubscriptionId: number | undefined;
let disposeSignatureSubscriptionStateChangeObserver:
| SubscriptionStateChangeDisposeFn
| undefined;
let done = false; let done = false;
const confirmationPromise = new Promise<{ const confirmationPromise = new Promise<{
@ -3380,10 +3397,10 @@ export class Connection {
response: RpcResponseAndContext<SignatureResult>; response: RpcResponseAndContext<SignatureResult>;
}>((resolve, reject) => { }>((resolve, reject) => {
try { try {
subscriptionId = this.onSignature( signatureSubscriptionId = this.onSignature(
rawSignature, rawSignature,
(result: SignatureResult, context: Context) => { (result: SignatureResult, context: Context) => {
subscriptionId = undefined; signatureSubscriptionId = undefined;
const response = { const response = {
context, context,
value: result, value: result,
@ -3393,6 +3410,46 @@ export class Connection {
}, },
subscriptionCommitment, subscriptionCommitment,
); );
const subscriptionSetupPromise = new Promise<void>(
resolveSubscriptionSetup => {
if (signatureSubscriptionId == null) {
resolveSubscriptionSetup();
} else {
disposeSignatureSubscriptionStateChangeObserver =
this._onSubscriptionStateChange(
signatureSubscriptionId,
nextState => {
if (nextState === 'subscribed') {
resolveSubscriptionSetup();
}
},
);
}
},
);
(async () => {
await subscriptionSetupPromise;
if (done) return;
const response = await this.getSignatureStatus(rawSignature);
if (done) return;
if (response == null) {
return;
}
const {context, value} = response;
if (value?.err) {
reject(value.err);
}
if (value) {
done = true;
resolve({
__type: TransactionStatus.PROCESSED,
response: {
context,
value,
},
});
}
})();
} catch (err) { } catch (err) {
reject(err); reject(err);
} }
@ -3465,8 +3522,11 @@ export class Connection {
} }
} finally { } finally {
clearTimeout(timeoutId); clearTimeout(timeoutId);
if (subscriptionId) { if (disposeSignatureSubscriptionStateChangeObserver) {
this.removeSignatureListener(subscriptionId); disposeSignatureSubscriptionStateChangeObserver();
}
if (signatureSubscriptionId) {
this.removeSignatureListener(signatureSubscriptionId);
} }
} }
return result; return result;
@ -5106,13 +5166,60 @@ export class Connection {
Object.entries( Object.entries(
this._subscriptionsByHash as Record<SubscriptionConfigHash, Subscription>, this._subscriptionsByHash as Record<SubscriptionConfigHash, Subscription>,
).forEach(([hash, subscription]) => { ).forEach(([hash, subscription]) => {
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
state: 'pending', state: 'pending',
}; });
}); });
} }
/**
* @internal
*/
private _setSubscription(
hash: SubscriptionConfigHash,
nextSubscription: Subscription,
) {
const prevState = this._subscriptionsByHash[hash]?.state;
this._subscriptionsByHash[hash] = nextSubscription;
if (prevState !== nextSubscription.state) {
const stateChangeCallbacks =
this._subscriptionStateChangeCallbacksByHash[hash];
if (stateChangeCallbacks) {
stateChangeCallbacks.forEach(cb => {
try {
cb(nextSubscription.state);
// eslint-disable-next-line no-empty
} catch {}
});
}
}
}
/**
* @internal
*/
private _onSubscriptionStateChange(
clientSubscriptionId: ClientSubscriptionId,
callback: SubscriptionStateChangeCallback,
): SubscriptionStateChangeDisposeFn {
const hash =
this._subscriptionHashByClientSubscriptionId[clientSubscriptionId];
if (hash == null) {
return () => {};
}
const stateChangeCallbacks = (this._subscriptionStateChangeCallbacksByHash[
hash
] ||= new Set());
stateChangeCallbacks.add(callback);
return () => {
stateChangeCallbacks.delete(callback);
if (stateChangeCallbacks.size === 0) {
delete this._subscriptionStateChangeCallbacksByHash[hash];
}
};
}
/** /**
* @internal * @internal
*/ */
@ -5193,17 +5300,17 @@ export class Connection {
await (async () => { await (async () => {
const {args, method} = subscription; const {args, method} = subscription;
try { try {
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
state: 'subscribing', state: 'subscribing',
}; });
const serverSubscriptionId: ServerSubscriptionId = const serverSubscriptionId: ServerSubscriptionId =
(await this._rpcWebSocket.call(method, args)) as number; (await this._rpcWebSocket.call(method, args)) as number;
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
serverSubscriptionId, serverSubscriptionId,
state: 'subscribed', state: 'subscribed',
}; });
this._subscriptionCallbacksByServerSubscriptionId[ this._subscriptionCallbacksByServerSubscriptionId[
serverSubscriptionId serverSubscriptionId
] = subscription.callbacks; ] = subscription.callbacks;
@ -5220,10 +5327,10 @@ export class Connection {
return; return;
} }
// TODO: Maybe add an 'errored' state or a retry limit? // TODO: Maybe add an 'errored' state or a retry limit?
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
state: 'pending', state: 'pending',
}; });
await this._updateSubscriptions(); await this._updateSubscriptions();
} }
})(); })();
@ -5251,10 +5358,14 @@ export class Connection {
serverSubscriptionId, serverSubscriptionId,
); );
} else { } else {
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
state: 'unsubscribing', state: 'unsubscribing',
}; });
this._setSubscription(hash, {
...subscription,
state: 'unsubscribing',
});
try { try {
await this._rpcWebSocket.call(unsubscribeMethod, [ await this._rpcWebSocket.call(unsubscribeMethod, [
serverSubscriptionId, serverSubscriptionId,
@ -5267,18 +5378,18 @@ export class Connection {
return; return;
} }
// TODO: Maybe add an 'errored' state or a retry limit? // TODO: Maybe add an 'errored' state or a retry limit?
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
state: 'subscribed', state: 'subscribed',
}; });
await this._updateSubscriptions(); await this._updateSubscriptions();
return; return;
} }
} }
this._subscriptionsByHash[hash] = { this._setSubscription(hash, {
...subscription, ...subscription,
state: 'unsubscribed', state: 'unsubscribed',
}; });
await this._updateSubscriptions(); await this._updateSubscriptions();
})(); })();
} }
@ -5381,12 +5492,14 @@ export class Connection {
} else { } else {
existingSubscription.callbacks.add(subscriptionConfig.callback); existingSubscription.callbacks.add(subscriptionConfig.callback);
} }
this._subscriptionHashByClientSubscriptionId[clientSubscriptionId] = hash;
this._subscriptionDisposeFunctionsByClientSubscriptionId[ this._subscriptionDisposeFunctionsByClientSubscriptionId[
clientSubscriptionId clientSubscriptionId
] = async () => { ] = async () => {
delete this._subscriptionDisposeFunctionsByClientSubscriptionId[ delete this._subscriptionDisposeFunctionsByClientSubscriptionId[
clientSubscriptionId clientSubscriptionId
]; ];
delete this._subscriptionHashByClientSubscriptionId[clientSubscriptionId];
const subscription = this._subscriptionsByHash[hash]; const subscription = this._subscriptionsByHash[hash];
assert( assert(
subscription !== undefined, subscription !== undefined,

View File

@ -3,7 +3,8 @@ import {Buffer} from 'buffer';
import * as splToken from '@solana/spl-token'; import * as splToken from '@solana/spl-token';
import {expect, use} from 'chai'; import {expect, use} from 'chai';
import chaiAsPromised from 'chai-as-promised'; import chaiAsPromised from 'chai-as-promised';
import {useFakeTimers, SinonFakeTimers} from 'sinon'; import {mock, useFakeTimers, SinonFakeTimers} from 'sinon';
import sinonChai from 'sinon-chai';
import { import {
Authorized, Authorized,
@ -67,6 +68,7 @@ import {MessageV0} from '../src/message/v0';
import {encodeData} from '../src/instruction'; import {encodeData} from '../src/instruction';
use(chaiAsPromised); use(chaiAsPromised);
use(sinonChai);
const verifySignatureStatus = ( const verifySignatureStatus = (
status: SignatureStatus | null, status: SignatureStatus | null,
@ -1133,6 +1135,85 @@ describe('Connection', function () {
value: {err: null}, value: {err: null},
}); });
}); });
it('confirm transaction - does not check the signature status before the signature subscription comes alive', async () => {
const mockSignature =
'w2Zeq8YkpyB463DttvfzARD7k9ZxGEwbsEw4boEK7jDp3pfoxZbTdLFSsEPhzXhpCcjGi2kHtHFobgX49MMhbWt';
await mockRpcMessage({
method: 'signatureSubscribe',
params: [mockSignature, {commitment: 'finalized'}],
result: {err: null},
subscriptionEstablishmentPromise: new Promise(() => {}), // Never resolve.
});
const getSignatureStatusesExpectation = mock(connection)
.expects('getSignatureStatuses')
.never();
connection.confirmTransaction(mockSignature);
getSignatureStatusesExpectation.verify();
});
it('confirm transaction - checks the signature status once the signature subscription comes alive', async () => {
const mockSignature =
'w2Zeq8YkpyB463DttvfzARD7k9ZxGEwbsEw4boEK7jDp3pfoxZbTdLFSsEPhzXhpCcjGi2kHtHFobgX49MMhbWt';
await mockRpcMessage({
method: 'signatureSubscribe',
params: [mockSignature, {commitment: 'finalized'}],
result: {err: null},
});
const getSignatureStatusesExpectation = mock(connection)
.expects('getSignatureStatuses')
.once();
const confirmationPromise =
connection.confirmTransaction(mockSignature);
clock.runAllAsync();
await expect(confirmationPromise).to.eventually.deep.equal({
context: {slot: 11},
value: {err: null},
});
getSignatureStatusesExpectation.verify();
});
// FIXME: This test does not work.
// it('confirm transaction - confirms transaction when signature status check yields confirmation before signature subscription does', async () => {
// const mockSignature =
// 'w2Zeq8YkpyB463DttvfzARD7k9ZxGEwbsEw4boEK7jDp3pfoxZbTdLFSsEPhzXhpCcjGi2kHtHFobgX49MMhbWt';
// // Keep the subscription from ever returning data.
// await mockRpcMessage({
// method: 'signatureSubscribe',
// params: [mockSignature, {commitment: 'finalized'}],
// result: new Promise(() => {}), // Never resolve.
// });
// clock.runAllAsync();
// const confirmationPromise =
// connection.confirmTransaction(mockSignature);
// clock.runAllAsync();
// // Return a signature status through the RPC API.
// await mockRpcResponse({
// method: 'getSignatureStatuses',
// params: [[mockSignature]],
// value: [
// {
// slot: 0,
// confirmations: 11,
// status: {Ok: null},
// err: null,
// },
// ],
// });
// clock.runAllAsync();
// await expect(confirmationPromise).to.eventually.deep.equal({
// context: {slot: 11},
// value: {err: null},
// });
// });
}); });
} }

View File

@ -7,6 +7,7 @@ import {Connection} from '../../src';
type RpcRequest = { type RpcRequest = {
method: string; method: string;
params?: Array<any>; params?: Array<any>;
subscriptionEstablishmentPromise?: Promise<void>;
}; };
type RpcResponse = { type RpcResponse = {
@ -24,13 +25,15 @@ export const mockRpcMessage = ({
method, method,
params, params,
result, result,
subscriptionEstablishmentPromise,
}: { }: {
method: string; method: string;
params: Array<any>; params: Array<any>;
result: any | Promise<any>; result: any | Promise<any>;
subscriptionEstablishmentPromise?: Promise<void>;
}) => { }) => {
mockRpcSocket.push([ mockRpcSocket.push([
{method, params}, {method, params, subscriptionEstablishmentPromise},
{ {
context: {slot: 11}, context: {slot: 11},
value: result, value: result,
@ -109,6 +112,10 @@ class MockClient {
expect(params).to.eql(mockRequest.params); expect(params).to.eql(mockRequest.params);
} }
if (mockRequest.subscriptionEstablishmentPromise) {
await mockRequest.subscriptionEstablishmentPromise;
}
let id = ++this.subscriptionCounter; let id = ++this.subscriptionCounter;
const response = { const response = {
subscription: id, subscription: id,