From 300e3d151dcc8a7a466a236cd658eb0f38c4fa8d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 17 Feb 2019 13:05:47 -0800 Subject: [PATCH] remove the signal sender since its superfelous to a recv error --- src/blocktree.rs | 14 +++++++------- src/fullnode.rs | 13 +++---------- src/replay_stage.rs | 32 ++++++++++++-------------------- src/tvu.rs | 10 +++------- 4 files changed, 25 insertions(+), 44 deletions(-) diff --git a/src/blocktree.rs b/src/blocktree.rs index 10bc99d634..c6f56433e9 100644 --- a/src/blocktree.rs +++ b/src/blocktree.rs @@ -393,12 +393,12 @@ impl Blocktree { }) } - pub fn open_with_signal(ledger_path: &str) -> Result<(Self, SyncSender, Receiver)> { + pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver)> { let mut blocktree = Self::open(ledger_path)?; let (signal_sender, signal_receiver) = sync_channel(1); - blocktree.new_blobs_signals = vec![signal_sender.clone()]; + blocktree.new_blobs_signals = vec![signal_sender]; - Ok((blocktree, signal_sender, signal_receiver)) + Ok((blocktree, signal_receiver)) } #[allow(clippy::trivially_copy_pass_by_ref)] @@ -412,13 +412,13 @@ impl Blocktree { pub fn open_with_config_signal( ledger_path: &str, config: &BlocktreeConfig, - ) -> Result<(Self, SyncSender, Receiver)> { + ) -> Result<(Self, Receiver)> { let mut blocktree = Self::open(ledger_path)?; let (signal_sender, signal_receiver) = sync_channel(1); - blocktree.new_blobs_signals = vec![signal_sender.clone()]; + blocktree.new_blobs_signals = vec![signal_sender]; blocktree.ticks_per_slot = config.ticks_per_slot; - Ok((blocktree, signal_sender, signal_receiver)) + Ok((blocktree, signal_receiver)) } pub fn meta(&self, slot_height: u64) -> Result> { @@ -1855,7 +1855,7 @@ mod tests { pub fn test_new_blobs_signal() { // Initialize ledger let ledger_path = get_tmp_ledger_path("test_new_blobs_signal"); - let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap(); + let (ledger, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; diff --git a/src/fullnode.rs b/src/fullnode.rs index 20372c81f4..5eb8bff115 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -25,7 +25,7 @@ use solana_sdk::timing::{duration_as_ms, timestamp}; use std::net::UdpSocket; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, spawn, Result}; use std::time::{Duration, Instant}; @@ -132,7 +132,6 @@ impl Fullnode { entry_height, last_entry_id, blocktree, - ledger_signal_sender, ledger_signal_receiver, ) = new_bank_from_ledger(ledger_path, &config.ledger_config(), &leader_scheduler); @@ -269,7 +268,6 @@ impl Fullnode { &rotation_sender, &storage_state, config.entry_stream.as_ref(), - ledger_signal_sender, ledger_signal_receiver, leader_scheduler.clone(), ); @@ -465,10 +463,9 @@ fn new_banks_from_blocktree( u64, Hash, Blocktree, - SyncSender, Receiver, ) { - let (blocktree, ledger_signal_sender, ledger_signal_receiver) = + let (blocktree, ledger_signal_receiver) = Blocktree::open_with_config_signal(blocktree_path, blocktree_config) .expect("Expected to successfully open database ledger"); let genesis_block = @@ -497,7 +494,6 @@ fn new_banks_from_blocktree( entry_height, last_entry_id, blocktree, - ledger_signal_sender, ledger_signal_receiver, ) } @@ -512,7 +508,6 @@ pub fn new_bank_from_ledger( u64, Hash, Blocktree, - SyncSender, Receiver, ) { let ( @@ -520,7 +515,6 @@ pub fn new_bank_from_ledger( entry_height, last_entry_id, blocktree, - ledger_signal_sender, ledger_signal_receiver, ) = new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler); ( @@ -528,7 +522,6 @@ pub fn new_bank_from_ledger( entry_height, last_entry_id, blocktree, - ledger_signal_sender, ledger_signal_receiver, ) } @@ -869,7 +862,7 @@ mod tests { // Close the validator so that rocksdb has locks available validator_exit(); let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); - let (bank, entry_height, _, _, _, _) = new_bank_from_ledger( + let (bank, entry_height, _, _, _) = new_bank_from_ledger( &validator_ledger_path, &BlocktreeConfig::default(), &leader_scheduler, diff --git a/src/replay_stage.rs b/src/replay_stage.rs index cc61d21a36..fe5e1f0c38 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -6,6 +6,7 @@ use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; use crate::counter::Counter; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; +use std::sync::mpsc::RecvTimeoutError; use crate::leader_scheduler::LeaderScheduler; use crate::packet::BlobError; use crate::result::{Error, Result}; @@ -19,12 +20,11 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; use solana_sdk::vote_transaction::VoteTransaction; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, SyncSender}; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; #[cfg(test)] use std::thread::sleep; use std::thread::{self, Builder, JoinHandle}; -#[cfg(test)] use std::time::Duration; use std::time::Instant; @@ -51,7 +51,6 @@ impl Drop for Finalizer { pub struct ReplayStage { t_replay: JoinHandle<()>, exit: Arc, - ledger_signal_sender: SyncSender, #[cfg(test)] pause: Arc, } @@ -181,7 +180,6 @@ impl ReplayStage { mut current_blob_index: u64, last_entry_id: Arc>, to_leader_sender: &TvuRotationSender, - ledger_signal_sender: SyncSender, ledger_signal_receiver: Receiver, leader_scheduler: &Arc>, ) -> (Self, EntryReceiver) { @@ -225,6 +223,13 @@ impl ReplayStage { while pause_.load(Ordering::Relaxed) { sleep(Duration::from_millis(200)); } + let timer = Duration::from_millis(100); + let e = ledger_signal_receiver.recv_timeout(timer); + match e { + Err(RecvTimeoutError::Disconnected) => continue, + Err(_) => break, + Ok(_) => (), + }; if current_slot.is_none() { let new_slot = Self::get_next_slot( @@ -260,7 +265,6 @@ impl ReplayStage { } }; - let entry_len = entries.len(); // Fetch the next entries from the database if !entries.is_empty() { if let Err(e) = Self::process_entries( @@ -303,13 +307,6 @@ impl ReplayStage { continue; } } - - // Block until there are updates again - if entry_len < MAX_ENTRY_RECV_PER_ITER && ledger_signal_receiver.recv().is_err() - { - // Update disconnected, exit - break; - } } }) .unwrap(); @@ -318,7 +315,6 @@ impl ReplayStage { Self { t_replay, exit, - ledger_signal_sender, #[cfg(test)] pause, }, @@ -338,7 +334,6 @@ impl ReplayStage { pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); - let _ = self.ledger_signal_sender.send(true); } fn get_leader_for_next_tick( @@ -460,7 +455,7 @@ mod test { { // Set up the bank let blocktree_config = BlocktreeConfig::new(ticks_per_slot); - let (bank, _entry_height, last_entry_id, blocktree, l_sender, l_receiver) = + let (bank, _entry_height, last_entry_id, blocktree, l_receiver) = new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler); // Set up the replay stage @@ -478,7 +473,6 @@ mod test { meta.consumed, Arc::new(RwLock::new(last_entry_id)), &rotation_sender, - l_sender, l_receiver, &leader_scheduler, ); @@ -565,7 +559,7 @@ mod test { let (to_leader_sender, _) = channel(); { let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); - let (bank, entry_height, last_entry_id, blocktree, l_sender, l_receiver) = + let (bank, entry_height, last_entry_id, blocktree, l_receiver) = new_bank_from_ledger( &my_ledger_path, &BlocktreeConfig::default(), @@ -583,7 +577,6 @@ mod test { entry_height, Arc::new(RwLock::new(last_entry_id)), &to_leader_sender, - l_sender, l_receiver, &leader_scheduler, ); @@ -689,7 +682,7 @@ mod test { let (rotation_tx, rotation_rx) = channel(); let exit = Arc::new(AtomicBool::new(false)); { - let (bank, _entry_height, last_entry_id, blocktree, l_sender, l_receiver) = + let (bank, _entry_height, last_entry_id, blocktree, l_receiver) = new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler); let meta = blocktree @@ -709,7 +702,6 @@ mod test { meta.consumed, Arc::new(RwLock::new(last_entry_id)), &rotation_tx, - l_sender, l_receiver, &leader_scheduler, ); diff --git a/src/tvu.rs b/src/tvu.rs index a74d02c02d..4d46d1bce2 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -28,7 +28,7 @@ use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, SyncSender}; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::thread; @@ -77,7 +77,6 @@ impl Tvu { to_leader_sender: &TvuRotationSender, storage_state: &StorageState, entry_stream: Option<&String>, - ledger_signal_sender: SyncSender, ledger_signal_receiver: Receiver, leader_scheduler: Arc>, ) -> Self { @@ -129,7 +128,6 @@ impl Tvu { blob_index, l_last_entry_id.clone(), to_leader_sender, - ledger_signal_sender, ledger_signal_receiver, &leader_scheduler, ); @@ -258,7 +256,7 @@ pub mod tests { let cur_hash = Hash::default(); let blocktree_path = get_tmp_ledger_path("test_tvu_exit"); - let (blocktree, l_sender, l_receiver) = Blocktree::open_with_signal(&blocktree_path) + let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) .expect("Expected to successfully open ledger"); let vote_account_keypair = Arc::new(Keypair::new()); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); @@ -282,7 +280,6 @@ pub mod tests { &sender, &StorageState::default(), None, - l_sender, l_receiver, leader_scheduler, ); @@ -356,7 +353,7 @@ pub mod tests { let mut cur_hash = Hash::default(); let blocktree_path = get_tmp_ledger_path("test_replay"); - let (blocktree, l_sender, l_receiver) = + let (blocktree, l_receiver) = Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config) .expect("Expected to successfully open ledger"); let vote_account_keypair = Arc::new(Keypair::new()); @@ -381,7 +378,6 @@ pub mod tests { &sender, &StorageState::default(), None, - l_sender, l_receiver, leader_scheduler, );