diff --git a/.gitignore b/.gitignore index a59e8375..83bc880a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,6 @@ node_modules bench/metrics.csv *.pem* *.pks* +*out.json* .env test-ledger diff --git a/bench/src/helpers.rs b/bench/src/helpers.rs index dcec19dc..5b8ac882 100644 --- a/bench/src/helpers.rs +++ b/bench/src/helpers.rs @@ -69,13 +69,15 @@ impl BenchHelper { funded_payer: &Keypair, blockhash: Hash, ) -> Vec { - let random_bytes: Vec = Alphanumeric - .sample_iter(rand::thread_rng()) - .take(10) - .collect(); - (0..num_of_txs) - .map(|_| Self::create_memo_tx(&random_bytes, funded_payer, blockhash)) + .map(|_| { + let random_bytes: Vec = Alphanumeric + .sample_iter(rand::thread_rng()) + .take(10) + .collect(); + + Self::create_memo_tx(&random_bytes, funded_payer, blockhash) + }) .collect() } diff --git a/benches/create_n_users.bench.ts b/benches/create_n_users.bench.ts index 6964472d..33dddbc4 100644 --- a/benches/create_n_users.bench.ts +++ b/benches/create_n_users.bench.ts @@ -1,6 +1,6 @@ -import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey } from '@solana/web3.js'; -import * as fs from 'fs'; +import { Connection, Keypair } from '@solana/web3.js'; import * as splToken from "@solana/spl-token"; +import * as fs from 'fs'; import * as os from 'os'; // number of users @@ -9,45 +9,42 @@ const nbUsers = +process.argv[2]; const url = process.argv.length > 3 ? process.argv[3] : "http://0.0.0.0:8899"; // outfile const outFile = process.argv.length > 4 ? process.argv[4] : "out.json"; - -console.log("creating " + nbUsers + " Users on " + url + " out file " + outFile); -function delay(ms: number) { - return new Promise( resolve => setTimeout(resolve, ms) ); -} -export async function main() { - const connection = new Connection(url, 'confirmed'); - let authority = Keypair.fromSecretKey( - Uint8Array.from( - JSON.parse( - process.env.KEYPAIR || - fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'), - ), - ), - ); - let userKps = [...Array(nbUsers)].map(_x => Keypair.generate()) - let mint = await splToken.createMint( +(async function main() { + console.log("Creating " + nbUsers + " Users on " + url + " out file " + outFile); + console.time('Time taken'); + + const connection = new Connection(url, 'confirmed'); + + const authority = Keypair.fromSecretKey( + Uint8Array.from( + JSON.parse( + process.env.KEYPAIR || + fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'), + ), + ), + ); + + const userKps = Array(nbUsers).fill(0).map(() => Keypair.generate()); + + const mint = await splToken.createMint( connection, authority, authority.publicKey, null, 6, ); - let accounts : PublicKey[] = []; - for (const user of userKps) { - console.log("account created"); - let account = await splToken.createAccount( + + const accounts = await Promise.all(userKps.map(async user => { + const account = await splToken.createAccount( connection, authority, mint, user.publicKey, - ) - accounts.push(account) - await delay(100) - }; + ); + + console.log("Account created"); - for (const account of accounts) { - console.log("account minted"); await splToken.mintTo( connection, authority, @@ -56,30 +53,27 @@ export async function main() { authority, 1_000_000_000_000, ) - await delay(100) - }; - const users = userKps.map(x => { - const info = { - 'publicKey' : x.publicKey.toBase58(), - 'secretKey' : Array.from(x.secretKey) - }; - return info; - }); + console.log("Account minted"); + + return account; + })); + + console.timeLog('Time taken'); + + const users = userKps.map(x => ({ + 'publicKey': x.publicKey.toBase58(), + 'secretKey': Array.from(x.secretKey) + })); const data = { - 'users' : users, - 'tokenAccounts' : accounts, - 'mint' : mint, - 'minted_amount' : 1_000_000_000_000 + 'users': users, + 'tokenAccounts': accounts, + 'mint': mint, + 'minted_amount': 1_000_000_000_000 }; console.log('created ' + nbUsers + ' Users and minted 10^12 tokens for mint ' + mint); - fs.writeFileSync(outFile, JSON.stringify(data)); -} -main().then(x => { - console.log('finished sucessfully') -}).catch(e => { - console.log('caught an error : ' + e) -}) \ No newline at end of file + fs.writeFileSync(outFile, JSON.stringify(data)); +})() diff --git a/src/lib.rs b/src/lib.rs index 45e66760..e49c1784 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,10 +21,12 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900"; pub const DEFAULT_TX_MAX_RETRIES: u16 = 1; #[from_env] pub const DEFAULT_TX_BATCH_SIZE: usize = 512; + +/// 25 slots in 10s send to little more leaders #[from_env] -pub const DEFAULT_FANOUT_SIZE: u64 = 100; +pub const DEFAULT_FANOUT_SIZE: u64 = 30; #[from_env] -pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 10; +pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 100; #[from_env] pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus = diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index c70a4ee9..a3024472 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -45,7 +45,7 @@ lazy_static::lazy_static! { } pub type WireTransaction = Vec; -const NUMBER_OF_TX_SENDERS: usize = 5; +const NUMBER_OF_TX_SENDERS: usize = 7; // making 250 as sleep time will effectively make lite rpc send // (1000/250) * 5 * 512 = 10240 tps const SLEEP_TIME_FOR_SENDING_TASK_MS: u64 = 250; @@ -193,14 +193,21 @@ impl TxSender { let mut txs = Vec::with_capacity(tx_batch_size); let mut permit = None; let tasks_counter = tasks_counter.clone(); + let mut timeout_interval = tx_send_interval.as_millis() as u64; while txs.len() <= tx_batch_size { - match tokio::time::timeout(tx_send_interval, recv.recv()).await { + let instance = tokio::time::Instant::now(); + match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv()) + .await + { Ok(value) => match value { Some((sig, tx, slot)) => { TXS_IN_CHANNEL.dec(); sigs_and_slots.push((sig, slot)); txs.push(tx); + // update the timeout inteval + timeout_interval = timeout_interval + .saturating_sub(instance.elapsed().as_millis() as u64).max(1); } None => { bail!("Channel Disconnected"); @@ -209,8 +216,10 @@ impl TxSender { Err(_) => { permit = semaphore.clone().try_acquire_owned().ok(); if permit.is_some() { - // we have a permit we can send collected transaction batch break; + } else { + // already timed out, but could not get a permit + timeout_interval = 1; } } }