Coalesce multiple blobs if received close together (#869)
- This helps reduce unnecessary growth of window if small blobs are received in small space of time
This commit is contained in:
parent
1c38e40dee
commit
16772d3d51
|
@ -85,6 +85,9 @@ fn check_txs(batches: usize, receiver: &Receiver<Signal>, ref_tx_count: usize) {
|
|||
let signal = receiver.recv().unwrap();
|
||||
if let Signal::Transactions(transactions) = signal {
|
||||
total += transactions.len();
|
||||
if total >= ref_tx_count {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
assert!(false);
|
||||
}
|
||||
|
|
|
@ -16,5 +16,10 @@ export LD_LIBRARY_PATH+=:$PWD
|
|||
|
||||
export RUST_LOG=multinode=info
|
||||
|
||||
if [[ $(ulimit -n) -le 65000 ]]; then
|
||||
echo 'Error: nofiles too small, run "ulimit -n 65000" to continue'
|
||||
exit 1
|
||||
fi
|
||||
|
||||
set -x
|
||||
exec cargo test --release --features=erasure test_multi_node_dynamic_network -- --ignored
|
||||
|
|
|
@ -11,6 +11,7 @@ use record_stage::Signal;
|
|||
use result::{Error, Result};
|
||||
use service::Service;
|
||||
use std::net::SocketAddr;
|
||||
use std::result;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
||||
use std::sync::Arc;
|
||||
|
@ -26,6 +27,28 @@ pub struct BankingStage {
|
|||
thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
fn recv_multiple_packets(
|
||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||
wait_ms: u64,
|
||||
max_tries: usize,
|
||||
) -> result::Result<Vec<(SharedPackets, Vec<u8>)>, RecvTimeoutError> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let mut mms = verified_receiver.recv_timeout(timer)?;
|
||||
let mut recv_tries = 1;
|
||||
|
||||
// Try receiving more packets from verified_receiver. Let's coalesce any packets
|
||||
// that are received within "wait_ms" ms of each other.
|
||||
while let Ok(mut nq) = verified_receiver.recv_timeout(Duration::from_millis(wait_ms)) {
|
||||
recv_tries += 1;
|
||||
mms.append(&mut nq);
|
||||
|
||||
if recv_tries >= max_tries {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(mms)
|
||||
}
|
||||
|
||||
impl BankingStage {
|
||||
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
|
||||
/// Discard input packets using `packet_recycler` to minimize memory
|
||||
|
@ -77,9 +100,11 @@ impl BankingStage {
|
|||
signal_sender: &Sender<Signal>,
|
||||
packet_recycler: &PacketRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
// Coalesce upto 512 transactions before sending it to the next stage
|
||||
let max_coalesced_txs = 512;
|
||||
let max_recv_tries = 10;
|
||||
let recv_start = Instant::now();
|
||||
let mms = verified_receiver.recv_timeout(timer)?;
|
||||
let mms = recv_multiple_packets(verified_receiver, 20, max_recv_tries)?;
|
||||
let mut reqs_len = 0;
|
||||
let mms_len = mms.len();
|
||||
info!(
|
||||
|
@ -91,6 +116,8 @@ impl BankingStage {
|
|||
let bank_starting_tx_count = bank.transaction_count();
|
||||
let count = mms.iter().map(|x| x.1.len()).sum();
|
||||
let proc_start = Instant::now();
|
||||
let mut txs: Vec<Transaction> = Vec::new();
|
||||
let mut num_sent = 0;
|
||||
for (msgs, vers) in mms {
|
||||
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
|
||||
reqs_len += transactions.len();
|
||||
|
@ -109,12 +136,24 @@ impl BankingStage {
|
|||
|
||||
debug!("process_transactions");
|
||||
let results = bank.process_transactions(transactions);
|
||||
let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
|
||||
signal_sender.send(Signal::Transactions(transactions))?;
|
||||
let mut transactions: Vec<Transaction> =
|
||||
results.into_iter().filter_map(|x| x.ok()).collect();
|
||||
txs.append(&mut transactions);
|
||||
if txs.len() >= max_coalesced_txs {
|
||||
signal_sender.send(Signal::Transactions(txs.clone()))?;
|
||||
txs.clear();
|
||||
num_sent += 1;
|
||||
}
|
||||
debug!("done process_transactions");
|
||||
|
||||
packet_recycler.recycle(msgs);
|
||||
}
|
||||
|
||||
// Send now, if there are pending transactions, or if there was
|
||||
// no transactions sent to the next stage yet.
|
||||
if !txs.is_empty() || num_sent == 0 {
|
||||
signal_sender.send(Signal::Transactions(txs))?;
|
||||
}
|
||||
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
||||
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
||||
info!(
|
||||
|
@ -144,6 +183,42 @@ impl Service for BankingStage {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use banking_stage::recv_multiple_packets;
|
||||
use packet::SharedPackets;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, RecvTimeoutError};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
pub fn recv_multiple_packets_test() {
|
||||
let (sender, receiver) = channel();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
assert_eq!(
|
||||
recv_multiple_packets(&receiver, 20, 10).unwrap_err(),
|
||||
RecvTimeoutError::Timeout
|
||||
);
|
||||
|
||||
{
|
||||
let exit = exit.clone();
|
||||
thread::spawn(move || {
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
let testdata: Vec<(SharedPackets, Vec<u8>)> = Vec::new();
|
||||
sender.send(testdata).expect("Failed to send message");
|
||||
sleep(Duration::from_millis(10));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assert_eq!(recv_multiple_packets(&receiver, 20, 10).is_ok(), true);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
// TODO: When banking is pulled out of RequestStage, add this test back in.
|
||||
|
||||
//use bank::Bank;
|
||||
|
|
|
@ -663,7 +663,7 @@ fn retry_get_balance(
|
|||
bob_pubkey: &PublicKey,
|
||||
expected: Option<i64>,
|
||||
) -> Option<i64> {
|
||||
const LAST: usize = 20;
|
||||
const LAST: usize = 30;
|
||||
for run in 0..(LAST + 1) {
|
||||
let out = client.poll_get_balance(bob_pubkey);
|
||||
if expected.is_none() || run == LAST {
|
||||
|
@ -714,7 +714,12 @@ fn retry_send_tx_and_retry_get_balance(
|
|||
if expected.is_none() || run == LAST {
|
||||
return out.ok().clone();
|
||||
}
|
||||
trace!("retry_get_balance[{}] {:?} {:?}", run, out, expected);
|
||||
trace!(
|
||||
"retry_send_tx_and_retry_get_balance[{}] {:?} {:?}",
|
||||
run,
|
||||
out,
|
||||
expected
|
||||
);
|
||||
if let (Some(e), Ok(o)) = (expected, out) {
|
||||
if o == e {
|
||||
return Some(o);
|
||||
|
|
Loading…
Reference in New Issue