diff --git a/src/replicator.rs b/src/replicator.rs index d69b594a7..cc5ccf700 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -8,6 +8,7 @@ use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; +use std::time::Duration; use store_ledger_stage::StoreLedgerStage; use streamer::BlobReceiver; use window; @@ -86,6 +87,14 @@ impl Replicator { self.fetch_stage.join().unwrap(); self.t_window.join().unwrap(); self.store_ledger_stage.join().unwrap(); + + // Drain the queue here to prevent self.retransmit_receiver from being dropped + // before the window_service thread is joined + let mut retransmit_queue_count = 0; + while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) { + retransmit_queue_count += 1; + } + debug!("retransmit channel count: {}", retransmit_queue_count); } } @@ -105,7 +114,6 @@ mod tests { use std::time::Duration; #[test] - #[ignore] fn test_replicator_startup() { logger::setup(); info!("starting replicator test"); @@ -154,7 +162,7 @@ mod tests { ); let mut num_entries = 0; - for _ in 0..10 { + for _ in 0..60 { match read_ledger(replicator_ledger_path, true) { Ok(entries) => { for _ in entries { @@ -169,7 +177,7 @@ mod tests { info!("error reading ledger: {:?}", e); } } - sleep(Duration::new(1, 0)); + sleep(Duration::from_millis(300)); let last_id = leader_client.get_last_id(); leader_client .transfer(1, &mint.keypair(), bob.pubkey(), &last_id)