diff --git a/Cargo.toml b/Cargo.toml index 4aeaba39e5..d6b0866c09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,3 +88,15 @@ criterion = "0.2" [[bench]] name = "bank" harness = false + +[[bench]] +name = "banking_stage" + +[[bench]] +name = "ledger" + +[[bench]] +name = "signature" + +[[bench]] +name = "streamer" diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs new file mode 100644 index 0000000000..dfef971ec3 --- /dev/null +++ b/benches/banking_stage.rs @@ -0,0 +1,236 @@ +#![feature(test)] + +extern crate bincode; +extern crate rayon; +extern crate solana; +extern crate test; +#[macro_use] +extern crate log; + +use rayon::prelude::*; +use solana::bank::Bank; +use solana::banking_stage::BankingStage; +use solana::logger; +use solana::mint::Mint; +use solana::packet::{to_packets_chunked, PacketRecycler}; +use solana::record_stage::Signal; +use solana::signature::{KeyPair, KeyPairUtil}; +use solana::transaction::Transaction; +use std::iter; +use std::sync::mpsc::{channel, Receiver}; +use std::sync::Arc; +use test::Bencher; + +// extern crate test; +// use self::test::Bencher; +// use bank::{Bank, MAX_ENTRY_IDS}; +// use bincode::serialize; +// use hash::hash; +// use mint::Mint; +// use rayon::prelude::*; +// use signature::{KeyPair, KeyPairUtil}; +// use std::collections::HashSet; +// use std::time::Instant; +// use transaction::Transaction; +// +// #[bench] +// fn bench_process_transactions(_bencher: &mut Bencher) { +// let mint = Mint::new(100_000_000); +// let bank = Bank::new(&mint); +// // Create transactions between unrelated parties. +// let txs = 100_000; +// let last_ids: Mutex> = Mutex::new(HashSet::new()); +// let transactions: Vec<_> = (0..txs) +// .into_par_iter() +// .map(|i| { +// // Seed the 'to' account and a cell for its signature. +// let dummy_id = i % (MAX_ENTRY_IDS as i32); +// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash +// { +// let mut last_ids = last_ids.lock().unwrap(); +// if !last_ids.contains(&last_id) { +// last_ids.insert(last_id); +// bank.register_entry_id(&last_id); +// } +// } +// +// // Seed the 'from' account. +// let rando0 = KeyPair::new(); +// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); +// bank.process_transaction(&tx).unwrap(); +// +// let rando1 = KeyPair::new(); +// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); +// bank.process_transaction(&tx).unwrap(); +// +// // Finally, return a transaction that's unique +// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) +// }) +// .collect(); +// +// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); +// +// let now = Instant::now(); +// assert!(banking_stage.process_transactions(transactions).is_ok()); +// let duration = now.elapsed(); +// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; +// let tps = txs as f64 / sec; +// +// // Ensure that all transactions were successfully logged. +// drop(banking_stage.historian_input); +// let entries: Vec = banking_stage.output.lock().unwrap().iter().collect(); +// assert_eq!(entries.len(), 1); +// assert_eq!(entries[0].transactions.len(), txs as usize); +// +// println!("{} tps", tps); +// } + +fn check_txs(batches: usize, receiver: &Receiver, ref_tx_count: usize) { + let mut total = 0; + for _ in 0..batches { + let signal = receiver.recv().unwrap(); + if let Signal::Transactions(transactions) = signal { + total += transactions.len(); + } else { + assert!(false); + } + } + assert_eq!(total, ref_tx_count); +} + +#[bench] +fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { + logger::setup(); + let tx = 10_000_usize; + let mint_total = 1_000_000_000_000; + let mint = Mint::new(mint_total); + let num_dst_accounts = 8 * 1024; + let num_src_accounts = 8 * 1024; + + let srckeys: Vec<_> = (0..num_src_accounts).map(|_| KeyPair::new()).collect(); + let dstkeys: Vec<_> = (0..num_dst_accounts) + .map(|_| KeyPair::new().pubkey()) + .collect(); + + info!("created keys src: {} dst: {}", srckeys.len(), dstkeys.len()); + + let transactions: Vec<_> = (0..tx) + .map(|i| { + Transaction::new( + &srckeys[i % num_src_accounts], + dstkeys[i % num_dst_accounts], + i as i64, + mint.last_id(), + ) + }) + .collect(); + + info!("created transactions"); + + let (verified_sender, verified_receiver) = channel(); + let (signal_sender, signal_receiver) = channel(); + let packet_recycler = PacketRecycler::default(); + + let setup_transactions: Vec<_> = (0..num_src_accounts) + .map(|i| { + Transaction::new( + &mint.keypair(), + srckeys[i].pubkey(), + mint_total / num_src_accounts as i64, + mint.last_id(), + ) + }) + .collect(); + + bencher.iter(move || { + let bank = Arc::new(Bank::new(&mint)); + + let verified_setup: Vec<_> = + to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx) + .into_iter() + .map(|x| { + let len = (*x).read().unwrap().packets.len(); + (x, iter::repeat(1).take(len).collect()) + }) + .collect(); + + let verified_setup_len = verified_setup.len(); + verified_sender.send(verified_setup).unwrap(); + BankingStage::process_packets( + bank.clone(), + &verified_receiver, + &signal_sender, + &packet_recycler, + ).unwrap(); + + check_txs(verified_setup_len, &signal_receiver, num_src_accounts); + + let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192) + .into_iter() + .map(|x| { + let len = (*x).read().unwrap().packets.len(); + (x, iter::repeat(1).take(len).collect()) + }) + .collect(); + + let verified_len = verified.len(); + verified_sender.send(verified).unwrap(); + BankingStage::process_packets( + bank.clone(), + &verified_receiver, + &signal_sender, + &packet_recycler, + ).unwrap(); + + check_txs(verified_len, &signal_receiver, tx); + }); +} + +#[bench] +fn bench_banking_stage_single_from(bencher: &mut Bencher) { + logger::setup(); + let tx = 10_000_usize; + let mint = Mint::new(1_000_000_000_000); + let mut pubkeys = Vec::new(); + let num_keys = 8; + for _ in 0..num_keys { + pubkeys.push(KeyPair::new().pubkey()); + } + + let transactions: Vec<_> = (0..tx) + .into_par_iter() + .map(|i| { + Transaction::new( + &mint.keypair(), + pubkeys[i % num_keys], + i as i64, + mint.last_id(), + ) + }) + .collect(); + + let (verified_sender, verified_receiver) = channel(); + let (signal_sender, signal_receiver) = channel(); + let packet_recycler = PacketRecycler::default(); + + bencher.iter(move || { + let bank = Arc::new(Bank::new(&mint)); + let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx) + .into_iter() + .map(|x| { + let len = (*x).read().unwrap().packets.len(); + (x, iter::repeat(1).take(len).collect()) + }) + .collect(); + let verified_len = verified.len(); + verified_sender.send(verified).unwrap(); + BankingStage::process_packets( + bank.clone(), + &verified_receiver, + &signal_sender, + &packet_recycler, + ).unwrap(); + + check_txs(verified_len, &signal_receiver, tx); + }); +} diff --git a/benches/ledger.rs b/benches/ledger.rs new file mode 100644 index 0000000000..0126764b11 --- /dev/null +++ b/benches/ledger.rs @@ -0,0 +1,29 @@ +#![feature(test)] + +extern crate solana; +extern crate test; + +use solana::hash::{hash, Hash}; +use solana::ledger::{next_entries, reconstruct_entries_from_blobs, Block}; +use solana::packet::BlobRecycler; +use solana::signature::{KeyPair, KeyPairUtil}; +use solana::transaction::Transaction; +use std::collections::VecDeque; +use test::Bencher; + +#[bench] +fn bench_block_to_blobs_to_block(bencher: &mut Bencher) { + let zero = Hash::default(); + let one = hash(&zero); + let keypair = KeyPair::new(); + let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one); + let transactions = vec![tx0; 10]; + let entries = next_entries(&zero, 1, transactions); + + let blob_recycler = BlobRecycler::default(); + bencher.iter(|| { + let mut blob_q = VecDeque::new(); + entries.to_blobs(&blob_recycler, &mut blob_q); + assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries); + }); +} diff --git a/benches/signature.rs b/benches/signature.rs new file mode 100644 index 0000000000..2e989be3d5 --- /dev/null +++ b/benches/signature.rs @@ -0,0 +1,12 @@ +#![feature(test)] +extern crate solana; +extern crate test; + +use solana::signature::GenKeys; +use test::Bencher; + +#[bench] +fn bench_gen_keys(b: &mut Bencher) { + let rnd = GenKeys::new([0u8; 32]); + b.iter(|| rnd.gen_n_keypairs(1000)); +} diff --git a/benches/streamer.rs b/benches/streamer.rs new file mode 100644 index 0000000000..4e003b19e6 --- /dev/null +++ b/benches/streamer.rs @@ -0,0 +1,104 @@ +#![feature(test)] + +#[macro_use] +extern crate log; +extern crate solana; +extern crate test; + +use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; +use solana::result::Result; +use solana::streamer::{receiver, PacketReceiver}; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use std::time::SystemTime; +use test::Bencher; + +fn producer(addr: &SocketAddr, recycler: PacketRecycler, exit: Arc) -> JoinHandle<()> { + let send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let msgs = recycler.allocate(); + let msgs_ = msgs.clone(); + msgs.write().unwrap().packets.resize(10, Packet::default()); + for w in msgs.write().unwrap().packets.iter_mut() { + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + } + spawn(move || loop { + if exit.load(Ordering::Relaxed) { + return; + } + let mut num = 0; + for p in msgs_.read().unwrap().packets.iter() { + let a = p.meta.addr(); + assert!(p.meta.size < BLOB_SIZE); + send.send_to(&p.data[..p.meta.size], &a).unwrap(); + num += 1; + } + assert_eq!(num, 10); + }) +} + +fn sink( + recycler: PacketRecycler, + exit: Arc, + rvs: Arc>, + r: PacketReceiver, +) -> JoinHandle<()> { + spawn(move || loop { + if exit.load(Ordering::Relaxed) { + return; + } + let timer = Duration::new(1, 0); + match r.recv_timeout(timer) { + Ok(msgs) => { + *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); + recycler.recycle(msgs); + } + _ => (), + } + }) +} + +fn bench_streamer_with_result() -> Result<()> { + let read = UdpSocket::bind("127.0.0.1:0")?; + read.set_read_timeout(Some(Duration::new(1, 0)))?; + + let addr = read.local_addr()?; + let exit = Arc::new(AtomicBool::new(false)); + let pack_recycler = PacketRecycler::default(); + + let (s_reader, r_reader) = channel(); + let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); + let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone()); + let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone()); + let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone()); + + let rvs = Arc::new(Mutex::new(0)); + let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader); + + let start = SystemTime::now(); + let start_val = *rvs.lock().unwrap(); + sleep(Duration::new(5, 0)); + let elapsed = start.elapsed().unwrap(); + let end_val = *rvs.lock().unwrap(); + let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64; + let ftime = (time as f64) / 10000000000f64; + let fcount = (end_val - start_val) as f64; + trace!("performance: {:?}", fcount / ftime); + exit.store(true, Ordering::Relaxed); + t_reader.join()?; + t_producer1.join()?; + t_producer2.join()?; + t_producer3.join()?; + t_sink.join()?; + Ok(()) +} + +#[bench] +fn bench_streamer(_bench: &mut Bencher) { + bench_streamer_with_result().unwrap(); +} diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 9229312bfe..4616d45809 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -71,7 +71,7 @@ impl BankingStage { /// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Discard packets via `packet_recycler`. - fn process_packets( + pub fn process_packets( bank: Arc, verified_receiver: &Receiver)>>, signal_sender: &Sender, @@ -200,239 +200,3 @@ impl Service for BankingStage { // assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); // } //} -// -//#[cfg(all(feature = "unstable", test))] -//mod bench { -// extern crate test; -// use self::test::Bencher; -// use bank::{Bank, MAX_ENTRY_IDS}; -// use bincode::serialize; -// use hash::hash; -// use mint::Mint; -// use rayon::prelude::*; -// use signature::{KeyPair, KeyPairUtil}; -// use std::collections::HashSet; -// use std::time::Instant; -// use transaction::Transaction; -// -// #[bench] -// fn bench_process_transactions(_bencher: &mut Bencher) { -// let mint = Mint::new(100_000_000); -// let bank = Bank::new(&mint); -// // Create transactions between unrelated parties. -// let txs = 100_000; -// let last_ids: Mutex> = Mutex::new(HashSet::new()); -// let transactions: Vec<_> = (0..txs) -// .into_par_iter() -// .map(|i| { -// // Seed the 'to' account and a cell for its signature. -// let dummy_id = i % (MAX_ENTRY_IDS as i32); -// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash -// { -// let mut last_ids = last_ids.lock().unwrap(); -// if !last_ids.contains(&last_id) { -// last_ids.insert(last_id); -// bank.register_entry_id(&last_id); -// } -// } -// -// // Seed the 'from' account. -// let rando0 = KeyPair::new(); -// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); -// bank.process_transaction(&tx).unwrap(); -// -// let rando1 = KeyPair::new(); -// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); -// bank.process_transaction(&tx).unwrap(); -// -// // Finally, return a transaction that's unique -// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) -// }) -// .collect(); -// -// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); -// -// let now = Instant::now(); -// assert!(banking_stage.process_transactions(transactions).is_ok()); -// let duration = now.elapsed(); -// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; -// let tps = txs as f64 / sec; -// -// // Ensure that all transactions were successfully logged. -// drop(banking_stage.historian_input); -// let entries: Vec = banking_stage.output.lock().unwrap().iter().collect(); -// assert_eq!(entries.len(), 1); -// assert_eq!(entries[0].transactions.len(), txs as usize); -// -// println!("{} tps", tps); -// } -//} - -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - use self::test::Bencher; - use bank::*; - use banking_stage::BankingStage; - use logger; - use mint::Mint; - use packet::{to_packets_chunked, PacketRecycler}; - use rayon::prelude::*; - use record_stage::Signal; - use signature::{KeyPair, KeyPairUtil}; - use std::iter; - use std::sync::mpsc::{channel, Receiver}; - use std::sync::Arc; - use transaction::Transaction; - - fn check_txs(batches: usize, receiver: &Receiver, ref_tx_count: usize) { - let mut total = 0; - for _ in 0..batches { - let signal = receiver.recv().unwrap(); - if let Signal::Transactions(transactions) = signal { - total += transactions.len(); - } else { - assert!(false); - } - } - assert_eq!(total, ref_tx_count); - } - - #[bench] - fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { - logger::setup(); - let tx = 10_000_usize; - let mint_total = 1_000_000_000_000; - let mint = Mint::new(mint_total); - let num_dst_accounts = 8 * 1024; - let num_src_accounts = 8 * 1024; - - let srckeys: Vec<_> = (0..num_src_accounts).map(|_| KeyPair::new()).collect(); - let dstkeys: Vec<_> = (0..num_dst_accounts) - .map(|_| KeyPair::new().pubkey()) - .collect(); - - info!("created keys src: {} dst: {}", srckeys.len(), dstkeys.len()); - - let transactions: Vec<_> = (0..tx) - .map(|i| { - Transaction::new( - &srckeys[i % num_src_accounts], - dstkeys[i % num_dst_accounts], - i as i64, - mint.last_id(), - ) - }) - .collect(); - - info!("created transactions"); - - let (verified_sender, verified_receiver) = channel(); - let (signal_sender, signal_receiver) = channel(); - let packet_recycler = PacketRecycler::default(); - - let setup_transactions: Vec<_> = (0..num_src_accounts) - .map(|i| { - Transaction::new( - &mint.keypair(), - srckeys[i].pubkey(), - mint_total / num_src_accounts as i64, - mint.last_id(), - ) - }) - .collect(); - - bencher.iter(move || { - let bank = Arc::new(Bank::new(&mint)); - - let verified_setup: Vec<_> = - to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx) - .into_iter() - .map(|x| { - let len = (*x).read().unwrap().packets.len(); - (x, iter::repeat(1).take(len).collect()) - }) - .collect(); - - let verified_setup_len = verified_setup.len(); - verified_sender.send(verified_setup).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); - - check_txs(verified_setup_len, &signal_receiver, num_src_accounts); - - let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192) - .into_iter() - .map(|x| { - let len = (*x).read().unwrap().packets.len(); - (x, iter::repeat(1).take(len).collect()) - }) - .collect(); - - let verified_len = verified.len(); - verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); - - check_txs(verified_len, &signal_receiver, tx); - }); - } - - #[bench] - fn bench_banking_stage_single_from(bencher: &mut Bencher) { - logger::setup(); - let tx = 10_000_usize; - let mint = Mint::new(1_000_000_000_000); - let mut pubkeys = Vec::new(); - let num_keys = 8; - for _ in 0..num_keys { - pubkeys.push(KeyPair::new().pubkey()); - } - - let transactions: Vec<_> = (0..tx) - .into_par_iter() - .map(|i| { - Transaction::new( - &mint.keypair(), - pubkeys[i % num_keys], - i as i64, - mint.last_id(), - ) - }) - .collect(); - - let (verified_sender, verified_receiver) = channel(); - let (signal_sender, signal_receiver) = channel(); - let packet_recycler = PacketRecycler::default(); - - bencher.iter(move || { - let bank = Arc::new(Bank::new(&mint)); - let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx) - .into_iter() - .map(|x| { - let len = (*x).read().unwrap().packets.len(); - (x, iter::repeat(1).take(len).collect()) - }) - .collect(); - let verified_len = verified.len(); - verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - bank.clone(), - &verified_receiver, - &signal_sender, - &packet_recycler, - ).unwrap(); - - check_txs(verified_len, &signal_receiver, tx); - }); - } - -} diff --git a/src/ledger.rs b/src/ledger.rs index 8f97588200..7963df3c67 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -196,32 +196,3 @@ mod tests { // assert_eq!(entries0, entries1); } } - -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - use self::test::Bencher; - use hash::hash; - use ledger::*; - use packet::BlobRecycler; - use signature::{KeyPair, KeyPairUtil}; - use transaction::Transaction; - - #[bench] - fn bench_block_to_blobs_to_block(bencher: &mut Bencher) { - let zero = Hash::default(); - let one = hash(&zero); - let keypair = KeyPair::new(); - let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one); - let transactions = vec![tx0; 10]; - let entries = next_entries(&zero, 1, transactions); - - let blob_recycler = BlobRecycler::default(); - bencher.iter(|| { - let mut blob_q = VecDeque::new(); - entries.to_blobs(&blob_recycler, &mut blob_q); - assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries); - }); - } - -} diff --git a/src/signature.rs b/src/signature.rs index 36983c6e77..b4b558276e 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -121,17 +121,3 @@ mod tests { assert_eq!(gen_n_pubkeys(seed, 50), gen_n_pubkeys(seed, 50)); } } - -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - - use self::test::Bencher; - use super::*; - - #[bench] - fn bench_gen_keys(b: &mut Bencher) { - let rnd = GenKeys::new([0u8; 32]); - b.iter(|| rnd.gen_n_keypairs(1000)); - } -} diff --git a/src/streamer.rs b/src/streamer.rs index cea54aa40d..c7634fa1bb 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -764,111 +764,6 @@ pub fn retransmitter( .unwrap() } -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - use self::test::Bencher; - use packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; - use result::Result; - use std::net::{SocketAddr, UdpSocket}; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; - use std::sync::{Arc, Mutex}; - use std::thread::sleep; - use std::thread::{spawn, JoinHandle}; - use std::time::Duration; - use std::time::SystemTime; - use streamer::{receiver, PacketReceiver}; - - fn producer( - addr: &SocketAddr, - recycler: PacketRecycler, - exit: Arc, - ) -> JoinHandle<()> { - let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = recycler.allocate(); - let msgs_ = msgs.clone(); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for w in msgs.write().unwrap().packets.iter_mut() { - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); - } - spawn(move || loop { - if exit.load(Ordering::Relaxed) { - return; - } - let mut num = 0; - for p in msgs_.read().unwrap().packets.iter() { - let a = p.meta.addr(); - assert!(p.meta.size < BLOB_SIZE); - send.send_to(&p.data[..p.meta.size], &a).unwrap(); - num += 1; - } - assert_eq!(num, 10); - }) - } - - fn sink( - recycler: PacketRecycler, - exit: Arc, - rvs: Arc>, - r: PacketReceiver, - ) -> JoinHandle<()> { - spawn(move || loop { - if exit.load(Ordering::Relaxed) { - return; - } - let timer = Duration::new(1, 0); - match r.recv_timeout(timer) { - Ok(msgs) => { - *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycler.recycle(msgs); - } - _ => (), - } - }) - } - - fn bench_streamer_with_result() -> Result<()> { - let read = UdpSocket::bind("127.0.0.1:0")?; - read.set_read_timeout(Some(Duration::new(1, 0)))?; - - let addr = read.local_addr()?; - let exit = Arc::new(AtomicBool::new(false)); - let pack_recycler = PacketRecycler::default(); - - let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); - let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone()); - let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone()); - let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone()); - - let rvs = Arc::new(Mutex::new(0)); - let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader); - - let start = SystemTime::now(); - let start_val = *rvs.lock().unwrap(); - sleep(Duration::new(5, 0)); - let elapsed = start.elapsed().unwrap(); - let end_val = *rvs.lock().unwrap(); - let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64; - let ftime = (time as f64) / 10000000000f64; - let fcount = (end_val - start_val) as f64; - trace!("performance: {:?}", fcount / ftime); - exit.store(true, Ordering::Relaxed); - t_reader.join()?; - t_producer1.join()?; - t_producer2.join()?; - t_producer3.join()?; - t_sink.join()?; - Ok(()) - } - #[bench] - pub fn bench_streamer(_bench: &mut Bencher) { - bench_streamer_with_result().unwrap(); - } -} - #[cfg(test)] mod test { use crdt::{Crdt, TestNode};