diff --git a/src/ledger_write_stage.rs b/src/ledger_write_stage.rs index c55965689e..908943a8ef 100644 --- a/src/ledger_write_stage.rs +++ b/src/ledger_write_stage.rs @@ -8,7 +8,7 @@ use log::Level; use result::{Error, Result}; use service::Service; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::{channel, RecvTimeoutError}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use timing::duration_as_ms; @@ -21,7 +21,7 @@ impl LedgerWriteStage { pub fn write( ledger_writer: Option<&mut LedgerWriter>, entry_receiver: &EntryReceiver, - forwarder: &Option, + entry_sender: &EntrySender, ) -> Result<()> { let mut ventries = Vec::new(); let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; @@ -44,10 +44,8 @@ impl LedgerWriteStage { } inc_new_counter_info!("ledger_writer_stage-entries_received", num_new_entries); - if let Some(forwarder) = forwarder { - for entries in ventries { - forwarder.send(entries)?; - } + for entries in ventries { + entry_sender.send(entries)?; } inc_new_counter_info!( "ledger_writer_stage-time_ms", @@ -56,17 +54,15 @@ impl LedgerWriteStage { Ok(()) } - pub fn new( - ledger_path: Option<&str>, - entry_receiver: EntryReceiver, - forwarder: Option, - ) -> Self { + pub fn new(ledger_path: Option<&str>, entry_receiver: EntryReceiver) -> (Self, EntryReceiver) { let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); + let (entry_sender, entry_forwarder) = channel(); let write_thread = Builder::new() .name("solana-ledger-writer".to_string()) .spawn(move || loop { - if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &forwarder) { + if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &entry_sender) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { break; @@ -83,7 +79,7 @@ impl LedgerWriteStage { }; }).unwrap(); - LedgerWriteStage { write_thread } + (LedgerWriteStage { write_thread }, entry_forwarder) } } diff --git a/src/tpu.rs b/src/tpu.rs index 41095a87dd..3a51699917 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -12,7 +12,6 @@ use service::Service; use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::Arc; use std::thread; @@ -54,9 +53,8 @@ impl Tpu { max_tick_height, ); - let (ledger_entry_sender, entry_forwarder) = channel(); - let ledger_write_stage = - LedgerWriteStage::new(Some(ledger_path), entry_receiver, Some(ledger_entry_sender)); + let (ledger_write_stage, entry_forwarder) = + LedgerWriteStage::new(Some(ledger_path), entry_receiver); let tpu = Tpu { fetch_stage, diff --git a/src/tvu.rs b/src/tvu.rs index d7f5c6115b..f0aa65f5db 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -21,7 +21,6 @@ use service::Service; use signature::Keypair; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread; use storage_stage::{StorageStage, StorageState}; @@ -90,19 +89,8 @@ impl Tvu { bank.leader_scheduler.clone(), ); - let (storage_entry_sender, storage_entry_receiver) = channel(); - let storage_state = StorageState::new(); - let storage_stage = StorageStage::new( - &storage_state, - storage_entry_receiver, - ledger_path, - keypair.clone(), - exit.clone(), - entry_height, - ); - let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new( - keypair, + keypair.clone(), vote_account_keypair, bank.clone(), cluster_info, @@ -111,10 +99,17 @@ impl Tvu { entry_height, ); - let ledger_write_stage = LedgerWriteStage::new( + let (ledger_write_stage, storage_entry_receiver) = + LedgerWriteStage::new(ledger_path, ledger_entry_receiver); + + let storage_state = StorageState::new(); + let storage_stage = StorageStage::new( + &storage_state, + storage_entry_receiver, ledger_path, - ledger_entry_receiver, - Some(storage_entry_sender), + keypair, + exit.clone(), + entry_height, ); Tvu {