feat: add logs subscription (#16045)

* feat: logs subscription

* fix: address review comments

* fix: use processed commitment

* fix: sleep before triggering log transaction
This commit is contained in:
Armani Ferrante 2021-03-23 20:05:17 -07:00 committed by GitHub
parent afbd09062d
commit d6ef694139
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 158 additions and 1 deletions

View File

@ -1556,6 +1556,54 @@ type RootSubscriptionInfo = {
subscriptionId: SubscriptionId | null; // null when there's no current server subscription id
};
/**
* @internal
*/
const LogsResult = pick({
err: TransactionErrorResult,
logs: array(string()),
signature: string(),
});
/**
* Logs result.
*
* @typedef {Object} Logs.
*/
export type Logs = {
err: TransactionError | null;
logs: string[];
signature: string;
};
/**
* Expected JSON RPC response for the "logsNotification" message.
*/
const LogsNotificationResult = pick({
result: notificationResultAndContext(LogsResult),
subscription: number(),
});
/**
* Filter for log subscriptions.
*/
export type LogsFilter = PublicKey | 'all' | 'allWithVotes';
/**
* Callback function for log notifications.
*/
export type LogsCallback = (logs: Logs, ctx: Context) => void;
/**
* @private
*/
type LogsSubscriptionInfo = {
callback: LogsCallback;
filter: LogsFilter;
subscriptionId: SubscriptionId | null; // null when there's no current server subscription id
commitment?: Commitment;
};
/**
* Signature result
*
@ -1669,6 +1717,11 @@ export class Connection {
[id: number]: SlotSubscriptionInfo;
} = {};
/** @internal */ _logsSubscriptionCounter: number = 0;
/** @internal */ _logsSubscriptions: {
[id: number]: LogsSubscriptionInfo;
} = {};
/**
* Establish a JSON RPC connection
*
@ -1730,6 +1783,10 @@ export class Connection {
'rootNotification',
this._wsOnRootNotification.bind(this),
);
this._rpcWebSocket.on(
'logsNotification',
this._wsOnLogsNotification.bind(this),
);
}
/**
@ -2991,12 +3048,14 @@ export class Connection {
const slotKeys = Object.keys(this._slotSubscriptions).map(Number);
const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number);
const rootKeys = Object.keys(this._rootSubscriptions).map(Number);
const logsKeys = Object.keys(this._logsSubscriptions).map(Number);
if (
accountKeys.length === 0 &&
programKeys.length === 0 &&
slotKeys.length === 0 &&
signatureKeys.length === 0 &&
rootKeys.length === 0
rootKeys.length === 0 &&
logsKeys.length === 0
) {
if (this._rpcWebSocketConnected) {
this._rpcWebSocketConnected = false;
@ -3053,6 +3112,21 @@ export class Connection {
const sub = this._rootSubscriptions[id];
this._subscribe(sub, 'rootSubscribe', []);
}
for (let id of logsKeys) {
const sub = this._logsSubscriptions[id];
let filter;
if (typeof sub.filter === 'object') {
filter = {mentions: [sub.filter.toString()]};
} else {
filter = sub.filter;
}
this._subscribe(
sub,
'logsSubscribe',
this._buildArgs([filter], sub.commitment),
);
}
}
/**
@ -3169,6 +3243,55 @@ export class Connection {
}
}
/**
* Registers a callback to be invoked whenever logs are emitted.
*/
onLogs(
filter: LogsFilter,
callback: LogsCallback,
commitment?: Commitment,
): number {
const id = ++this._logsSubscriptionCounter;
this._logsSubscriptions[id] = {
filter,
callback,
commitment,
subscriptionId: null,
};
this._updateSubscriptions();
return id;
}
/**
* Deregister a logs callback.
*
* @param id subscription id to deregister.
*/
async removeOnLogsListener(id: number): Promise<void> {
if (!this._logsSubscriptions[id]) {
throw new Error(`Unknown logs id: ${id}`);
}
const subInfo = this._logsSubscriptions[id];
delete this._logsSubscriptions[id];
await this._unsubscribe(subInfo, 'logsUnsubscribe');
this._updateSubscriptions();
}
/**
* @internal
*/
_wsOnLogsNotification(notification: Object) {
const res = create(notification, LogsNotificationResult);
const keys = Object.keys(this._logsSubscriptions).map(Number);
for (let id of keys) {
const sub = this._logsSubscriptions[id];
if (sub.subscriptionId === res.subscription) {
sub.callback(res.result.value, res.result.context);
return;
}
}
}
/**
* @internal
*/

View File

@ -2268,6 +2268,40 @@ describe('Connection', () => {
await connection.removeRootChangeListener(subscriptionId);
});
it('logs notification', async () => {
let listener: number | undefined;
const owner = new Account();
const [logsRes, ctx] = await new Promise(resolve => {
listener = connection.onLogs(
'all',
(logs, ctx) => {
resolve([logs, ctx]);
},
'processed',
);
// Sleep to allow the subscription time to be setup.
//
// Without this, there's a race condition between setting up the log
// subscription and executing the transaction to trigger the log.
// If the transaction to trigger the log executes before the
// subscription is setup, the log event listener never fires and so the
// promise never resolves.
sleep(1000).then(() => {
// Execute a transaction so that we can pickup its logs.
connection.requestAirdrop(owner.publicKey, 1);
});
});
expect(ctx.slot).to.be.greaterThan(0);
expect(logsRes.logs.length).to.eq(2);
expect(logsRes.logs[0]).to.eq(
'Program 11111111111111111111111111111111 invoke [1]',
);
expect(logsRes.logs[1]).to.eq(
'Program 11111111111111111111111111111111 success',
);
await connection.removeOnLogsListener(listener!);
});
it('https request', async () => {
const connection = new Connection('https://devnet.solana.com');
const version = await connection.getVersion();