test: refactor notification tests on the basis of promises rather than polling

This commit is contained in:
steveluscher 2022-04-09 18:39:32 -07:00 committed by Steven Luscher
parent db50893fa1
commit 21a64db140
1 changed files with 146 additions and 151 deletions

View File

@ -20,7 +20,6 @@ import {
Message,
} from '../src';
import invariant from '../src/util/assert';
import {DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND} from '../src/timing';
import {MOCK_PORT, url} from './url';
import {
AccountInfo,
@ -29,8 +28,10 @@ import {
BlockSignatures,
Commitment,
ConfirmedBlock,
Context,
EpochInfo,
InflationGovernor,
Logs,
SlotInfo,
} from '../src/connection';
import {sleep} from '../src/util/sleep';
@ -3551,50 +3552,84 @@ describe('Connection', function () {
);
});
it('account change notification', async () => {
if (mockServer) {
console.log('non-live test skipped');
return;
}
const connection = new Connection(url, 'confirmed');
const owner = Keypair.generate();
let subscriptionId;
try {
const notificationPromise = new Promise<AccountInfo<Buffer>>(
resolve => {
subscriptionId = connection.onAccountChange(
owner.publicKey,
resolve,
'confirmed',
);
},
);
connection.requestAirdrop(owner.publicKey, LAMPORTS_PER_SOL);
const notificationPayload = await notificationPromise;
expect(notificationPayload.lamports).to.eq(LAMPORTS_PER_SOL);
expect(notificationPayload.owner.equals(SystemProgram.programId)).to.be
.true;
} finally {
if (subscriptionId != null) {
connection.removeAccountChangeListener(subscriptionId);
describe('given an open websocket connection', () => {
beforeEach(async () => {
// Open the socket connection and wait for it to become pingable.
connection._rpcWebSocket.connect();
// eslint-disable-next-line no-constant-condition
while (true) {
try {
await connection._rpcWebSocket.notify('ping');
break;
// eslint-disable-next-line no-empty
} catch {}
await sleep(100);
}
}
});
});
it('program account change notification', async () => {
connection._commitment = 'confirmed';
it('account change notification', async () => {
const connection = new Connection(url, 'confirmed');
const owner = Keypair.generate();
const owner = Keypair.generate();
const programAccount = Keypair.generate();
const balanceNeeded = await connection.getMinimumBalanceForRentExemption(
0,
);
let subscriptionId: number | undefined;
try {
const accountInfoPromise = new Promise<AccountInfo<Buffer>>(
resolve => {
subscriptionId = connection.onAccountChange(
owner.publicKey,
resolve,
'confirmed',
);
},
);
connection.requestAirdrop(owner.publicKey, LAMPORTS_PER_SOL);
const accountInfo = await accountInfoPromise;
expect(accountInfo.lamports).to.eq(LAMPORTS_PER_SOL);
expect(accountInfo.owner.equals(SystemProgram.programId)).to.be.true;
} finally {
if (subscriptionId != null) {
await connection.removeAccountChangeListener(subscriptionId);
}
}
});
let notified = false;
const subscriptionId = connection.onProgramAccountChange(
SystemProgram.programId,
(keyedAccountInfo: KeyedAccountInfo) => {
it('program account change notification', async () => {
connection._commitment = 'confirmed';
const owner = Keypair.generate();
const programAccount = Keypair.generate();
const balanceNeeded =
await connection.getMinimumBalanceForRentExemption(0);
let subscriptionId: number | undefined;
try {
const keyedAccountInfoPromise = new Promise<KeyedAccountInfo>(
resolve => {
subscriptionId = connection.onProgramAccountChange(
SystemProgram.programId,
resolve,
);
},
);
await helpers.airdrop({
connection,
address: owner.publicKey,
amount: LAMPORTS_PER_SOL,
});
const transaction = new Transaction().add(
SystemProgram.transfer({
fromPubkey: owner.publicKey,
toPubkey: programAccount.publicKey,
lamports: balanceNeeded,
}),
);
await sendAndConfirmTransaction(connection, transaction, [owner], {
commitment: 'confirmed',
});
const keyedAccountInfo = await keyedAccountInfoPromise;
if (keyedAccountInfo.accountId.equals(programAccount.publicKey)) {
expect(keyedAccountInfo.accountInfo.lamports).to.eq(balanceNeeded);
expect(
@ -3602,129 +3637,89 @@ describe('Connection', function () {
SystemProgram.programId,
),
).to.be.true;
notified = true;
}
},
);
await helpers.airdrop({
connection,
address: owner.publicKey,
amount: LAMPORTS_PER_SOL,
} finally {
if (subscriptionId != null) {
await connection.removeProgramAccountChangeListener(subscriptionId);
}
}
});
try {
const transaction = new Transaction().add(
SystemProgram.transfer({
fromPubkey: owner.publicKey,
toPubkey: programAccount.publicKey,
lamports: balanceNeeded,
}),
);
await sendAndConfirmTransaction(connection, transaction, [owner], {
commitment: 'confirmed',
});
} catch (err) {
await connection.removeProgramAccountChangeListener(subscriptionId);
throw err;
}
let i = 0;
while (!notified) {
if (++i === 30) {
throw new Error('Program change notification not observed');
it('slot notification', async () => {
let subscriptionId: number | undefined;
try {
const notifiedSlotInfo = await new Promise<SlotInfo>(resolve => {
subscriptionId = connection.onSlotChange(resolve);
});
expect(notifiedSlotInfo.parent).to.be.at.least(0);
expect(notifiedSlotInfo.root).to.be.at.least(0);
expect(notifiedSlotInfo.slot).to.be.at.least(1);
} finally {
if (subscriptionId != null) {
await connection.removeSlotChangeListener(subscriptionId);
}
}
// Sleep for a 1/4 of a slot, notifications only occur after a block is
// processed
await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
}
await connection.removeProgramAccountChangeListener(subscriptionId);
});
it('slot notification', async () => {
let notifiedSlotInfo: SlotInfo | undefined;
const subscriptionId = connection.onSlotChange(slotInfo => {
notifiedSlotInfo = slotInfo;
});
// Wait for notification
let i = 0;
while (!notifiedSlotInfo) {
if (++i === 30) {
throw new Error('Slot change notification not observed');
it('root notification', async () => {
let subscriptionId: number | undefined;
try {
const atLeastTwoRoots = await new Promise<number[]>(resolve => {
const roots: number[] = [];
subscriptionId = connection.onRootChange(root => {
if (roots.length === 2) {
return;
}
roots.push(root);
if (roots.length === 2) {
// Collect at least two, then resolve.
resolve(roots);
}
});
});
expect(atLeastTwoRoots[1]).to.be.greaterThan(atLeastTwoRoots[0]);
} finally {
if (subscriptionId != null) {
await connection.removeRootChangeListener(subscriptionId);
}
}
// Sleep for a 1/4 of a slot, notifications only occur after a block is
// processed
await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
}
expect(notifiedSlotInfo.parent).to.be.at.least(0);
expect(notifiedSlotInfo.root).to.be.at.least(0);
expect(notifiedSlotInfo.slot).to.be.at.least(1);
await connection.removeSlotChangeListener(subscriptionId);
});
it('root notification', async () => {
let roots: number[] = [];
const subscriptionId = connection.onRootChange(root => {
roots.push(root);
});
// Wait for mockCallback to receive a call
let i = 0;
while (roots.length < 2) {
if (++i === 30) {
throw new Error('Root change notification not observed');
}
// Sleep for a 1/4 of a slot, notifications only occur after a block is
// processed
await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
}
expect(roots[1]).to.be.greaterThan(roots[0]);
await connection.removeRootChangeListener(subscriptionId);
});
it('logs notification', async () => {
let listener: number | undefined;
const owner = Keypair.generate();
const [logsRes, ctx] = await new Promise(resolve => {
let received = false;
listener = connection.onLogs(
owner.publicKey,
(logs, ctx) => {
if (!logs.err) {
received = true;
resolve([logs, ctx]);
}
},
'processed',
);
// Send transactions until the log subscription receives an event
(async () => {
while (!received) {
// Execute a transaction so that we can pickup its logs.
await connection.requestAirdrop(
it('logs notification', async () => {
let subscriptionId: number | undefined;
const owner = Keypair.generate();
try {
const logPromise = new Promise<[Logs, Context]>(resolve => {
subscriptionId = connection.onLogs(
owner.publicKey,
1 * LAMPORTS_PER_SOL,
(logs, ctx) => {
if (!logs.err) {
resolve([logs, ctx]);
}
},
'processed',
);
await sleep(1000);
});
// Execute a transaction so that we can pickup its logs.
await connection.requestAirdrop(owner.publicKey, LAMPORTS_PER_SOL);
const [logsRes, ctx] = await logPromise;
expect(ctx.slot).to.be.greaterThan(0);
expect(logsRes.logs.length).to.eq(2);
expect(logsRes.logs[0]).to.eq(
'Program 11111111111111111111111111111111 invoke [1]',
);
expect(logsRes.logs[1]).to.eq(
'Program 11111111111111111111111111111111 success',
);
} finally {
if (subscriptionId != null) {
await connection.removeOnLogsListener(subscriptionId);
}
})();
}
});
expect(ctx.slot).to.be.greaterThan(0);
expect(logsRes.logs.length).to.eq(2);
expect(logsRes.logs[0]).to.eq(
'Program 11111111111111111111111111111111 invoke [1]',
);
expect(logsRes.logs[1]).to.eq(
'Program 11111111111111111111111111111111 success',
);
await connection.removeOnLogsListener(listener!);
}).timeout(60 * 1000);
});
it('https request', async () => {
const connection = new Connection('https://api.mainnet-beta.solana.com');