feat: add onSignature pub sub api

This commit is contained in:
Justin Starry 2020-02-03 23:22:11 +08:00 committed by Michael Vines
parent 7ebf82b19d
commit 740b7a3b23
3 changed files with 159 additions and 17 deletions

11
web3.js/module.d.ts vendored
View File

@ -114,6 +114,9 @@ declare module '@solana/web3.js' {
keyedAccountInfo: KeyedAccountInfo,
) => void;
export type SlotChangeCallback = (slotInfo: SlotInfo) => void;
export type SignatureResultCallback = (
signatureResult: SignatureStatusResult,
) => void;
export type SignatureSuccess = {
Ok: null;
@ -213,8 +216,14 @@ declare module '@solana/web3.js' {
programId: PublicKey,
callback: ProgramAccountChangeCallback,
): number;
onSlotChange(callback: SlotChangeCallback): number;
removeProgramAccountChangeListener(id: number): Promise<void>;
onSlotChange(callback: SlotChangeCallback): number;
removeSlotChangeListener(id: number): Promise<void>;
onSignature(
signature: TransactionSignature,
callback: SignatureResultCallback,
): number;
removeSignatureListener(id: number): Promise<void>;
validatorExit(): Promise<boolean>;
getMinimumBalanceForRentExemption(
dataLength: number,

View File

@ -58,8 +58,7 @@ declare module '@solana/web3.js' {
declare export type SignatureStatusResult =
| SignatureSuccess
| TransactionError
| null;
| TransactionError;
declare export type BlockhashAndFeeCalculator = {
blockhash: Blockhash,
@ -96,7 +95,7 @@ declare module '@solana/web3.js' {
fee: number,
preBalances: Array<number>,
postBalances: Array<number>,
status: SignatureStatusResult,
status: ?SignatureStatusResult,
},
}>,
};
@ -128,6 +127,9 @@ declare module '@solana/web3.js' {
keyedAccountInfo: KeyedAccountInfo,
) => void;
declare type SlotChangeCallback = (slotInfo: SlotInfo) => void;
declare type SignatureResultCallback = (
signatureResult: SignatureStatusResult,
) => void;
declare export type SignatureSuccess = {|
Ok: null,
@ -227,8 +229,14 @@ declare module '@solana/web3.js' {
programId: PublicKey,
callback: ProgramAccountChangeCallback,
): number;
onSlotChange(callback: SlotChangeCallback): number;
removeProgramAccountChangeListener(id: number): Promise<void>;
onSlotChange(callback: SlotChangeCallback): number;
removeSlotChangeListener(id: number): Promise<void>;
onSignature(
signature: TransactionSignature,
callback: SignatureResultCallback,
): number;
removeSignatureListener(id: number): Promise<void>;
validatorExit(): Promise<boolean>;
getMinimumBalanceForRentExemption(
dataLength: number,

View File

@ -175,6 +175,14 @@ const GetEpochScheduleResult = struct({
firstNormalSlot: 'number',
});
/**
* Signature status for a transaction
*/
const SignatureStatusResult = struct.union([
struct({Ok: 'null'}),
struct({Err: 'object'}),
]);
/**
* Version info for a node
*
@ -204,7 +212,7 @@ type ConfirmedBlock = {
fee: number,
preBalances: Array<number>,
postBalances: Array<number>,
status: SignatureStatusResult,
status?: SignatureStatusResult,
},
}>,
};
@ -337,7 +345,7 @@ const SlotInfo = struct({
root: 'number',
});
/***
/**
* Expected JSON RPC response for the "slotNotification" message
*/
const SlotNotificationResult = struct({
@ -345,6 +353,14 @@ const SlotNotificationResult = struct({
result: SlotInfo,
});
/**
* Expected JSON RPC response for the "signatureNotification" message
*/
const SignatureNotificationResult = struct({
subscription: 'number',
result: SignatureStatusResult,
});
/**
* Expected JSON RPC response for the "getProgramAccounts" message
*/
@ -419,15 +435,12 @@ const GetVoteAccounts = jsonRpcResult(
}),
);
const SignatureStatusResult = struct.union([
'null',
struct.union([struct({Ok: 'null'}), struct({Err: 'object'})]),
]);
/**
* Expected JSON RPC response for the "getSignatureStatus" message
*/
const GetSignatureStatusRpcResult = jsonRpcResult(SignatureStatusResult);
const GetSignatureStatusRpcResult = jsonRpcResult(
struct.union(['null', SignatureStatusResult]),
);
/**
* Expected JSON RPC response for the "getTransactionCount" message
@ -481,7 +494,7 @@ export const GetConfirmedBlockRpcResult = jsonRpcResult(
meta: struct.union([
'null',
struct({
status: SignatureStatusResult,
status: struct.union(['null', SignatureStatusResult]),
fee: 'number',
preBalances: struct.array(['number']),
postBalances: struct.array(['number']),
@ -578,6 +591,11 @@ type ProgramAccountSubscriptionInfo = {
subscriptionId: null | number, // null when there's no current server subscription id
};
/**
* Callback function for slot change notifications
*/
export type SlotChangeCallback = (slotInfo: SlotInfo) => void;
/**
* @private
*/
@ -587,9 +605,20 @@ type SlotSubscriptionInfo = {
};
/**
* Callback function for slot change notifications
* Callback function for signature notifications
*/
export type SlotChangeCallback = (slotInfo: SlotInfo) => void;
export type SignatureResultCallback = (
signatureResult: SignatureStatusResult,
) => void;
/**
* @private
*/
type SignatureSubscriptionInfo = {
signature: TransactionSignature, // TransactionSignature as a base 58 string
callback: SignatureResultCallback,
subscriptionId: null | number, // null when there's no current server subscription id
};
/**
* Signature status: Success
@ -650,6 +679,10 @@ export class Connection {
[number]: SlotSubscriptionInfo,
} = {};
_slotSubscriptionCounter: number = 0;
_signatureSubscriptions: {
[number]: SignatureSubscriptionInfo,
} = {};
_signatureSubscriptionCounter: number = 0;
/**
* Establish a JSON RPC connection
@ -693,6 +726,10 @@ export class Connection {
'slotNotification',
this._wsOnSlotNotification.bind(this),
);
this._rpcWebSocket.on(
'signatureNotification',
this._wsOnSignatureNotification.bind(this),
);
}
/**
@ -1289,10 +1326,12 @@ export class Connection {
this._programAccountChangeSubscriptions,
).map(Number);
const slotKeys = Object.keys(this._slotSubscriptions).map(Number);
const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number);
if (
accountKeys.length === 0 &&
programKeys.length === 0 &&
slotKeys.length === 0
slotKeys.length === 0 &&
signatureKeys.length === 0
) {
this._rpcWebSocket.close();
return;
@ -1308,6 +1347,9 @@ export class Connection {
for (let id of slotKeys) {
this._slotSubscriptions[id].subscriptionId = null;
}
for (let id of signatureKeys) {
this._signatureSubscriptions[id].subscriptionId = null;
}
this._rpcWebSocket.connect();
return;
}
@ -1359,6 +1401,21 @@ export class Connection {
}
}
}
for (let id of signatureKeys) {
const subscription = this._signatureSubscriptions[id];
if (subscription.subscriptionId === null) {
subscription.subscriptionId = -1; // indicates subscribing
try {
const id = await this._rpcWebSocket.call('signatureSubscribe', [
subscription.signature,
]);
subscription.subscriptionId = id;
} catch (err) {
subscription.subscriptionId = null;
console.log(`signatureSubscribe error: ${err.message}`);
}
}
}
}
/**
@ -1577,4 +1634,72 @@ export class Connection {
}
return args;
}
/**
* @private
*/
_wsOnSignatureNotification(notification: Object) {
const res = SignatureNotificationResult(notification);
if (res.error) {
throw new Error(res.error.message);
}
assert(typeof res.result !== 'undefined');
const keys = Object.keys(this._signatureSubscriptions).map(Number);
for (let id of keys) {
const sub = this._signatureSubscriptions[id];
if (sub.subscriptionId === res.subscription) {
// Signatures subscriptions are auto-removed by the RPC service so
// no need to explicitly send an unsubscribe message
delete this._signatureSubscriptions[id];
this._updateSubscriptions();
sub.callback(res.result);
return;
}
}
}
/**
* Register a callback to be invoked upon signature updates
*
* @param callback Function to invoke on signature notifications
* @return subscription id
*/
onSignature(
signature: TransactionSignature,
callback: SignatureResultCallback,
): number {
const id = ++this._signatureSubscriptionCounter;
this._signatureSubscriptions[id] = {
signature,
callback,
subscriptionId: null,
};
this._updateSubscriptions();
return id;
}
/**
* Deregister a signature notification callback
*
* @param id subscription id to deregister
*/
async removeSignatureListener(id: number): Promise<void> {
if (this._signatureSubscriptions[id]) {
const {subscriptionId} = this._signatureSubscriptions[id];
delete this._signatureSubscriptions[id];
if (subscriptionId !== null) {
try {
await this._rpcWebSocket.call('signatureUnsubscribe', [
subscriptionId,
]);
} catch (err) {
console.log('signatureUnsubscribe error:', err.message);
}
}
this._updateSubscriptions();
} else {
throw new Error(`Unknown signature change id: ${id}`);
}
}
}