Move all benchmarks to benches/

Greg Fitzgerald 2018-07-10 20:33:16 -06:00 committed by Greg Fitzgerald
parent bed5438831
commit 3144a70b18
9 changed files with 394 additions and 385 deletions

Cargo.toml (modified)

@@ -88,3 +88,15 @@ criterion = "0.2"
[[bench]]
name = "bank"
harness = false

[[bench]]
name = "banking_stage"

[[bench]]
name = "ledger"

[[bench]]
name = "signature"

[[bench]]
name = "streamer"

benches/banking_stage.rs (new file, 236 lines)

@@ -0,0 +1,236 @@
#![feature(test)]
extern crate bincode;
extern crate rayon;
extern crate solana;
extern crate test;
#[macro_use]
extern crate log;
use rayon::prelude::*;
use solana::bank::Bank;
use solana::banking_stage::BankingStage;
use solana::logger;
use solana::mint::Mint;
use solana::packet::{to_packets_chunked, PacketRecycler};
use solana::record_stage::Signal;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::iter;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use test::Bencher;
// extern crate test;
// use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use hash::hash;
// use mint::Mint;
// use rayon::prelude::*;
// use signature::{KeyPair, KeyPairUtil};
// use std::collections::HashSet;
// use std::time::Instant;
// use transaction::Transaction;
//
// #[bench]
// fn bench_process_transactions(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
// let transactions: Vec<_> = (0..txs)
// .into_par_iter()
// .map(|i| {
// // Seed the 'to' account and a cell for its signature.
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
// {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = KeyPair::new();
// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
// bank.process_transaction(&tx).unwrap();
//
// let rando1 = KeyPair::new();
// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
// bank.process_transaction(&tx).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
// })
// .collect();
//
// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(banking_stage.process_transactions(transactions).is_ok());
// let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec;
//
// // Ensure that all transactions were successfully logged.
// drop(banking_stage.historian_input);
// let entries: Vec<Entry> = banking_stage.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].transactions.len(), txs as usize);
//
// println!("{} tps", tps);
// }
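// Drain `batches` Signal messages from the receiver and verify that the
// combined transaction count matches `ref_tx_count`.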
fn check_txs(batches: usize, receiver: &Receiver<Signal>, ref_tx_count: usize) {
let mut total = 0;
for _ in 0..batches {
let signal = receiver.recv().unwrap();
if let Signal::Transactions(transactions) = signal {
total += transactions.len();
} else {
assert!(false);
}
}
assert_eq!(total, ref_tx_count);
}
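// Each iteration builds a fresh Bank, funds the 8 * 1024 source accounts from the
// mint through BankingStage::process_packets, then measures processing of 10_000
// transfers fanned out across the source and destination keys.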
#[bench]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
logger::setup();
let tx = 10_000_usize;
let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total);
let num_dst_accounts = 8 * 1024;
let num_src_accounts = 8 * 1024;
let srckeys: Vec<_> = (0..num_src_accounts).map(|_| KeyPair::new()).collect();
let dstkeys: Vec<_> = (0..num_dst_accounts)
.map(|_| KeyPair::new().pubkey())
.collect();
info!("created keys src: {} dst: {}", srckeys.len(), dstkeys.len());
let transactions: Vec<_> = (0..tx)
.map(|i| {
Transaction::new(
&srckeys[i % num_src_accounts],
dstkeys[i % num_dst_accounts],
i as i64,
mint.last_id(),
)
})
.collect();
info!("created transactions");
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
let packet_recycler = PacketRecycler::default();
let setup_transactions: Vec<_> = (0..num_src_accounts)
.map(|i| {
Transaction::new(
&mint.keypair(),
srckeys[i].pubkey(),
mint_total / num_src_accounts as i64,
mint.last_id(),
)
})
.collect();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
let verified_setup: Vec<_> =
to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let verified_setup_len = verified_setup.len();
verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
).unwrap();
check_txs(verified_setup_len, &signal_receiver, num_src_accounts);
let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let verified_len = verified.len();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
).unwrap();
check_txs(verified_len, &signal_receiver, tx);
});
}
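// Same pipeline, but every transfer is signed by the single mint keypair and
// spread across just 8 destination pubkeys.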
#[bench]
fn bench_banking_stage_single_from(bencher: &mut Bencher) {
logger::setup();
let tx = 10_000_usize;
let mint = Mint::new(1_000_000_000_000);
let mut pubkeys = Vec::new();
let num_keys = 8;
for _ in 0..num_keys {
pubkeys.push(KeyPair::new().pubkey());
}
let transactions: Vec<_> = (0..tx)
.into_par_iter()
.map(|i| {
Transaction::new(
&mint.keypair(),
pubkeys[i % num_keys],
i as i64,
mint.last_id(),
)
})
.collect();
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
let packet_recycler = PacketRecycler::default();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let verified_len = verified.len();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
).unwrap();
check_txs(verified_len, &signal_receiver, tx);
});
}

benches/ledger.rs (new file, 29 lines)

@@ -0,0 +1,29 @@
#![feature(test)]
extern crate solana;
extern crate test;
use solana::hash::{hash, Hash};
use solana::ledger::{next_entries, reconstruct_entries_from_blobs, Block};
use solana::packet::BlobRecycler;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::collections::VecDeque;
use test::Bencher;
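// Round-trip a small block: serialize the entries to blobs and reconstruct them,
// asserting the result matches the original entries.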
#[bench]
fn bench_block_to_blobs_to_block(bencher: &mut Bencher) {
let zero = Hash::default();
let one = hash(&zero);
let keypair = KeyPair::new();
let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
let transactions = vec![tx0; 10];
let entries = next_entries(&zero, 1, transactions);
let blob_recycler = BlobRecycler::default();
bencher.iter(|| {
let mut blob_q = VecDeque::new();
entries.to_blobs(&blob_recycler, &mut blob_q);
assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
});
}

benches/signature.rs (new file, 12 lines)

@@ -0,0 +1,12 @@
#![feature(test)]
extern crate solana;
extern crate test;
use solana::signature::GenKeys;
use test::Bencher;
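// Measure deterministic generation of 1_000 keypairs from a fixed all-zero seed.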
#[bench]
fn bench_gen_keys(b: &mut Bencher) {
let rnd = GenKeys::new([0u8; 32]);
b.iter(|| rnd.gen_n_keypairs(1000));
}

benches/streamer.rs (new file, 104 lines)

@@ -0,0 +1,104 @@
#![feature(test)]
#[macro_use]
extern crate log;
extern crate solana;
extern crate test;
use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE};
use solana::result::Result;
use solana::streamer::{receiver, PacketReceiver};
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use std::time::SystemTime;
use test::Bencher;
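// Spawn a thread that resends the same batch of 10 UDP packets to `addr` until
// `exit` is set.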
fn producer(addr: &SocketAddr, recycler: PacketRecycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = recycler.allocate();
let msgs_ = msgs.clone();
msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
let mut num = 0;
for p in msgs_.read().unwrap().packets.iter() {
let a = p.meta.addr();
assert!(p.meta.size < BLOB_SIZE);
send.send_to(&p.data[..p.meta.size], &a).unwrap();
num += 1;
}
assert_eq!(num, 10);
})
}
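// Spawn a thread that drains received packet batches, adds their sizes to the
// shared `rvs` counter, and recycles the buffers until `exit` is set.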
fn sink(
recycler: PacketRecycler,
exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>,
r: PacketReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(msgs) => {
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
recycler.recycle(msgs);
}
_ => (),
}
})
}
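// Run one receiver against three producers for five seconds and log the
// observed packets-per-second rate before shutting all threads down.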
fn bench_streamer_with_result() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?;
read.set_read_timeout(Some(Duration::new(1, 0)))?;
let addr = read.local_addr()?;
let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default();
let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone());
let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone());
let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone());
let rvs = Arc::new(Mutex::new(0));
let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader);
let start = SystemTime::now();
let start_val = *rvs.lock().unwrap();
sleep(Duration::new(5, 0));
let elapsed = start.elapsed().unwrap();
let end_val = *rvs.lock().unwrap();
// Elapsed wall-clock time in nanoseconds, then as fractional seconds.
let time = elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64;
let ftime = (time as f64) / 1_000_000_000f64;
let fcount = (end_val - start_val) as f64;
trace!("performance: {:?}", fcount / ftime);
exit.store(true, Ordering::Relaxed);
t_reader.join()?;
t_producer1.join()?;
t_producer2.join()?;
t_producer3.join()?;
t_sink.join()?;
Ok(())
}
#[bench]
fn bench_streamer(_bench: &mut Bencher) {
bench_streamer_with_result().unwrap();
}

src/banking_stage.rs (modified)

@@ -71,7 +71,7 @@ impl BankingStage {
/// Process the incoming packets and send output `Signal` messages to `signal_sender`.
/// Discard packets via `packet_recycler`.
fn process_packets(
pub fn process_packets(
bank: Arc<Bank>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
signal_sender: &Sender<Signal>,
@@ -200,239 +200,3 @@ impl Service for BankingStage {
// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
// }
//}
//
//#[cfg(all(feature = "unstable", test))]
//mod bench {
// extern crate test;
// use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use hash::hash;
// use mint::Mint;
// use rayon::prelude::*;
// use signature::{KeyPair, KeyPairUtil};
// use std::collections::HashSet;
// use std::time::Instant;
// use transaction::Transaction;
//
// #[bench]
// fn bench_process_transactions(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
// let transactions: Vec<_> = (0..txs)
// .into_par_iter()
// .map(|i| {
// // Seed the 'to' account and a cell for its signature.
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
// {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = KeyPair::new();
// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
// bank.process_transaction(&tx).unwrap();
//
// let rando1 = KeyPair::new();
// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
// bank.process_transaction(&tx).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
// })
// .collect();
//
// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(banking_stage.process_transactions(transactions).is_ok());
// let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec;
//
// // Ensure that all transactions were successfully logged.
// drop(banking_stage.historian_input);
// let entries: Vec<Entry> = banking_stage.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].transactions.len(), txs as usize);
//
// println!("{} tps", tps);
// }
//}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
use bank::*;
use banking_stage::BankingStage;
use logger;
use mint::Mint;
use packet::{to_packets_chunked, PacketRecycler};
use rayon::prelude::*;
use record_stage::Signal;
use signature::{KeyPair, KeyPairUtil};
use std::iter;
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use transaction::Transaction;
fn check_txs(batches: usize, receiver: &Receiver<Signal>, ref_tx_count: usize) {
let mut total = 0;
for _ in 0..batches {
let signal = receiver.recv().unwrap();
if let Signal::Transactions(transactions) = signal {
total += transactions.len();
} else {
assert!(false);
}
}
assert_eq!(total, ref_tx_count);
}
#[bench]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
logger::setup();
let tx = 10_000_usize;
let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total);
let num_dst_accounts = 8 * 1024;
let num_src_accounts = 8 * 1024;
let srckeys: Vec<_> = (0..num_src_accounts).map(|_| KeyPair::new()).collect();
let dstkeys: Vec<_> = (0..num_dst_accounts)
.map(|_| KeyPair::new().pubkey())
.collect();
info!("created keys src: {} dst: {}", srckeys.len(), dstkeys.len());
let transactions: Vec<_> = (0..tx)
.map(|i| {
Transaction::new(
&srckeys[i % num_src_accounts],
dstkeys[i % num_dst_accounts],
i as i64,
mint.last_id(),
)
})
.collect();
info!("created transactions");
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
let packet_recycler = PacketRecycler::default();
let setup_transactions: Vec<_> = (0..num_src_accounts)
.map(|i| {
Transaction::new(
&mint.keypair(),
srckeys[i].pubkey(),
mint_total / num_src_accounts as i64,
mint.last_id(),
)
})
.collect();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
let verified_setup: Vec<_> =
to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let verified_setup_len = verified_setup.len();
verified_sender.send(verified_setup).unwrap();
BankingStage::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
).unwrap();
check_txs(verified_setup_len, &signal_receiver, num_src_accounts);
let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let verified_len = verified.len();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
).unwrap();
check_txs(verified_len, &signal_receiver, tx);
});
}
#[bench]
fn bench_banking_stage_single_from(bencher: &mut Bencher) {
logger::setup();
let tx = 10_000_usize;
let mint = Mint::new(1_000_000_000_000);
let mut pubkeys = Vec::new();
let num_keys = 8;
for _ in 0..num_keys {
pubkeys.push(KeyPair::new().pubkey());
}
let transactions: Vec<_> = (0..tx)
.into_par_iter()
.map(|i| {
Transaction::new(
&mint.keypair(),
pubkeys[i % num_keys],
i as i64,
mint.last_id(),
)
})
.collect();
let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel();
let packet_recycler = PacketRecycler::default();
bencher.iter(move || {
let bank = Arc::new(Bank::new(&mint));
let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx)
.into_iter()
.map(|x| {
let len = (*x).read().unwrap().packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let verified_len = verified.len();
verified_sender.send(verified).unwrap();
BankingStage::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
).unwrap();
check_txs(verified_len, &signal_receiver, tx);
});
}
}

src/ledger.rs (modified)

@@ -196,32 +196,3 @@ mod tests {
// assert_eq!(entries0, entries1);
}
}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
use hash::hash;
use ledger::*;
use packet::BlobRecycler;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;
#[bench]
fn bench_block_to_blobs_to_block(bencher: &mut Bencher) {
let zero = Hash::default();
let one = hash(&zero);
let keypair = KeyPair::new();
let tx0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
let transactions = vec![tx0; 10];
let entries = next_entries(&zero, 1, transactions);
let blob_recycler = BlobRecycler::default();
bencher.iter(|| {
let mut blob_q = VecDeque::new();
entries.to_blobs(&blob_recycler, &mut blob_q);
assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
});
}
}

src/signature.rs (modified)

@@ -121,17 +121,3 @@ mod tests {
assert_eq!(gen_n_pubkeys(seed, 50), gen_n_pubkeys(seed, 50));
}
}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
use super::*;
#[bench]
fn bench_gen_keys(b: &mut Bencher) {
let rnd = GenKeys::new([0u8; 32]);
b.iter(|| rnd.gen_n_keypairs(1000));
}
}

src/streamer.rs (modified)

@@ -764,111 +764,6 @@ pub fn retransmitter(
.unwrap()
}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
use packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE};
use result::Result;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use std::time::SystemTime;
use streamer::{receiver, PacketReceiver};
fn producer(
addr: &SocketAddr,
recycler: PacketRecycler,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = recycler.allocate();
let msgs_ = msgs.clone();
msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
let mut num = 0;
for p in msgs_.read().unwrap().packets.iter() {
let a = p.meta.addr();
assert!(p.meta.size < BLOB_SIZE);
send.send_to(&p.data[..p.meta.size], &a).unwrap();
num += 1;
}
assert_eq!(num, 10);
})
}
fn sink(
recycler: PacketRecycler,
exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>,
r: PacketReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(msgs) => {
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
recycler.recycle(msgs);
}
_ => (),
}
})
}
fn bench_streamer_with_result() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?;
read.set_read_timeout(Some(Duration::new(1, 0)))?;
let addr = read.local_addr()?;
let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default();
let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone());
let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone());
let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone());
let rvs = Arc::new(Mutex::new(0));
let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader);
let start = SystemTime::now();
let start_val = *rvs.lock().unwrap();
sleep(Duration::new(5, 0));
let elapsed = start.elapsed().unwrap();
let end_val = *rvs.lock().unwrap();
let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64;
let ftime = (time as f64) / 10000000000f64;
let fcount = (end_val - start_val) as f64;
trace!("performance: {:?}", fcount / ftime);
exit.store(true, Ordering::Relaxed);
t_reader.join()?;
t_producer1.join()?;
t_producer2.join()?;
t_producer3.join()?;
t_sink.join()?;
Ok(())
}
#[bench]
pub fn bench_streamer(_bench: &mut Bencher) {
bench_streamer_with_result().unwrap();
}
}
#[cfg(test)]
mod test {
use crdt::{Crdt, TestNode};