Work on test_replicate to test replicate service

generate some messages to send to replicator service
This commit is contained in:
Stephen Akridge 2018-04-24 10:57:40 -07:00
parent 1b6cdd5637
commit 3be5f25f2f
1 changed files with 37 additions and 9 deletions

View File

@ -416,7 +416,7 @@ mod tests {
use accountant_skel::{to_packets, Request};
use bincode::serialize;
use ecdsa;
use packet::{PacketRecycler, NUM_PACKETS};
use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
use transaction::{memfind, test_tx};
use accountant::Accountant;
@ -438,6 +438,10 @@ mod tests {
use transaction::Transaction;
use subscribers::{Node, Subscribers};
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
use packet::{PACKET_DATA_SIZE};
#[test]
fn test_layout() {
@ -545,14 +549,25 @@ mod tests {
#[test]
fn test_replicate() {
let serve_port = 9004;
let send_port = 9005;
let addr = format!("127.0.0.1:{}", serve_port);
let send_addr = format!("127.0.0.1:{}", send_port);
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let node_me = Node::default();
let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap());
let subs = Subscribers::new(node_me, node_leader, &[]);
let recv_recycler = PacketRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap();
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let historian = Historian::new(&alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
@ -560,11 +575,24 @@ mod tests {
sink(),
historian,
)));
let node_me = Node::default();
let node_leader = Node::default();
let subs = Subscribers::new(node_me, node_leader, &[]);
let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();
let mut msgs = VecDeque::new();
for i in 0..10 {
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
msgs.push_back(b_);
}
s_responder.send(msgs).expect("send");
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}
}