feat: add support for slot update pubsub subscriptions (#16990)

This commit is contained in:
Justin Starry 2021-05-02 20:14:30 +08:00 committed by GitHub
parent 8e561354d5
commit da81ad0c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 184 additions and 1 deletions

View File

@ -1056,6 +1056,112 @@ const SlotNotificationResult = pick({
result: SlotInfoResult,
});
/**
* Slot updates which can be used for tracking the live progress of a cluster.
* - `"firstShredReceived"`: connected node received the first shred of a block.
* Indicates that a new block that is being produced.
* - `"completed"`: connected node has received all shreds of a block. Indicates
* a block was recently produced.
* - `"optimisticConfirmation"`: block was optimistically confirmed by the
* cluster. It is not guaranteed that an optimistic confirmation notification
* will be sent for every finalized blocks.
* - `"root"`: the connected node rooted this block.
* - `"createdBank"`: the connected node has started validating this block.
* - `"frozen"`: the connected node has validated this block.
* - `"dead"`: the connected node failed to validate this block.
*/
export type SlotUpdate =
| {
type: 'firstShredReceived';
slot: number;
timestamp: number;
}
| {
type: 'completed';
slot: number;
timestamp: number;
}
| {
type: 'createdBank';
slot: number;
timestamp: number;
parent: number;
}
| {
type: 'frozen';
slot: number;
timestamp: number;
stats: {
numTransactionEntries: number;
numSuccessfulTransactions: number;
numFailedTransactions: number;
maxTransactionsPerEntry: number;
};
}
| {
type: 'dead';
slot: number;
timestamp: number;
err: string;
}
| {
type: 'optimisticConfirmation';
slot: number;
timestamp: number;
}
| {
type: 'root';
slot: number;
timestamp: number;
};
/**
* @internal
*/
const SlotUpdateResult = union([
pick({
type: union([
literal('firstShredReceived'),
literal('completed'),
literal('optimisticConfirmation'),
literal('root'),
]),
slot: number(),
timestamp: number(),
}),
pick({
type: literal('createdBank'),
parent: number(),
slot: number(),
timestamp: number(),
}),
pick({
type: literal('frozen'),
slot: number(),
timestamp: number(),
stats: pick({
numTransactionEntries: number(),
numSuccessfulTransactions: number(),
numFailedTransactions: number(),
maxTransactionsPerEntry: number(),
}),
}),
pick({
type: literal('dead'),
slot: number(),
timestamp: number(),
err: string(),
}),
]);
/**
* Expected JSON RPC response for the "slotsUpdatesNotification" message
*/
const SlotUpdateNotificationResult = pick({
subscription: number(),
result: SlotUpdateResult,
});
/**
* Expected JSON RPC response for the "signatureNotification" message
*/
@ -1569,6 +1675,19 @@ type SlotSubscriptionInfo = {
subscriptionId: SubscriptionId | null; // null when there's no current server subscription id
};
/**
* Callback function for slot update notifications
*/
export type SlotUpdateCallback = (slotUpdate: SlotUpdate) => void;
/**
* @private
*/
type SlotUpdateSubscriptionInfo = {
callback: SlotUpdateCallback;
subscriptionId: SubscriptionId | null; // null when there's no current server subscription id
};
/**
* Callback function for signature status notifications
*/
@ -1815,6 +1934,11 @@ export class Connection {
[id: number]: LogsSubscriptionInfo;
} = {};
/** @internal */ _slotUpdateSubscriptionCounter: number = 0;
/** @internal */ _slotUpdateSubscriptions: {
[id: number]: SlotUpdateSubscriptionInfo;
} = {};
/**
* Establish a JSON RPC connection
*
@ -1887,6 +2011,10 @@ export class Connection {
'slotNotification',
this._wsOnSlotNotification.bind(this),
);
this._rpcWebSocket.on(
'slotsUpdatesNotification',
this._wsOnSlotUpdatesNotification.bind(this),
);
this._rpcWebSocket.on(
'signatureNotification',
this._wsOnSignatureNotification.bind(this),
@ -3323,13 +3451,16 @@ export class Connection {
Object.values(this._programAccountChangeSubscriptions).forEach(
s => (s.subscriptionId = null),
);
Object.values(this._rootSubscriptions).forEach(
s => (s.subscriptionId = null),
);
Object.values(this._signatureSubscriptions).forEach(
s => (s.subscriptionId = null),
);
Object.values(this._slotSubscriptions).forEach(
s => (s.subscriptionId = null),
);
Object.values(this._rootSubscriptions).forEach(
Object.values(this._slotUpdateSubscriptions).forEach(
s => (s.subscriptionId = null),
);
}
@ -3345,6 +3476,9 @@ export class Connection {
this._programAccountChangeSubscriptions,
).map(Number);
const slotKeys = Object.keys(this._slotSubscriptions).map(Number);
const slotUpdateKeys = Object.keys(this._slotUpdateSubscriptions).map(
Number,
);
const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number);
const rootKeys = Object.keys(this._rootSubscriptions).map(Number);
const logsKeys = Object.keys(this._logsSubscriptions).map(Number);
@ -3352,6 +3486,7 @@ export class Connection {
accountKeys.length === 0 &&
programKeys.length === 0 &&
slotKeys.length === 0 &&
slotUpdateKeys.length === 0 &&
signatureKeys.length === 0 &&
rootKeys.length === 0 &&
logsKeys.length === 0
@ -3400,6 +3535,11 @@ export class Connection {
this._subscribe(sub, 'slotSubscribe', []);
}
for (let id of slotUpdateKeys) {
const sub = this._slotUpdateSubscriptions[id];
this._subscribe(sub, 'slotsUpdatesSubscribe', []);
}
for (let id of signatureKeys) {
const sub = this._signatureSubscriptions[id];
const args: any[] = [sub.signature];
@ -3639,6 +3779,49 @@ export class Connection {
/**
* @internal
*/
_wsOnSlotUpdatesNotification(notification: Object) {
const res = create(notification, SlotUpdateNotificationResult);
for (const sub of Object.values(this._slotUpdateSubscriptions)) {
if (sub.subscriptionId === res.subscription) {
sub.callback(res.result);
return;
}
}
}
/**
* Register a callback to be invoked upon slot updates. {@link SlotUpdate}'s
* may be useful to track live progress of a cluster.
*
* @param callback Function to invoke whenever the slot updates
* @return subscription id
*/
onSlotUpdate(callback: SlotUpdateCallback): number {
const id = ++this._slotUpdateSubscriptionCounter;
this._slotUpdateSubscriptions[id] = {
callback,
subscriptionId: null,
};
this._updateSubscriptions();
return id;
}
/**
* Deregister a slot update notification callback
*
* @param id subscription id to deregister
*/
async removeSlotUpdateListener(id: number): Promise<void> {
if (this._slotUpdateSubscriptions[id]) {
const subInfo = this._slotUpdateSubscriptions[id];
delete this._slotUpdateSubscriptions[id];
await this._unsubscribe(subInfo, 'slotsUpdatesUnsubscribe');
this._updateSubscriptions();
} else {
throw new Error(`Unknown slot update id: ${id}`);
}
}
_buildArgs(
args: Array<any>,
override?: Commitment,