Merge branch 'main' into postgres_batching

This commit is contained in:
aniketfuryrocks 2023-03-31 23:07:36 +05:30
commit a25c2cdfea
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
5 changed files with 68 additions and 60 deletions

.gitignore vendored
View File

@@ -3,5 +3,6 @@ node_modules
bench/metrics.csv
*.pem*
*.pks*
*out.json*
.env
test-ledger

View File

@@ -69,13 +69,15 @@ impl BenchHelper {
funded_payer: &Keypair,
blockhash: Hash,
) -> Vec<Transaction> {
let random_bytes: Vec<u8> = Alphanumeric
.sample_iter(rand::thread_rng())
.take(10)
.collect();
(0..num_of_txs)
.map(|_| Self::create_memo_tx(&random_bytes, funded_payer, blockhash))
.map(|_| {
let random_bytes: Vec<u8> = Alphanumeric
.sample_iter(rand::thread_rng())
.take(10)
.collect();
Self::create_memo_tx(&random_bytes, funded_payer, blockhash)
})
.collect()
}
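Generating the random bytes inside the map closure gives every memo transaction its own payload, presumably so repeated transactions from the same payer and blockhash are not byte-identical. A minimal standalone sketch of that pattern, assuming only the rand 0.8-style API already used above (no Solana types):

use rand::distributions::{Alphanumeric, Distribution};

fn main() {
    // one fresh 10-byte alphanumeric payload per iteration, mirroring the closure above
    let payloads: Vec<Vec<u8>> = (0..3)
        .map(|_| {
            Alphanumeric
                .sample_iter(rand::thread_rng())
                .take(10)
                .collect()
        })
        .collect();

    // every inner Vec<u8> is sampled independently
    assert_eq!(payloads.len(), 3);
}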

View File

@@ -1,6 +1,6 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey } from '@solana/web3.js';
import * as fs from 'fs';
import { Connection, Keypair } from '@solana/web3.js';
import * as splToken from "@solana/spl-token";
import * as fs from 'fs';
import * as os from 'os';
// number of users
@@ -9,45 +9,42 @@ const nbUsers = +process.argv[2];
const url = process.argv.length > 3 ? process.argv[3] : "http://0.0.0.0:8899";
// outfile
const outFile = process.argv.length > 4 ? process.argv[4] : "out.json";
console.log("creating " + nbUsers + " Users on " + url + " out file " + outFile);
function delay(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
}
export async function main() {
const connection = new Connection(url, 'confirmed');
let authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
);
let userKps = [...Array(nbUsers)].map(_x => Keypair.generate())
let mint = await splToken.createMint(
(async function main() {
console.log("Creating " + nbUsers + " Users on " + url + " out file " + outFile);
console.time('Time taken');
const connection = new Connection(url, 'confirmed');
const authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
);
const userKps = Array(nbUsers).fill(0).map(() => Keypair.generate());
const mint = await splToken.createMint(
connection,
authority,
authority.publicKey,
null,
6,
);
let accounts : PublicKey[] = [];
for (const user of userKps) {
console.log("account created");
let account = await splToken.createAccount(
const accounts = await Promise.all(userKps.map(async user => {
const account = await splToken.createAccount(
connection,
authority,
mint,
user.publicKey,
)
accounts.push(account)
await delay(100)
};
);
console.log("Account created");
for (const account of accounts) {
console.log("account minted");
await splToken.mintTo(
connection,
authority,
@@ -56,30 +53,27 @@ export async function main() {
authority,
1_000_000_000_000,
)
await delay(100)
};
const users = userKps.map(x => {
const info = {
'publicKey' : x.publicKey.toBase58(),
'secretKey' : Array.from(x.secretKey)
};
return info;
});
console.log("Account minted");
return account;
}));
console.timeLog('Time taken');
const users = userKps.map(x => ({
'publicKey': x.publicKey.toBase58(),
'secretKey': Array.from(x.secretKey)
}));
const data = {
'users' : users,
'tokenAccounts' : accounts,
'mint' : mint,
'minted_amount' : 1_000_000_000_000
'users': users,
'tokenAccounts': accounts,
'mint': mint,
'minted_amount': 1_000_000_000_000
};
console.log('created ' + nbUsers + ' Users and minted 10^12 tokens for mint ' + mint);
fs.writeFileSync(outFile, JSON.stringify(data));
}
main().then(x => {
console.log('finished sucessfully')
}).catch(e => {
console.log('caught an error : ' + e)
})
fs.writeFileSync(outFile, JSON.stringify(data));
})()

View File

@@ -21,10 +21,12 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const DEFAULT_TX_BATCH_SIZE: usize = 512;
/// ~25 slots pass in 10 s, so send to slightly more leaders than that
#[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
pub const DEFAULT_FANOUT_SIZE: u64 = 30;
#[from_env]
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 10;
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 100;
#[from_env]
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minutes
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
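The new defaults above amount to a batch of up to 512 transactions roughly every 100 ms, fanned out to about 30 upcoming leaders. The #[from_env] attribute suggests these defaults can be overridden from the environment; the sketch below only illustrates that general idea with std::env and is not the project's macro (variable names are illustrative):

use std::env;

// fall back to the compiled-in default when the variable is missing or unparsable
fn env_or_default(name: &str, default: u64) -> u64 {
    env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

fn main() {
    let fanout = env_or_default("DEFAULT_FANOUT_SIZE", 30);
    let batch_interval_ms = env_or_default("DEFAULT_TX_BATCH_INTERVAL_MS", 100);
    println!("fanout = {fanout}, batch interval = {batch_interval_ms} ms");
}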

View File

@@ -45,7 +45,7 @@ lazy_static::lazy_static! {
}
pub type WireTransaction = Vec<u8>;
const NUMBER_OF_TX_SENDERS: usize = 5;
const NUMBER_OF_TX_SENDERS: usize = 7;
// with a sleep time of 250 ms, lite rpc will effectively send
// (1000/250) * 5 * 512 = 10240 tps
const SLEEP_TIME_FOR_SENDING_TASK_MS: u64 = 250;
@@ -193,14 +193,21 @@ impl TxSender {
let mut txs = Vec::with_capacity(tx_batch_size);
let mut permit = None;
let tasks_counter = tasks_counter.clone();
let mut timeout_interval = tx_send_interval.as_millis() as u64;
while txs.len() <= tx_batch_size {
match tokio::time::timeout(tx_send_interval, recv.recv()).await {
let instance = tokio::time::Instant::now();
match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv())
.await
{
Ok(value) => match value {
Some((sig, tx, slot)) => {
TXS_IN_CHANNEL.dec();
sigs_and_slots.push((sig, slot));
txs.push(tx);
// update the timeout interval
timeout_interval = timeout_interval
.saturating_sub(instance.elapsed().as_millis() as u64).max(1);
}
None => {
bail!("Channel Disconnected");
@@ -209,8 +216,10 @@ impl TxSender {
Err(_) => {
permit = semaphore.clone().try_acquire_owned().ok();
if permit.is_some() {
// we have a permit; send the collected transaction batch
break;
} else {
// already timed out, but could not get a permit
timeout_interval = 1;
}
}
}
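In the batching loop above, the receiver keeps pulling transactions until either the batch is full or the original send interval is used up, shrinking the per-recv timeout by however long the last recv waited. A self-contained sketch of that pattern, assuming tokio and omitting the semaphore/permit handling:

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
    for i in 0..3 {
        tx.send(i).unwrap();
    }
    drop(tx);

    let batch_size = 8;
    let mut batch = Vec::with_capacity(batch_size);
    // overall budget for one batch, standing in for tx_send_interval
    let mut timeout_ms: u64 = 100;

    while batch.len() < batch_size {
        let started = tokio::time::Instant::now();
        match tokio::time::timeout(Duration::from_millis(timeout_ms), rx.recv()).await {
            Ok(Some(item)) => {
                batch.push(item);
                // charge the wait against the remaining budget, keeping at least 1 ms
                timeout_ms = timeout_ms
                    .saturating_sub(started.elapsed().as_millis() as u64)
                    .max(1);
            }
            // channel closed: nothing more will arrive
            Ok(None) => break,
            // budget exhausted: send whatever has been collected so far
            Err(_) => break,
        }
    }
    println!("collected a batch of {} items", batch.len());
}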