feat: slot change callback (provides real-time fork information)

This commit is contained in:
Sunny Gleason 2019-11-25 11:04:35 -05:00 committed by Michael Vines
parent ddce1d3c9f
commit 286891fa51
3 changed files with 166 additions and 1 deletions

View File

@ -96,10 +96,17 @@ declare module '@solana/web3.js' {
commission: number,
};
declare export type SlotInfo = {
parent: 'number',
slot: 'number',
root: 'number',
};
declare type AccountChangeCallback = (accountInfo: AccountInfo) => void;
declare type ProgramAccountChangeCallback = (
keyedAccountInfo: KeyedAccountInfo,
) => void;
declare type SlotChangeCallback = (slotInfo: SlotInfo) => void;
declare export type SignatureSuccess = {|
Ok: null,
@ -191,6 +198,7 @@ declare module '@solana/web3.js' {
programId: PublicKey,
callback: ProgramAccountChangeCallback,
): number;
onSlotChange(callback: SlotChangeCallback): number;
removeProgramAccountChangeListener(id: number): Promise<void>;
validatorExit(): Promise<boolean>;
}

View File

@ -318,6 +318,23 @@ const ProgramAccountNotificationResult = struct({
result: ProgramAccountInfoResult,
});
/**
* @private
*/
const SlotInfo = struct({
parent: 'number',
slot: 'number',
root: 'number',
});
/***
* Expected JSON RPC response for the "slotNotification" message
*/
const SlotNotificationResult = struct({
subscription: 'number',
result: SlotInfo,
});
/**
* Expected JSON RPC response for the "getProgramAccounts" message
*/
@ -569,6 +586,19 @@ type ProgramAccountSubscriptionInfo = {
subscriptionId: null | number, // null when there's no current server subscription id
};
/**
* @private
*/
type SlotSubscriptionInfo = {
callback: SlotChangeCallback,
subscriptionId: null | number, // null when there's no current server subscription id
};
/**
* Callback function for slot change notifications
*/
export type SlotChangeCallback = (slotInfo: SlotInfo) => void;
/**
* Signature status: Success
*
@ -618,6 +648,10 @@ export class Connection {
[number]: ProgramAccountSubscriptionInfo,
} = {};
_programAccountChangeSubscriptionCounter: number = 0;
_slotSubscriptions: {
[number]: SlotSubscriptionInfo,
} = {};
_slotSubscriptionCounter: number = 0;
/**
* Establish a JSON RPC connection
@ -657,6 +691,10 @@ export class Connection {
'programNotification',
this._wsOnProgramAccountNotification.bind(this),
);
this._rpcWebSocket.on(
'slotNotification',
this._wsOnSlotNotification.bind(this),
);
}
/**
@ -1256,7 +1294,12 @@ export class Connection {
const programKeys = Object.keys(
this._programAccountChangeSubscriptions,
).map(Number);
if (accountKeys.length === 0 && programKeys.length === 0) {
const slotKeys = Object.keys(this._slotSubscriptions).map(Number);
if (
accountKeys.length === 0 &&
programKeys.length === 0 &&
slotKeys.length === 0
) {
this._rpcWebSocket.close();
return;
}
@ -1268,6 +1311,9 @@ export class Connection {
for (let id of programKeys) {
this._programAccountChangeSubscriptions[id].subscriptionId = null;
}
for (let id of slotKeys) {
this._slotSubscriptions[id].subscriptionId = null;
}
this._rpcWebSocket.connect();
return;
}
@ -1307,6 +1353,18 @@ export class Connection {
}
}
}
for (let id of slotKeys) {
const {subscriptionId} = this._slotSubscriptions[id];
if (subscriptionId === null) {
try {
this._slotSubscriptions[
id
].subscriptionId = await this._rpcWebSocket.call('slotSubscribe', []);
} catch (err) {
console.log(`slotSubscribe error: ${err.message}`);
}
}
}
}
/**
@ -1455,6 +1513,69 @@ export class Connection {
}
}
/**
* @private
*/
_wsOnSlotNotification(notification: Object) {
const res = SlotNotificationResult(notification);
if (res.error) {
throw new Error(res.error.message);
}
assert(typeof res.result !== 'undefined');
const {parent, slot, root} = res.result;
const keys = Object.keys(this._slotSubscriptions).map(Number);
for (let id of keys) {
const sub = this._slotSubscriptions[id];
if (sub.subscriptionId === res.subscription) {
sub.callback({
parent,
slot,
root,
});
return true;
}
}
}
/**
* Register a callback to be invoked upon slot changes
*
* @param callback Function to invoke whenever the slot changes
* @return subscription id
*/
onSlotChange(callback: SlotChangeCallback): number {
const id = ++this._slotSubscriptionCounter;
this._slotSubscriptions[id] = {
callback,
subscriptionId: id,
};
this._updateSubscriptions();
return id;
}
/**
* Deregister a slot notification callback
*
* @param id subscription id to deregister
*/
async removeSlotChangeListener(id: number): Promise<void> {
if (this._slotSubscriptions[id]) {
const {subscriptionId} = this._slotSubscriptions[id];
delete this._slotSubscriptions[id];
if (subscriptionId !== null) {
try {
await this._rpcWebSocket.call('slotUnsubscribe', [subscriptionId]);
} catch (err) {
console.log('slotUnsubscribe error:', err.message);
}
}
this._updateSubscriptions();
} else {
throw new Error(`Unknown slot change id: ${id}`);
}
}
_argsWithCommitment(args: Array<any>, override: ?Commitment): Array<any> {
const commitment = override || this._commitment;
if (commitment) {

View File

@ -931,3 +931,39 @@ test('program account change notification', async () => {
await connection.removeProgramAccountChangeListener(subscriptionId);
});
test('slot notification', async () => {
if (mockRpcEnabled) {
console.log('non-live test skipped');
return;
}
const connection = new Connection(url, 'recent');
// let notified = false;
const subscriptionId = connection.onSlotChange(slotInfo => {
expect(slotInfo.parent).toBeDefined();
expect(slotInfo.slot).toBeDefined();
expect(slotInfo.root).toBeDefined();
expect(slotInfo.slot).toBeGreaterThan(slotInfo.parent);
expect(slotInfo.slot).toBeGreaterThanOrEqual(slotInfo.root);
// notified = true;
});
//
// FIXME: enable this check when slotNotification is live
//
// // Wait for mockCallback to receive a call
// let i = 0;
// while (!notified) {
// //for (;;) {
// if (++i === 30) {
// throw new Error('Slot change notification not observed');
// }
// // Sleep for a 1/4 of a slot, notifications only occur after a block is
// // processed
// await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
// }
await connection.removeSlotChangeListener(subscriptionId);
});