diff --git a/web3.js/module.flow.js b/web3.js/module.flow.js index e4b35b702..eda817170 100644 --- a/web3.js/module.flow.js +++ b/web3.js/module.flow.js @@ -76,6 +76,11 @@ declare module '@solana/web3.js' { callback: AccountChangeCallback, ): number; removeAccountChangeListener(id: number): Promise; + onProgramAccountChange( + programId: PublicKey, + callback: ProgramAccountChangeCallback, + ): number; + removeProgramAccountChangeListener(id: number): Promise; } // === src/system-program.js === diff --git a/web3.js/src/connection.js b/web3.js/src/connection.js index aa7b6c273..74a4de3de 100644 --- a/web3.js/src/connection.js +++ b/web3.js/src/connection.js @@ -102,6 +102,19 @@ const AccountNotificationResult = struct({ result: AccountInfoResult, }); +/** + * @private + */ +const ProgramAccountInfoResult = struct(['string', AccountInfoResult]); + +/*** + * Expected JSON RPC response for the "programNotification" message + */ +const ProgramAccountNotificationResult = struct({ + subscription: 'number', + result: ProgramAccountInfoResult, +}); + /** * Expected JSON RPC response for the "confirmTransaction" message */ @@ -156,6 +169,18 @@ type AccountInfo = { userdata: Buffer, }; +/** + * Account information identified by pubkey + * + * @typedef {Object} KeyedAccountInfo + * @property {PublicKey} accountId + * @property {AccountInfo} accountInfo + */ +type KeyedAccountInfo = { + accountId: PublicKey, + accountInfo: AccountInfo, +}; + /** * Callback function for account change notifications */ @@ -170,6 +195,22 @@ type AccountSubscriptionInfo = { subscriptionId: null | number, // null when there's no current server subscription id }; +/** + * Callback function for program account change notifications + */ +export type ProgramAccountChangeCallback = ( + keyedAccountInfo: KeyedAccountInfo, +) => void; + +/** + * @private + */ +type ProgramAccountSubscriptionInfo = { + programId: string, // PublicKey of the program as a base 58 string + callback: ProgramAccountChangeCallback, + subscriptionId: null | number, // null when there's no current server subscription id +}; + /** * Possible signature status values * @@ -198,6 +239,10 @@ export class Connection { _disableBlockhashCaching: boolean = false; _accountChangeSubscriptions: {[number]: AccountSubscriptionInfo} = {}; _accountChangeSubscriptionCounter: number = 0; + _programAccountChangeSubscriptions: { + [number]: ProgramAccountSubscriptionInfo, + } = {}; + _programAccountChangeSubscriptionCounter: number = 0; /** * Establish a JSON RPC connection @@ -231,6 +276,10 @@ export class Connection { 'accountNotification', this._wsOnAccountNotification.bind(this), ); + this._rpcWebSocket.on( + 'programNotification', + this._wsOnProgramAccountNotification.bind(this), + ); } /** @@ -459,6 +508,71 @@ export class Connection { this._rpcWebSocketConnected = false; } + /** + * @private + */ + async _updateSubscriptions() { + const accountKeys = Object.keys(this._accountChangeSubscriptions).map( + Number, + ); + const programKeys = Object.keys( + this._programAccountChangeSubscriptions, + ).map(Number); + if (accountKeys.length === 0 && programKeys.length === 0) { + this._rpcWebSocket.close(); + return; + } + + if (!this._rpcWebSocketConnected) { + for (let id of accountKeys) { + this._accountChangeSubscriptions[id].subscriptionId = null; + } + for (let id of programKeys) { + this._programAccountChangeSubscriptions[id].subscriptionId = null; + } + this._rpcWebSocket.connect(); + return; + } + + for (let id of accountKeys) { + const {subscriptionId, publicKey} = this._accountChangeSubscriptions[id]; + console.log('pubkey: ' + publicKey); + if (subscriptionId === null) { + try { + this._accountChangeSubscriptions[ + id + ].subscriptionId = await this._rpcWebSocket.call('accountSubscribe', [ + publicKey, + ]); + } catch (err) { + console.log( + `accountSubscribe error for ${publicKey}: ${err.message}`, + ); + } + } + } + for (let id of programKeys) { + const { + subscriptionId, + programId, + } = this._programAccountChangeSubscriptions[id]; + console.log('program-id: ' + programId); + if (subscriptionId === null) { + try { + this._programAccountChangeSubscriptions[ + id + ].subscriptionId = await this._rpcWebSocket.call('programSubscribe', [ + programId, + ]); + } catch (err) { + console.log( + `programSubscribe error for ${programId}: ${err.message}`, + ); + } + } + } + } + /** * @private */ @@ -486,42 +600,6 @@ export class Connection { } } - /** - * @private - */ - async _updateSubscriptions() { - const keys = Object.keys(this._accountChangeSubscriptions).map(Number); - if (keys.length === 0) { - this._rpcWebSocket.close(); - return; - } - - if (!this._rpcWebSocketConnected) { - for (let id of keys) { - this._accountChangeSubscriptions[id].subscriptionId = null; - } - this._rpcWebSocket.connect(); - return; - } - - for (let id of keys) { - const {subscriptionId, publicKey} = this._accountChangeSubscriptions[id]; - if (subscriptionId === null) { - try { - this._accountChangeSubscriptions[ - id - ].subscriptionId = await this._rpcWebSocket.call('accountSubscribe', [ - publicKey, - ]); - } catch (err) { - console.log( - `accountSubscribe error for ${publicKey}: ${err.message}`, - ); - } - } - } - } - /** * Register a callback to be invoked whenever the specified account changes * @@ -564,4 +642,80 @@ export class Connection { throw new Error(`Unknown account change id: ${id}`); } } + + /** + * @private + */ + _wsOnProgramAccountNotification(notification: Object) { + const res = ProgramAccountNotificationResult(notification); + if (res.error) { + throw new Error(res.error.message); + } + + const keys = Object.keys(this._programAccountChangeSubscriptions).map( + Number, + ); + for (let id of keys) { + const sub = this._programAccountChangeSubscriptions[id]; + if (sub.subscriptionId === res.subscription) { + const {result} = res; + assert(typeof result !== 'undefined'); + + sub.callback({ + accountId: result[0], + accountInfo: { + executable: result[1].executable, + owner: new PublicKey(result[1].owner), + lamports: result[1].lamports, + userdata: Buffer.from(result[1].userdata), + }, + }); + return true; + } + } + } + + /** + * Register a callback to be invoked whenever accounts owned by the + * specified program change + * + * @param programId Public key of the program to monitor + * @param callback Function to invoke whenever the account is changed + * @return subscription id + */ + onProgramAccountChange( + programId: PublicKey, + callback: ProgramAccountChangeCallback, + ): number { + const id = ++this._programAccountChangeSubscriptionCounter; + this._programAccountChangeSubscriptions[id] = { + programId: programId.toBase58(), + callback, + subscriptionId: null, + }; + this._updateSubscriptions(); + return id; + } + + /** + * Deregister an account notification callback + * + * @param id subscription id to deregister + */ + async removeProgramAccountChangeListener(id: number): Promise { + if (this._programAccountChangeSubscriptions[id]) { + const {subscriptionId} = this._programAccountChangeSubscriptions[id]; + delete this._programAccountChangeSubscriptions[id]; + if (subscriptionId !== null) { + try { + await this._rpcWebSocket.call('programUnsubscribe', [subscriptionId]); + } catch (err) { + console.log('programUnsubscribe error:', err.message); + } + } + this._updateSubscriptions(); + } else { + throw new Error(`Unknown account change id: ${id}`); + } + } } diff --git a/web3.js/test/connection.test.js b/web3.js/test/connection.test.js index 9713be28c..77073c031 100644 --- a/web3.js/test/connection.test.js +++ b/web3.js/test/connection.test.js @@ -476,3 +476,65 @@ test('account change notification', async () => { Buffer.from([1, 2, 3]), ); }); + +test('program account change notification', async () => { + if (mockRpcEnabled) { + console.log('non-live test skipped'); + return; + } + + const connection = new Connection(url); + const owner = new Account(); + const programAccount = new Account(); + + const mockCallback = jest.fn(); + + const subscriptionId = connection.onProgramAccountChange( + BpfLoader.programId, + mockCallback, + ); + + await connection.requestAirdrop(owner.publicKey, 42); + const transaction = SystemProgram.createAccount( + owner.publicKey, + programAccount.publicKey, + 42, + 3, + BpfLoader.programId, + ); + transaction.fee = 0; + await sendAndConfirmTransaction(connection, transaction, owner); + + const loader = new Loader(connection, BpfLoader.programId); + await loader.load(programAccount, [1, 2, 3]); + + // Wait for mockCallback to receive a call + let i = 0; + for (;;) { + if (mockCallback.mock.calls.length === 1) { + break; + } + + if (++i === 5) { + console.log(JSON.stringify(mockCallback.mock.calls)); + throw new Error('mockCallback should be called twice'); + } + // Sleep for a 1/4 of a slot, notifications only occur after a block is + // processed + await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND); + } + + await connection.removeProgramAccountChangeListener(subscriptionId); + + expect(mockCallback.mock.calls[0][0].accountId).toEqual( + programAccount.publicKey.toString(), + ); + expect(mockCallback.mock.calls[0][0].accountInfo.lamports).toBe(41); + expect(mockCallback.mock.calls[0][0].accountInfo.owner).toEqual( + BpfLoader.programId, + ); + expect(mockCallback.mock.calls[0][0].accountInfo.executable).toBe(false); + expect(mockCallback.mock.calls[0][0].accountInfo.userdata).toEqual( + Buffer.from([1, 2, 3]), + ); +});