diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d3e8ab14b..4901b979b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -416,7 +416,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -438,6 +438,10 @@ mod tests { use transaction::Transaction; use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use packet::{PACKET_DATA_SIZE}; #[test] fn test_layout() { @@ -545,14 +549,25 @@ mod tests { #[test] fn test_replicate() { - let serve_port = 9004; - let send_port = 9005; - let addr = format!("127.0.0.1:{}", serve_port); - let send_addr = format!("127.0.0.1:{}", send_port); + let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = read.local_addr().unwrap(); + let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + + let node_me = Node::default(); + let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); + let subs = Subscribers::new(node_me, node_leader, &[]); + + let recv_recycler = PacketRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let alice = Mint::new(10_000); let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(AtomicBool::new(false)); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -560,11 +575,24 @@ mod tests { sink(), historian, ))); - let node_me = Node::default(); - let node_leader = Node::default(); - let subs = Subscribers::new(node_me, node_leader, &[]); + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + msgs.push_back(b_); + } + s_responder.send(msgs).expect("send"); + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); } }