From 45b4cf2887376a6aee76a83e2ea695d71c3b66f1 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 9 Jan 2019 12:47:06 -0800 Subject: [PATCH] Remove store_ledger_stage which is no longer needed --- src/lib.rs | 1 - src/replicator.rs | 7 +--- src/store_ledger_stage.rs | 73 --------------------------------------- 3 files changed, 1 insertion(+), 80 deletions(-) delete mode 100644 src/store_ledger_stage.rs diff --git a/src/lib.rs b/src/lib.rs index 168afbf112..e67a5e201c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,6 @@ pub mod sigverify; pub mod sigverify_stage; pub mod status_deque; pub mod storage_stage; -pub mod store_ledger_stage; pub mod streamer; pub mod test_tx; pub mod thin_client; diff --git a/src/replicator.rs b/src/replicator.rs index b387fe80ad..772e9e8747 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -10,7 +10,6 @@ use crate::result::Result; use crate::rpc_request::{RpcClient, RpcRequest}; use crate::service::Service; use crate::storage_stage::ENTRIES_PER_SEGMENT; -use crate::store_ledger_stage::StoreLedgerStage; use crate::streamer::BlobReceiver; use crate::thin_client::retry_get_balance; use crate::window_service::window_service; @@ -41,7 +40,6 @@ use std::time::Duration; pub struct Replicator { gossip_service: GossipService, fetch_stage: BlobFetchStage, - store_ledger_stage: StoreLedgerStage, t_window: JoinHandle<()>, pub retransmit_receiver: BlobReceiver, exit: Arc, @@ -104,8 +102,7 @@ impl Replicator { cluster_info_w.set_leader(leader_info.id); } - let (entry_window_sender, entry_window_receiver) = channel(); - let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); + let (entry_window_sender, _entry_window_receiver) = channel(); // Create DbLedger, eventually will simply repurpose the input // ledger path as the DbLedger path once we replace the ledger with @@ -274,7 +271,6 @@ impl Replicator { Ok(Self { gossip_service, fetch_stage, - store_ledger_stage, t_window, retransmit_receiver, exit, @@ -290,7 +286,6 @@ impl Replicator { self.gossip_service.join().unwrap(); self.fetch_stage.join().unwrap(); self.t_window.join().unwrap(); - self.store_ledger_stage.join().unwrap(); // Drain the queue here to prevent self.retransmit_receiver from being dropped // before the window_service thread is joined diff --git a/src/store_ledger_stage.rs b/src/store_ledger_stage.rs deleted file mode 100644 index f68bfa3e9e..0000000000 --- a/src/store_ledger_stage.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! The `store_ledger` stores the ledger from received entries for storage nodes - -use crate::counter::Counter; -use crate::entry::EntryReceiver; -use crate::ledger::LedgerWriter; -use crate::result::{Error, Result}; -use crate::service::Service; -use log::Level; -use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::RecvTimeoutError; -use std::thread::{self, Builder, JoinHandle}; -use std::time::Duration; - -pub struct StoreLedgerStage { - thread_hdls: Vec>, -} - -impl StoreLedgerStage { - /// Process entries, already in order - fn store_requests( - window_receiver: &EntryReceiver, - ledger_writer: Option<&mut LedgerWriter>, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let mut entries = window_receiver.recv_timeout(timer)?; - while let Ok(mut more) = window_receiver.try_recv() { - entries.append(&mut more); - } - - inc_new_counter_info!( - "store-transactions", - entries.iter().map(|x| x.transactions.len()).sum() - ); - - if let Some(ledger_writer) = ledger_writer { - ledger_writer.write_entries(&entries)?; - } - - Ok(()) - } - - pub fn new(window_receiver: EntryReceiver, ledger_path: Option<&str>) -> Self { - let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap()); - - let t_store_requests = Builder::new() - .name("solana-store-ledger-stage".to_string()) - .spawn(move || loop { - if let Err(e) = Self::store_requests(&window_receiver, ledger_writer.as_mut()) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), - } - } - }) - .unwrap(); - - let thread_hdls = vec![t_store_requests]; - - StoreLedgerStage { thread_hdls } - } -} - -impl Service for StoreLedgerStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) - } -}