From 7ab3331f0104fb939b7148c6edc903fd94496841 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 12 May 2018 00:31:32 -0600 Subject: [PATCH] Move validation processor to its own module --- src/lib.rs | 1 + src/thin_client.rs | 28 ++-- src/tpu.rs | 347 --------------------------------------- src/tvu.rs | 394 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 410 insertions(+), 360 deletions(-) create mode 100644 src/tvu.rs diff --git a/src/lib.rs b/src/lib.rs index 17d730500..6cde272d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ pub mod thin_client_service; pub mod timing; pub mod tpu; pub mod transaction; +pub mod tvu; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/thin_client.rs b/src/thin_client.rs index ce5135cad..82a693a58 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -168,7 +168,8 @@ mod tests { use std::thread::sleep; use std::time::Duration; use std::time::Instant; - use tpu::{self, Tpu}; + use tpu::Tpu; + use tvu::{self, Tvu}; #[test] fn test_thin_client() { @@ -223,7 +224,7 @@ mod tests { #[test] fn test_bad_sig() { - let (leader_data, leader_gossip, _, leader_serve, leader_events) = tpu::test_node(); + let (leader_data, leader_gossip, _, leader_serve, leader_events) = tvu::test_node(); let alice = Mint::new(10_000); let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); @@ -307,19 +308,20 @@ mod tests { let replicant_acc = { let accountant = Accountant::new(&alice); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Tpu::new(accounting_stage)) + Arc::new(Tvu::new(accounting_stage)) }; - let leader_threads = Tpu::serve( - &leader_acc, - leader.0.clone(), - leader.2, - leader.4, - leader.1, - exit.clone(), - sink(), - ).unwrap(); - let replicant_threads = Tpu::replicate( + let leader_threads = leader_acc + .serve( + leader.0.clone(), + leader.2, + leader.4, + leader.1, + exit.clone(), + sink(), + ) + .unwrap(); + let replicant_threads = Tvu::serve( &replicant_acc, replicant.0.clone(), replicant.1, diff --git a/src/tpu.rs b/src/tpu.rs index 4f2ae287a..0843139ab 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -4,7 +4,6 @@ use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; use entry_writer::EntryWriter; -use ledger; use packet; use result::Result; use sig_verify_stage::SigVerifyStage; @@ -14,7 +13,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; -use std::time::Duration; use streamer; use thin_client_service::{RequestProcessor, ThinClientService}; @@ -23,8 +21,6 @@ pub struct Tpu { request_processor: Arc, } -type SharedTpu = Arc; - impl Tpu { /// Create a new Tpu that wraps the given Accountant. pub fn new(accounting_stage: AccountingStage) -> Self { @@ -150,347 +146,4 @@ impl Tpu { threads.extend(sig_verify_stage.thread_hdls.into_iter()); Ok(threads) } - - /// Process verified blobs, already in order - /// Respond with a signed hash of the state - fn replicate_state( - obj: &Tpu, - verified_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let blobs = verified_receiver.recv_timeout(timer)?; - trace!("replicating blobs {}", blobs.len()); - let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.accounting_stage - .accountant - .process_verified_entries(entries)?; - for blob in blobs { - blob_recycler.recycle(blob); - } - Ok(()) - } - - /// This service receives messages from a leader in the network and processes the transactions - /// on the accountant state. - /// # Arguments - /// * `obj` - The accountant state. - /// * `me` - my configuration - /// * `leader` - leader configuration - /// * `exit` - The exit signal. - /// # Remarks - /// The pipeline is constructed as follows: - /// 1. receive blobs from the network, these are out of order - /// 2. verify blobs, PoH, signatures (TODO) - /// 3. reconstruct contiguous window - /// a. order the blobs - /// b. use erasure coding to reconstruct missing blobs - /// c. ask the network for missing blobs, if erasure coding is insufficient - /// d. make sure that the blobs PoH sequences connect (TODO) - /// 4. process the transaction state machine - /// 5. respond with the hash of the state back to the leader - pub fn replicate( - obj: &SharedTpu, - me: ReplicatedData, - gossip: UdpSocket, - requests_socket: UdpSocket, - replicate: UdpSocket, - leader: ReplicatedData, - exit: Arc, - ) -> Result>> { - //replicate pipeline - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write() - .expect("'crdt' write lock in pub fn replicate") - .set_leader(leader.id); - crdt.write() - .expect("'crdt' write lock before insert() in pub fn replicate") - .insert(leader); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); - - // make sure we are on the same interface - let mut local = replicate.local_addr()?; - local.set_port(0); - let write = UdpSocket::bind(local)?; - - let blob_recycler = packet::BlobRecycler::default(); - let (blob_sender, blob_receiver) = channel(); - let t_blob_receiver = streamer::blob_receiver( - exit.clone(), - blob_recycler.clone(), - replicate, - blob_sender.clone(), - )?; - let (window_sender, window_receiver) = channel(); - let (retransmit_sender, retransmit_receiver) = channel(); - - let t_retransmit = streamer::retransmitter( - write, - exit.clone(), - crdt.clone(), - blob_recycler.clone(), - retransmit_receiver, - ); - - //TODO - //the packets coming out of blob_receiver need to be sent to the GPU and verified - //then sent to the window, which does the erasure coding reconstruction - let t_window = streamer::window( - exit.clone(), - crdt.clone(), - blob_recycler.clone(), - blob_receiver, - window_sender, - retransmit_sender, - ); - - let tpu = obj.clone(); - let s_exit = exit.clone(); - let t_replicator = spawn(move || loop { - let e = Self::replicate_state(&tpu, &window_receiver, &blob_recycler); - if e.is_err() && s_exit.load(Ordering::Relaxed) { - break; - } - }); - - //serve pipeline - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - let respond_socket = UdpSocket::bind(local.clone())?; - - let packet_recycler = packet::PacketRecycler::default(); - let blob_recycler = packet::BlobRecycler::default(); - let (packet_sender, packet_receiver) = channel(); - let t_packet_receiver = streamer::receiver( - requests_socket, - exit.clone(), - packet_recycler.clone(), - packet_sender, - )?; - - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - - let thin_client_service = ThinClientService::new( - obj.request_processor.clone(), - obj.accounting_stage.clone(), - exit.clone(), - sig_verify_stage.output, - packet_recycler.clone(), - blob_recycler.clone(), - ); - - let t_write = Self::drain_service( - obj.accounting_stage.clone(), - obj.request_processor.clone(), - exit.clone(), - ); - - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - thin_client_service.output, - ); - - let mut threads = vec![ - //replicate threads - t_blob_receiver, - t_retransmit, - t_window, - t_replicator, - t_gossip, - t_listen, - //serve threads - t_packet_receiver, - t_responder, - thin_client_service.thread_hdl, - t_write, - ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) - } -} - -#[cfg(test)] -pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { - use signature::{KeyPair, KeyPairUtil}; - - let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); - let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); - let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new( - pubkey, - gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - requests_socket.local_addr().unwrap(), - ); - (d, gossip, replicate, requests_socket, events_socket) -} - -#[cfg(test)] -mod tests { - use accountant::Accountant; - use accounting_stage::AccountingStage; - use bincode::serialize; - use chrono::prelude::*; - use crdt::Crdt; - use entry; - use event::Event; - use hash::{hash, Hash}; - use logger; - use mint::Mint; - use packet::BlobRecycler; - use signature::{KeyPair, KeyPairUtil}; - use std::collections::VecDeque; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - use std::time::Duration; - use streamer; - use tpu::{test_node, Tpu}; - use transaction::Transaction; - - /// Test that mesasge sent from leader to target1 and repliated to target2 - #[test] - #[ignore] - fn test_replicate() { - logger::setup(); - let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); - let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node(); - let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); - let exit = Arc::new(AtomicBool::new(false)); - - //start crdt_leader - let mut crdt_l = Crdt::new(leader_data.clone()); - crdt_l.set_leader(leader_data.id); - - let cref_l = Arc::new(RwLock::new(crdt_l)); - let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); - let t_l_listen = Crdt::listen(cref_l, leader_gossip, exit.clone()); - - //start crdt2 - let mut crdt2 = Crdt::new(target2_data.clone()); - crdt2.insert(leader_data.clone()); - crdt2.set_leader(leader_data.id); - let leader_id = leader_data.id; - let cref2 = Arc::new(RwLock::new(crdt2)); - let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); - let t2_listen = Crdt::listen(cref2, target2_gossip, exit.clone()); - - // setup some blob services to send blobs into the socket - // to simulate the source peer and get blobs out of the socket to - // simulate target peer - let recv_recycler = BlobRecycler::default(); - let resp_recycler = BlobRecycler::default(); - let (s_reader, r_reader) = channel(); - let t_receiver = streamer::blob_receiver( - exit.clone(), - recv_recycler.clone(), - target2_replicate, - s_reader, - ).unwrap(); - - // simulate leader sending messages - let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder( - leader_serve, - exit.clone(), - resp_recycler.clone(), - r_responder, - ); - - let starting_balance = 10_000; - let alice = Mint::new(starting_balance); - let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(accounting_stage)); - let replicate_addr = target1_data.replicate_addr; - let threads = Tpu::replicate( - &tpu, - target1_data, - target1_gossip, - target1_serve, - target1_replicate, - leader_data, - exit.clone(), - ).unwrap(); - - let mut alice_ref_balance = starting_balance; - let mut msgs = VecDeque::new(); - let mut cur_hash = Hash::default(); - let num_blobs = 10; - let transfer_amount = 501; - let bob_keypair = KeyPair::new(); - for i in 0..num_blobs { - let b = resp_recycler.allocate(); - let b_ = b.clone(); - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(leader_id).unwrap(); - - let accountant = &tpu.accounting_stage.accountant; - - let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); - let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); - accountant.register_entry_id(&cur_hash); - cur_hash = hash(&cur_hash); - - let tr1 = Transaction::new( - &alice.keypair(), - bob_keypair.pubkey(), - transfer_amount, - cur_hash, - ); - accountant.register_entry_id(&cur_hash); - cur_hash = hash(&cur_hash); - let entry1 = - entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); - accountant.register_entry_id(&cur_hash); - cur_hash = hash(&cur_hash); - - alice_ref_balance -= transfer_amount; - - let serialized_entry = serialize(&vec![entry0, entry1]).unwrap(); - - w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); - w.set_size(serialized_entry.len()); - w.meta.set_addr(&replicate_addr); - drop(w); - msgs.push_back(b_); - } - - // send the blobs into the socket - s_responder.send(msgs).expect("send"); - - // receive retransmitted messages - let timer = Duration::new(1, 0); - let mut msgs: Vec<_> = Vec::new(); - while let Ok(msg) = r_reader.recv_timeout(timer) { - trace!("msg: {:?}", msg); - msgs.push(msg); - } - - let accountant = &tpu.accounting_stage.accountant; - let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); - assert_eq!(alice_balance, alice_ref_balance); - - let bob_balance = accountant.get_balance(&bob_keypair.pubkey()).unwrap(); - assert_eq!(bob_balance, starting_balance - alice_ref_balance); - - exit.store(true, Ordering::Relaxed); - for t in threads { - t.join().expect("join"); - } - t2_gossip.join().expect("join"); - t2_listen.join().expect("join"); - t_receiver.join().expect("join"); - t_responder.join().expect("join"); - t_l_gossip.join().expect("join"); - t_l_listen.join().expect("join"); - } - } diff --git a/src/tvu.rs b/src/tvu.rs new file mode 100644 index 000000000..e947d52cf --- /dev/null +++ b/src/tvu.rs @@ -0,0 +1,394 @@ +//! The `tvu` module implements the Transaction Validation Unit, a +//! 5-stage transaction validation pipeline in software. + +use accounting_stage::AccountingStage; +use crdt::{Crdt, ReplicatedData}; +use entry_writer::EntryWriter; +use ledger; +use packet; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use streamer; +use thin_client_service::{RequestProcessor, ThinClientService}; + +pub struct Tvu { + accounting_stage: Arc, + request_processor: Arc, +} + +impl Tvu { + /// Create a new Tvu that wraps the given Accountant. + pub fn new(accounting_stage: AccountingStage) -> Self { + let request_processor = RequestProcessor::new(accounting_stage.accountant.clone()); + Tvu { + accounting_stage: Arc::new(accounting_stage), + request_processor: Arc::new(request_processor), + } + } + + pub fn drain_service( + accounting_stage: Arc, + request_processor: Arc, + exit: Arc, + ) -> JoinHandle<()> { + spawn(move || { + let entry_writer = EntryWriter::new(&accounting_stage, &request_processor); + loop { + let _ = entry_writer.drain_entries(); + if exit.load(Ordering::Relaxed) { + info!("drain_service exiting"); + break; + } + } + }) + } + + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Tvu, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + trace!("replicating blobs {}", blobs.len()); + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + obj.accounting_stage + .accountant + .process_verified_entries(entries)?; + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } + + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accountant state. + /// * `me` - my configuration + /// * `leader` - leader configuration + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows: + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient + /// d. make sure that the blobs PoH sequences connect (TODO) + /// 4. process the transaction state machine + /// 5. respond with the hash of the state back to the leader + pub fn serve( + obj: &Arc, + me: ReplicatedData, + gossip: UdpSocket, + requests_socket: UdpSocket, + replicate: UdpSocket, + leader: ReplicatedData, + exit: Arc, + ) -> Result>> { + //replicate pipeline + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + crdt.write() + .expect("'crdt' write lock in pub fn replicate") + .set_leader(leader.id); + crdt.write() + .expect("'crdt' write lock before insert() in pub fn replicate") + .insert(leader); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); + + // make sure we are on the same interface + let mut local = replicate.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + replicate, + blob_sender.clone(), + )?; + let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); + + let t_retransmit = streamer::retransmitter( + write, + exit.clone(), + crdt.clone(), + blob_recycler.clone(), + retransmit_receiver, + ); + + //TODO + //the packets coming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + crdt.clone(), + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); + + let tvu = obj.clone(); + let s_exit = exit.clone(); + let t_replicator = spawn(move || loop { + let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler); + if e.is_err() && s_exit.load(Ordering::Relaxed) { + break; + } + }); + + //serve pipeline + // make sure we are on the same interface + let mut local = requests_socket.local_addr()?; + local.set_port(0); + let respond_socket = UdpSocket::bind(local.clone())?; + + let packet_recycler = packet::PacketRecycler::default(); + let blob_recycler = packet::BlobRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_packet_receiver = streamer::receiver( + requests_socket, + exit.clone(), + packet_recycler.clone(), + packet_sender, + )?; + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + + let thin_client_service = ThinClientService::new( + obj.request_processor.clone(), + obj.accounting_stage.clone(), + exit.clone(), + sig_verify_stage.output, + packet_recycler.clone(), + blob_recycler.clone(), + ); + + let t_write = Self::drain_service( + obj.accounting_stage.clone(), + obj.request_processor.clone(), + exit.clone(), + ); + + let t_responder = streamer::responder( + respond_socket, + exit.clone(), + blob_recycler.clone(), + thin_client_service.output, + ); + + let mut threads = vec![ + //replicate threads + t_blob_receiver, + t_retransmit, + t_window, + t_replicator, + t_gossip, + t_listen, + //serve threads + t_packet_receiver, + t_responder, + thin_client_service.thread_hdl, + t_write, + ]; + threads.extend(sig_verify_stage.thread_hdls.into_iter()); + Ok(threads) + } +} + +#[cfg(test)] +pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { + use signature::{KeyPair, KeyPairUtil}; + + let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); + let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); + let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let pubkey = KeyPair::new().pubkey(); + let d = ReplicatedData::new( + pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + requests_socket.local_addr().unwrap(), + ); + (d, gossip, replicate, requests_socket, events_socket) +} + +#[cfg(test)] +mod tests { + use accountant::Accountant; + use accounting_stage::AccountingStage; + use bincode::serialize; + use chrono::prelude::*; + use crdt::Crdt; + use entry; + use event::Event; + use hash::{hash, Hash}; + use logger; + use mint::Mint; + use packet::BlobRecycler; + use signature::{KeyPair, KeyPairUtil}; + use std::collections::VecDeque; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + use std::time::Duration; + use streamer; + use transaction::Transaction; + use tvu::{test_node, Tvu}; + + /// Test that mesasge sent from leader to target1 and repliated to target2 + #[test] + #[ignore] + fn test_replicate() { + logger::setup(); + let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); + let (target1_data, target1_gossip, target1_replicate, target1_serve, _) = test_node(); + let (target2_data, target2_gossip, target2_replicate, _, _) = test_node(); + let exit = Arc::new(AtomicBool::new(false)); + + //start crdt_leader + let mut crdt_l = Crdt::new(leader_data.clone()); + crdt_l.set_leader(leader_data.id); + + let cref_l = Arc::new(RwLock::new(crdt_l)); + let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); + let t_l_listen = Crdt::listen(cref_l, leader_gossip, exit.clone()); + + //start crdt2 + let mut crdt2 = Crdt::new(target2_data.clone()); + crdt2.insert(leader_data.clone()); + crdt2.set_leader(leader_data.id); + let leader_id = leader_data.id; + let cref2 = Arc::new(RwLock::new(crdt2)); + let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); + let t2_listen = Crdt::listen(cref2, target2_gossip, exit.clone()); + + // setup some blob services to send blobs into the socket + // to simulate the source peer and get blobs out of the socket to + // simulate target peer + let recv_recycler = BlobRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::blob_receiver( + exit.clone(), + recv_recycler.clone(), + target2_replicate, + s_reader, + ).unwrap(); + + // simulate leader sending messages + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder( + leader_serve, + exit.clone(), + resp_recycler.clone(), + r_responder, + ); + + let starting_balance = 10_000; + let alice = Mint::new(starting_balance); + let accountant = Accountant::new(&alice); + let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); + let tvu = Arc::new(Tvu::new(accounting_stage)); + let replicate_addr = target1_data.replicate_addr; + let threads = Tvu::serve( + &tvu, + target1_data, + target1_gossip, + target1_serve, + target1_replicate, + leader_data, + exit.clone(), + ).unwrap(); + + let mut alice_ref_balance = starting_balance; + let mut msgs = VecDeque::new(); + let mut cur_hash = Hash::default(); + let num_blobs = 10; + let transfer_amount = 501; + let bob_keypair = KeyPair::new(); + for i in 0..num_blobs { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.set_index(i).unwrap(); + w.set_id(leader_id).unwrap(); + + let accountant = &tvu.accounting_stage.accountant; + + let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); + let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); + accountant.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + let tr1 = Transaction::new( + &alice.keypair(), + bob_keypair.pubkey(), + transfer_amount, + cur_hash, + ); + accountant.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + let entry1 = + entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); + accountant.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + alice_ref_balance -= transfer_amount; + + let serialized_entry = serialize(&vec![entry0, entry1]).unwrap(); + + w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); + w.set_size(serialized_entry.len()); + w.meta.set_addr(&replicate_addr); + drop(w); + msgs.push_back(b_); + } + + // send the blobs into the socket + s_responder.send(msgs).expect("send"); + + // receive retransmitted messages + let timer = Duration::new(1, 0); + let mut msgs: Vec<_> = Vec::new(); + while let Ok(msg) = r_reader.recv_timeout(timer) { + trace!("msg: {:?}", msg); + msgs.push(msg); + } + + let accountant = &tvu.accounting_stage.accountant; + let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); + assert_eq!(alice_balance, alice_ref_balance); + + let bob_balance = accountant.get_balance(&bob_keypair.pubkey()).unwrap(); + assert_eq!(bob_balance, starting_balance - alice_ref_balance); + + exit.store(true, Ordering::Relaxed); + for t in threads { + t.join().expect("join"); + } + t2_gossip.join().expect("join"); + t2_listen.join().expect("join"); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); + t_l_gossip.join().expect("join"); + t_l_listen.join().expect("join"); + } + +}