Remove store_ledger_stage which is no longer needed

This commit is contained in:
Stephen Akridge 2019-01-09 12:47:06 -08:00 committed by sakridge
parent 4b5acc065a
commit 45b4cf2887
3 changed files with 1 additions and 80 deletions

View File

@ -63,7 +63,6 @@ pub mod sigverify;
pub mod sigverify_stage; pub mod sigverify_stage;
pub mod status_deque; pub mod status_deque;
pub mod storage_stage; pub mod storage_stage;
pub mod store_ledger_stage;
pub mod streamer; pub mod streamer;
pub mod test_tx; pub mod test_tx;
pub mod thin_client; pub mod thin_client;

View File

@ -10,7 +10,6 @@ use crate::result::Result;
use crate::rpc_request::{RpcClient, RpcRequest}; use crate::rpc_request::{RpcClient, RpcRequest};
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::ENTRIES_PER_SEGMENT; use crate::storage_stage::ENTRIES_PER_SEGMENT;
use crate::store_ledger_stage::StoreLedgerStage;
use crate::streamer::BlobReceiver; use crate::streamer::BlobReceiver;
use crate::thin_client::retry_get_balance; use crate::thin_client::retry_get_balance;
use crate::window_service::window_service; use crate::window_service::window_service;
@ -41,7 +40,6 @@ use std::time::Duration;
pub struct Replicator { pub struct Replicator {
gossip_service: GossipService, gossip_service: GossipService,
fetch_stage: BlobFetchStage, fetch_stage: BlobFetchStage,
store_ledger_stage: StoreLedgerStage,
t_window: JoinHandle<()>, t_window: JoinHandle<()>,
pub retransmit_receiver: BlobReceiver, pub retransmit_receiver: BlobReceiver,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -104,8 +102,7 @@ impl Replicator {
cluster_info_w.set_leader(leader_info.id); cluster_info_w.set_leader(leader_info.id);
} }
let (entry_window_sender, entry_window_receiver) = channel(); let (entry_window_sender, _entry_window_receiver) = channel();
let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path);
// Create DbLedger, eventually will simply repurpose the input // Create DbLedger, eventually will simply repurpose the input
// ledger path as the DbLedger path once we replace the ledger with // ledger path as the DbLedger path once we replace the ledger with
@ -274,7 +271,6 @@ impl Replicator {
Ok(Self { Ok(Self {
gossip_service, gossip_service,
fetch_stage, fetch_stage,
store_ledger_stage,
t_window, t_window,
retransmit_receiver, retransmit_receiver,
exit, exit,
@ -290,7 +286,6 @@ impl Replicator {
self.gossip_service.join().unwrap(); self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap(); self.fetch_stage.join().unwrap();
self.t_window.join().unwrap(); self.t_window.join().unwrap();
self.store_ledger_stage.join().unwrap();
// Drain the queue here to prevent self.retransmit_receiver from being dropped // Drain the queue here to prevent self.retransmit_receiver from being dropped
// before the window_service thread is joined // before the window_service thread is joined

View File

@ -1,73 +0,0 @@
//! The `store_ledger` stores the ledger from received entries for storage nodes
use crate::counter::Counter;
use crate::entry::EntryReceiver;
use crate::ledger::LedgerWriter;
use crate::result::{Error, Result};
use crate::service::Service;
use log::Level;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub struct StoreLedgerStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl StoreLedgerStage {
/// Process entries, already in order
fn store_requests(
window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
entries.append(&mut more);
}
inc_new_counter_info!(
"store-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(&entries)?;
}
Ok(())
}
pub fn new(window_receiver: EntryReceiver, ledger_path: Option<&str>) -> Self {
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap());
let t_store_requests = Builder::new()
.name("solana-store-ledger-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::store_requests(&window_receiver, ledger_writer.as_mut()) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
})
.unwrap();
let thread_hdls = vec![t_store_requests];
StoreLedgerStage { thread_hdls }
}
}
impl Service for StoreLedgerStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
}