diff --git a/web3.js/src/connection.ts b/web3.js/src/connection.ts index 6036a03e4..f049dc180 100644 --- a/web3.js/src/connection.ts +++ b/web3.js/src/connection.ts @@ -1056,6 +1056,112 @@ const SlotNotificationResult = pick({ result: SlotInfoResult, }); +/** + * Slot updates which can be used for tracking the live progress of a cluster. + * - `"firstShredReceived"`: connected node received the first shred of a block. + * Indicates that a new block that is being produced. + * - `"completed"`: connected node has received all shreds of a block. Indicates + * a block was recently produced. + * - `"optimisticConfirmation"`: block was optimistically confirmed by the + * cluster. It is not guaranteed that an optimistic confirmation notification + * will be sent for every finalized blocks. + * - `"root"`: the connected node rooted this block. + * - `"createdBank"`: the connected node has started validating this block. + * - `"frozen"`: the connected node has validated this block. + * - `"dead"`: the connected node failed to validate this block. + */ +export type SlotUpdate = + | { + type: 'firstShredReceived'; + slot: number; + timestamp: number; + } + | { + type: 'completed'; + slot: number; + timestamp: number; + } + | { + type: 'createdBank'; + slot: number; + timestamp: number; + parent: number; + } + | { + type: 'frozen'; + slot: number; + timestamp: number; + stats: { + numTransactionEntries: number; + numSuccessfulTransactions: number; + numFailedTransactions: number; + maxTransactionsPerEntry: number; + }; + } + | { + type: 'dead'; + slot: number; + timestamp: number; + err: string; + } + | { + type: 'optimisticConfirmation'; + slot: number; + timestamp: number; + } + | { + type: 'root'; + slot: number; + timestamp: number; + }; + +/** + * @internal + */ +const SlotUpdateResult = union([ + pick({ + type: union([ + literal('firstShredReceived'), + literal('completed'), + literal('optimisticConfirmation'), + literal('root'), + ]), + slot: number(), + timestamp: number(), + }), + pick({ + type: literal('createdBank'), + parent: number(), + slot: number(), + timestamp: number(), + }), + pick({ + type: literal('frozen'), + slot: number(), + timestamp: number(), + stats: pick({ + numTransactionEntries: number(), + numSuccessfulTransactions: number(), + numFailedTransactions: number(), + maxTransactionsPerEntry: number(), + }), + }), + pick({ + type: literal('dead'), + slot: number(), + timestamp: number(), + err: string(), + }), +]); + +/** + * Expected JSON RPC response for the "slotsUpdatesNotification" message + */ +const SlotUpdateNotificationResult = pick({ + subscription: number(), + result: SlotUpdateResult, +}); + /** * Expected JSON RPC response for the "signatureNotification" message */ @@ -1569,6 +1675,19 @@ type SlotSubscriptionInfo = { subscriptionId: SubscriptionId | null; // null when there's no current server subscription id }; +/** + * Callback function for slot update notifications + */ +export type SlotUpdateCallback = (slotUpdate: SlotUpdate) => void; + +/** + * @private + */ +type SlotUpdateSubscriptionInfo = { + callback: SlotUpdateCallback; + subscriptionId: SubscriptionId | null; // null when there's no current server subscription id +}; + /** * Callback function for signature status notifications */ @@ -1815,6 +1934,11 @@ export class Connection { [id: number]: LogsSubscriptionInfo; } = {}; + /** @internal */ _slotUpdateSubscriptionCounter: number = 0; + /** @internal */ _slotUpdateSubscriptions: { + [id: number]: SlotUpdateSubscriptionInfo; + } = {}; + /** * Establish a JSON RPC connection * @@ -1887,6 +2011,10 @@ export class Connection { 'slotNotification', this._wsOnSlotNotification.bind(this), ); + this._rpcWebSocket.on( + 'slotsUpdatesNotification', + this._wsOnSlotUpdatesNotification.bind(this), + ); this._rpcWebSocket.on( 'signatureNotification', this._wsOnSignatureNotification.bind(this), @@ -3323,13 +3451,16 @@ export class Connection { Object.values(this._programAccountChangeSubscriptions).forEach( s => (s.subscriptionId = null), ); + Object.values(this._rootSubscriptions).forEach( + s => (s.subscriptionId = null), + ); Object.values(this._signatureSubscriptions).forEach( s => (s.subscriptionId = null), ); Object.values(this._slotSubscriptions).forEach( s => (s.subscriptionId = null), ); - Object.values(this._rootSubscriptions).forEach( + Object.values(this._slotUpdateSubscriptions).forEach( s => (s.subscriptionId = null), ); } @@ -3345,6 +3476,9 @@ export class Connection { this._programAccountChangeSubscriptions, ).map(Number); const slotKeys = Object.keys(this._slotSubscriptions).map(Number); + const slotUpdateKeys = Object.keys(this._slotUpdateSubscriptions).map( + Number, + ); const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number); const rootKeys = Object.keys(this._rootSubscriptions).map(Number); const logsKeys = Object.keys(this._logsSubscriptions).map(Number); @@ -3352,6 +3486,7 @@ export class Connection { accountKeys.length === 0 && programKeys.length === 0 && slotKeys.length === 0 && + slotUpdateKeys.length === 0 && signatureKeys.length === 0 && rootKeys.length === 0 && logsKeys.length === 0 @@ -3400,6 +3535,11 @@ export class Connection { this._subscribe(sub, 'slotSubscribe', []); } + for (let id of slotUpdateKeys) { + const sub = this._slotUpdateSubscriptions[id]; + this._subscribe(sub, 'slotsUpdatesSubscribe', []); + } + for (let id of signatureKeys) { const sub = this._signatureSubscriptions[id]; const args: any[] = [sub.signature]; @@ -3639,6 +3779,49 @@ export class Connection { /** * @internal */ + _wsOnSlotUpdatesNotification(notification: Object) { + const res = create(notification, SlotUpdateNotificationResult); + for (const sub of Object.values(this._slotUpdateSubscriptions)) { + if (sub.subscriptionId === res.subscription) { + sub.callback(res.result); + return; + } + } + } + + /** + * Register a callback to be invoked upon slot updates. {@link SlotUpdate}'s + * may be useful to track live progress of a cluster. + * + * @param callback Function to invoke whenever the slot updates + * @return subscription id + */ + onSlotUpdate(callback: SlotUpdateCallback): number { + const id = ++this._slotUpdateSubscriptionCounter; + this._slotUpdateSubscriptions[id] = { + callback, + subscriptionId: null, + }; + this._updateSubscriptions(); + return id; + } + + /** + * Deregister a slot update notification callback + * + * @param id subscription id to deregister + */ + async removeSlotUpdateListener(id: number): Promise { + if (this._slotUpdateSubscriptions[id]) { + const subInfo = this._slotUpdateSubscriptions[id]; + delete this._slotUpdateSubscriptions[id]; + await this._unsubscribe(subInfo, 'slotsUpdatesUnsubscribe'); + this._updateSubscriptions(); + } else { + throw new Error(`Unknown slot update id: ${id}`); + } + } + _buildArgs( args: Array, override?: Commitment,