From f07c0382668480fb1a5f852386f7977ef29b9d23 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Sun, 12 Aug 2018 10:04:21 -0700 Subject: [PATCH] Fix bank coalescing (#949) * fix bank coalescing * comments * fix bench * fix bench * backout banking stage coalescing * 120 nodes * 100 --- benches/banking_stage.rs | 10 ++--- src/banking_stage.rs | 88 +++------------------------------------- tests/multinode.rs | 2 +- 3 files changed, 12 insertions(+), 88 deletions(-) diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index 115e48c4f..43a2dd3a8 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -79,9 +79,9 @@ use std::sync::Arc; // println!("{} tps", tps); // } -fn check_txs(batches: usize, receiver: &Receiver, ref_tx_count: usize) { +fn check_txs(receiver: &Receiver, ref_tx_count: usize) { let mut total = 0; - for _ in 0..batches { + loop { let signal = receiver.recv().unwrap(); if let Signal::Transactions(transactions) = signal { total += transactions.len(); @@ -150,7 +150,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) .unwrap(); - check_txs(verified_setup_len, &signal_receiver, num_src_accounts); + check_txs(&signal_receiver, num_src_accounts); let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192) .into_iter() @@ -165,7 +165,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) .unwrap(); - check_txs(verified_len, &signal_receiver, tx); + check_txs(&signal_receiver, tx); }); } @@ -208,7 +208,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) { BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler) .unwrap(); - check_txs(verified_len, &signal_receiver, tx); + check_txs(&signal_receiver, tx); }); } diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 56ccc443d..6bb46e91c 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -12,7 +12,6 @@ use record_stage::Signal; use result::{Error, Result}; use service::Service; use std::net::SocketAddr; -use std::result; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; @@ -28,29 +27,6 @@ pub struct BankingStage { thread_hdl: JoinHandle<()>, } -fn recv_multiple_packets( - verified_receiver: &Receiver)>>, - wait_ms: u64, - max_tries: usize, -) -> result::Result)>, RecvTimeoutError> { - let timer = Duration::new(1, 0); - let mut mms = verified_receiver.recv_timeout(timer)?; - let mut recv_tries = 1; - - // Try receiving more packets from verified_receiver. Let's coalesce any packets - // that are received within "wait_ms" ms of each other. - while let Ok(mut nq) = verified_receiver.recv_timeout(Duration::from_millis(wait_ms)) { - recv_tries += 1; - mms.append(&mut nq); - - if recv_tries >= max_tries { - inc_new_counter_info!("banking_stage-max_packets_coalesced", 1); - break; - } - } - Ok(mms) -} - impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. /// Discard input packets using `packet_recycler` to minimize memory @@ -102,11 +78,9 @@ impl BankingStage { signal_sender: &Sender, packet_recycler: &PacketRecycler, ) -> Result<()> { - // Coalesce upto 512 transactions before sending it to the next stage - let max_coalesced_txs = 512; - let max_recv_tries = 10; + let timer = Duration::new(1, 0); let recv_start = Instant::now(); - let mms = recv_multiple_packets(verified_receiver, 20, max_recv_tries)?; + let mms = verified_receiver.recv_timeout(timer)?; let mut reqs_len = 0; let mms_len = mms.len(); info!( @@ -118,8 +92,6 @@ impl BankingStage { let bank_starting_tx_count = bank.transaction_count(); let count = mms.iter().map(|x| x.1.len()).sum(); let proc_start = Instant::now(); - let mut txs: Vec = Vec::new(); - let mut num_sent = 0; for (msgs, vers) in mms { let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); reqs_len += transactions.len(); @@ -138,24 +110,12 @@ impl BankingStage { debug!("process_transactions"); let results = bank.process_transactions(transactions); - let mut transactions: Vec = - results.into_iter().filter_map(|x| x.ok()).collect(); - txs.append(&mut transactions); - if txs.len() >= max_coalesced_txs { - signal_sender.send(Signal::Transactions(txs.clone()))?; - txs.clear(); - num_sent += 1; - } + let transactions = results.into_iter().filter_map(|x| x.ok()).collect(); + signal_sender.send(Signal::Transactions(transactions))?; debug!("done process_transactions"); packet_recycler.recycle(msgs); } - - // Send now, if there are pending transactions, or if there was - // no transactions sent to the next stage yet. - if !txs.is_empty() || num_sent == 0 { - signal_sender.send(Signal::Transactions(txs))?; - } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); info!( @@ -185,42 +145,6 @@ impl Service for BankingStage { } } -#[cfg(test)] -mod test { - use banking_stage::recv_multiple_packets; - use packet::SharedPackets; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::{channel, RecvTimeoutError}; - use std::sync::Arc; - use std::thread; - use std::thread::sleep; - use std::time::Duration; - - #[test] - pub fn recv_multiple_packets_test() { - let (sender, receiver) = channel(); - let exit = Arc::new(AtomicBool::new(false)); - - assert_eq!( - recv_multiple_packets(&receiver, 20, 10).unwrap_err(), - RecvTimeoutError::Timeout - ); - - { - let exit = exit.clone(); - thread::spawn(move || { - while !exit.load(Ordering::Relaxed) { - let testdata: Vec<(SharedPackets, Vec)> = Vec::new(); - sender.send(testdata).expect("Failed to send message"); - sleep(Duration::from_millis(10)); - } - }); - } - - assert_eq!(recv_multiple_packets(&receiver, 20, 10).is_ok(), true); - exit.store(true, Ordering::Relaxed); - } -} // TODO: When banking is pulled out of RequestStage, add this test back in. //use bank::Bank; @@ -238,7 +162,7 @@ mod test { //mod tests { // use bank::Bank; // use mint::Mint; -// use signature::{Keypair, KeypairUtil}; +// use signature::{KeyPair, KeyPairUtil}; // use transaction::Transaction; // // #[test] @@ -253,7 +177,7 @@ mod test { // let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); // // // Process a batch that includes a transaction that receives two tokens. -// let alice = Keypair::new(); +// let alice = KeyPair::new(); // let tx = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); // let transactions = vec![tx]; // let entry0 = banking_stage.process_transactions(transactions).unwrap(); diff --git a/tests/multinode.rs b/tests/multinode.rs index 6b9cf569a..a193f5f82 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -524,7 +524,7 @@ fn test_multi_node_dynamic_network() { Ok(val) => val .parse() .expect(&format!("env var {} is not parse-able as usize", key)), - Err(_) => 230, + Err(_) => 100, }; let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";