diff --git a/bench_transactions_send.ts b/bench_transactions_send.ts index 30c307df..456c52ea 100644 --- a/bench_transactions_send.ts +++ b/bench_transactions_send.ts @@ -59,7 +59,9 @@ export async function main() { ); transaction.recentBlockhash = blockhash; transaction.feePayer = authority.publicKey; - promises.push(connection.sendTransaction(transaction, [authority, users[fromIndex]], {skipPreflight: true})) + const p =connection.sendTransaction(transaction, [authority, users[fromIndex]], {skipPreflight: true}); + promises.push(p) + await p } } if (skip_confirmations === false) diff --git a/src/main.rs b/src/main.rs index 9fc97c0e..7b969c91 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,7 +104,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() - .worker_threads(32) + .worker_threads(64) .on_thread_start(move || renice_this_thread(0).unwrap()) .thread_name("solLiteRpcProcessor") .enable_all() @@ -122,7 +122,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String request_processor.clone() }) .event_loop_executor(runtime.handle().clone()) - .threads(32) + .threads(1) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Any, ])) diff --git a/src/rpc.rs b/src/rpc.rs index a3007004..46e7e38e 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -42,7 +42,7 @@ use { }, }; -const TPU_BATCH_SIZE: usize = 32; +const TPU_BATCH_SIZE: usize = 8; #[derive(Clone)] pub struct LightRpcRequestProcessor { @@ -187,28 +187,42 @@ impl LightRpcRequestProcessor { let recv_res = receiver.recv(); match recv_res { Ok(transaction) => { - let mut transactions_vec = vec![transaction]; - let mut time_remaining = Duration::from_micros(1000); - for _i in 1..TPU_BATCH_SIZE { - let start = std::time::Instant::now(); - let another = receiver.recv_timeout(time_remaining); + let (fut_res, count) = if TPU_BATCH_SIZE > 1 { + let mut transactions_vec = vec![transaction]; + let mut time_remaining = Duration::from_micros(1000); + for _i in 1..TPU_BATCH_SIZE { + let start = std::time::Instant::now(); + let another = receiver.recv_timeout(time_remaining); - match another { - Ok(x) => transactions_vec.push(x), - Err(_) => break, + match another { + Ok(x) => transactions_vec.push(x), + Err(_) => break, + } + match time_remaining.checked_sub(start.elapsed()) { + Some(x) => time_remaining = x, + None => break, + } } - match time_remaining.checked_sub(start.elapsed()) { - Some(x) => time_remaining = x, - None => break, - } - } - let count: u64 = transactions_vec.len() as u64; - let slice = transactions_vec.as_slice(); - let fut_res = tpu_client.try_send_transaction_batch(slice); + let count: u64 = transactions_vec.len() as u64; + let slice = transactions_vec.as_slice(); + let fut_res = tpu_client.try_send_transaction_batch(slice); - // insert sent transactions into signature status map - transactions_vec.iter().for_each(|x| { - let signature = x.signatures[0].to_string(); + // insert sent transactions into signature status map + transactions_vec.iter().for_each(|x| { + let signature = x.signatures[0].to_string(); + context.signature_status.insert( + signature.clone(), + SignatureStatus { + status: None, + error: None, + created: Instant::now(), + }, + ); + }); + (fut_res, count) + } else { + let fut_res = tpu_client.try_send_transaction(&transaction); + let signature = transaction.signatures[0].to_string(); context.signature_status.insert( signature.clone(), SignatureStatus { @@ -217,7 +231,8 @@ impl LightRpcRequestProcessor { created: Instant::now(), }, ); - }); + (fut_res, 1) + }; match fut_res { Ok(_) => {