Fix bank coalescing (#949)
* fix bank coalescing * comments * fix bench * fix bench * backout banking stage coalescing * 120 nodes * 100
This commit is contained in:
parent
8eed120c38
commit
f07c038266
|
@ -79,9 +79,9 @@ use std::sync::Arc;
|
||||||
// println!("{} tps", tps);
|
// println!("{} tps", tps);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
fn check_txs(batches: usize, receiver: &Receiver<Signal>, ref_tx_count: usize) {
|
fn check_txs(receiver: &Receiver<Signal>, ref_tx_count: usize) {
|
||||||
let mut total = 0;
|
let mut total = 0;
|
||||||
for _ in 0..batches {
|
loop {
|
||||||
let signal = receiver.recv().unwrap();
|
let signal = receiver.recv().unwrap();
|
||||||
if let Signal::Transactions(transactions) = signal {
|
if let Signal::Transactions(transactions) = signal {
|
||||||
total += transactions.len();
|
total += transactions.len();
|
||||||
|
@ -150,7 +150,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
|
||||||
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
|
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
check_txs(verified_setup_len, &signal_receiver, num_src_accounts);
|
check_txs(&signal_receiver, num_src_accounts);
|
||||||
|
|
||||||
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
|
let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -165,7 +165,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
|
||||||
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
|
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
check_txs(verified_len, &signal_receiver, tx);
|
check_txs(&signal_receiver, tx);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +208,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
|
||||||
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
|
BankingStage::process_packets(&bank, &verified_receiver, &signal_sender, &packet_recycler)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
check_txs(verified_len, &signal_receiver, tx);
|
check_txs(&signal_receiver, tx);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,6 @@ use record_stage::Signal;
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
use service::Service;
|
use service::Service;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::result;
|
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -28,29 +27,6 @@ pub struct BankingStage {
|
||||||
thread_hdl: JoinHandle<()>,
|
thread_hdl: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv_multiple_packets(
|
|
||||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
|
||||||
wait_ms: u64,
|
|
||||||
max_tries: usize,
|
|
||||||
) -> result::Result<Vec<(SharedPackets, Vec<u8>)>, RecvTimeoutError> {
|
|
||||||
let timer = Duration::new(1, 0);
|
|
||||||
let mut mms = verified_receiver.recv_timeout(timer)?;
|
|
||||||
let mut recv_tries = 1;
|
|
||||||
|
|
||||||
// Try receiving more packets from verified_receiver. Let's coalesce any packets
|
|
||||||
// that are received within "wait_ms" ms of each other.
|
|
||||||
while let Ok(mut nq) = verified_receiver.recv_timeout(Duration::from_millis(wait_ms)) {
|
|
||||||
recv_tries += 1;
|
|
||||||
mms.append(&mut nq);
|
|
||||||
|
|
||||||
if recv_tries >= max_tries {
|
|
||||||
inc_new_counter_info!("banking_stage-max_packets_coalesced", 1);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(mms)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BankingStage {
|
impl BankingStage {
|
||||||
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
|
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
|
||||||
/// Discard input packets using `packet_recycler` to minimize memory
|
/// Discard input packets using `packet_recycler` to minimize memory
|
||||||
|
@ -102,11 +78,9 @@ impl BankingStage {
|
||||||
signal_sender: &Sender<Signal>,
|
signal_sender: &Sender<Signal>,
|
||||||
packet_recycler: &PacketRecycler,
|
packet_recycler: &PacketRecycler,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// Coalesce upto 512 transactions before sending it to the next stage
|
let timer = Duration::new(1, 0);
|
||||||
let max_coalesced_txs = 512;
|
|
||||||
let max_recv_tries = 10;
|
|
||||||
let recv_start = Instant::now();
|
let recv_start = Instant::now();
|
||||||
let mms = recv_multiple_packets(verified_receiver, 20, max_recv_tries)?;
|
let mms = verified_receiver.recv_timeout(timer)?;
|
||||||
let mut reqs_len = 0;
|
let mut reqs_len = 0;
|
||||||
let mms_len = mms.len();
|
let mms_len = mms.len();
|
||||||
info!(
|
info!(
|
||||||
|
@ -118,8 +92,6 @@ impl BankingStage {
|
||||||
let bank_starting_tx_count = bank.transaction_count();
|
let bank_starting_tx_count = bank.transaction_count();
|
||||||
let count = mms.iter().map(|x| x.1.len()).sum();
|
let count = mms.iter().map(|x| x.1.len()).sum();
|
||||||
let proc_start = Instant::now();
|
let proc_start = Instant::now();
|
||||||
let mut txs: Vec<Transaction> = Vec::new();
|
|
||||||
let mut num_sent = 0;
|
|
||||||
for (msgs, vers) in mms {
|
for (msgs, vers) in mms {
|
||||||
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
|
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
|
||||||
reqs_len += transactions.len();
|
reqs_len += transactions.len();
|
||||||
|
@ -138,24 +110,12 @@ impl BankingStage {
|
||||||
|
|
||||||
debug!("process_transactions");
|
debug!("process_transactions");
|
||||||
let results = bank.process_transactions(transactions);
|
let results = bank.process_transactions(transactions);
|
||||||
let mut transactions: Vec<Transaction> =
|
let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
|
||||||
results.into_iter().filter_map(|x| x.ok()).collect();
|
signal_sender.send(Signal::Transactions(transactions))?;
|
||||||
txs.append(&mut transactions);
|
|
||||||
if txs.len() >= max_coalesced_txs {
|
|
||||||
signal_sender.send(Signal::Transactions(txs.clone()))?;
|
|
||||||
txs.clear();
|
|
||||||
num_sent += 1;
|
|
||||||
}
|
|
||||||
debug!("done process_transactions");
|
debug!("done process_transactions");
|
||||||
|
|
||||||
packet_recycler.recycle(msgs);
|
packet_recycler.recycle(msgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send now, if there are pending transactions, or if there was
|
|
||||||
// no transactions sent to the next stage yet.
|
|
||||||
if !txs.is_empty() || num_sent == 0 {
|
|
||||||
signal_sender.send(Signal::Transactions(txs))?;
|
|
||||||
}
|
|
||||||
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
||||||
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
||||||
info!(
|
info!(
|
||||||
|
@ -185,42 +145,6 @@ impl Service for BankingStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use banking_stage::recv_multiple_packets;
|
|
||||||
use packet::SharedPackets;
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::mpsc::{channel, RecvTimeoutError};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread;
|
|
||||||
use std::thread::sleep;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
pub fn recv_multiple_packets_test() {
|
|
||||||
let (sender, receiver) = channel();
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
recv_multiple_packets(&receiver, 20, 10).unwrap_err(),
|
|
||||||
RecvTimeoutError::Timeout
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
|
||||||
let exit = exit.clone();
|
|
||||||
thread::spawn(move || {
|
|
||||||
while !exit.load(Ordering::Relaxed) {
|
|
||||||
let testdata: Vec<(SharedPackets, Vec<u8>)> = Vec::new();
|
|
||||||
sender.send(testdata).expect("Failed to send message");
|
|
||||||
sleep(Duration::from_millis(10));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(recv_multiple_packets(&receiver, 20, 10).is_ok(), true);
|
|
||||||
exit.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO: When banking is pulled out of RequestStage, add this test back in.
|
// TODO: When banking is pulled out of RequestStage, add this test back in.
|
||||||
|
|
||||||
//use bank::Bank;
|
//use bank::Bank;
|
||||||
|
@ -238,7 +162,7 @@ mod test {
|
||||||
//mod tests {
|
//mod tests {
|
||||||
// use bank::Bank;
|
// use bank::Bank;
|
||||||
// use mint::Mint;
|
// use mint::Mint;
|
||||||
// use signature::{Keypair, KeypairUtil};
|
// use signature::{KeyPair, KeyPairUtil};
|
||||||
// use transaction::Transaction;
|
// use transaction::Transaction;
|
||||||
//
|
//
|
||||||
// #[test]
|
// #[test]
|
||||||
|
@ -253,7 +177,7 @@ mod test {
|
||||||
// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
|
// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
|
||||||
//
|
//
|
||||||
// // Process a batch that includes a transaction that receives two tokens.
|
// // Process a batch that includes a transaction that receives two tokens.
|
||||||
// let alice = Keypair::new();
|
// let alice = KeyPair::new();
|
||||||
// let tx = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
// let tx = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
|
||||||
// let transactions = vec![tx];
|
// let transactions = vec![tx];
|
||||||
// let entry0 = banking_stage.process_transactions(transactions).unwrap();
|
// let entry0 = banking_stage.process_transactions(transactions).unwrap();
|
||||||
|
|
|
@ -524,7 +524,7 @@ fn test_multi_node_dynamic_network() {
|
||||||
Ok(val) => val
|
Ok(val) => val
|
||||||
.parse()
|
.parse()
|
||||||
.expect(&format!("env var {} is not parse-able as usize", key)),
|
.expect(&format!("env var {} is not parse-able as usize", key)),
|
||||||
Err(_) => 230,
|
Err(_) => 100,
|
||||||
};
|
};
|
||||||
|
|
||||||
let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";
|
let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";
|
||||||
|
|
Loading…
Reference in New Issue