remove the signal sender since its superfelous to a recv error

This commit is contained in:
Anatoly Yakovenko 2019-02-17 13:05:47 -08:00 committed by Grimes
parent 2f7911b62a
commit 300e3d151d
4 changed files with 25 additions and 44 deletions

View File

@ -393,12 +393,12 @@ impl Blocktree {
}) })
} }
pub fn open_with_signal(ledger_path: &str) -> Result<(Self, SyncSender<bool>, Receiver<bool>)> { pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver<bool>)> {
let mut blocktree = Self::open(ledger_path)?; let mut blocktree = Self::open(ledger_path)?;
let (signal_sender, signal_receiver) = sync_channel(1); let (signal_sender, signal_receiver) = sync_channel(1);
blocktree.new_blobs_signals = vec![signal_sender.clone()]; blocktree.new_blobs_signals = vec![signal_sender];
Ok((blocktree, signal_sender, signal_receiver)) Ok((blocktree, signal_receiver))
} }
#[allow(clippy::trivially_copy_pass_by_ref)] #[allow(clippy::trivially_copy_pass_by_ref)]
@ -412,13 +412,13 @@ impl Blocktree {
pub fn open_with_config_signal( pub fn open_with_config_signal(
ledger_path: &str, ledger_path: &str,
config: &BlocktreeConfig, config: &BlocktreeConfig,
) -> Result<(Self, SyncSender<bool>, Receiver<bool>)> { ) -> Result<(Self, Receiver<bool>)> {
let mut blocktree = Self::open(ledger_path)?; let mut blocktree = Self::open(ledger_path)?;
let (signal_sender, signal_receiver) = sync_channel(1); let (signal_sender, signal_receiver) = sync_channel(1);
blocktree.new_blobs_signals = vec![signal_sender.clone()]; blocktree.new_blobs_signals = vec![signal_sender];
blocktree.ticks_per_slot = config.ticks_per_slot; blocktree.ticks_per_slot = config.ticks_per_slot;
Ok((blocktree, signal_sender, signal_receiver)) Ok((blocktree, signal_receiver))
} }
pub fn meta(&self, slot_height: u64) -> Result<Option<SlotMeta>> { pub fn meta(&self, slot_height: u64) -> Result<Option<SlotMeta>> {
@ -1855,7 +1855,7 @@ mod tests {
pub fn test_new_blobs_signal() { pub fn test_new_blobs_signal() {
// Initialize ledger // Initialize ledger
let ledger_path = get_tmp_ledger_path("test_new_blobs_signal"); let ledger_path = get_tmp_ledger_path("test_new_blobs_signal");
let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap(); let (ledger, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
let ledger = Arc::new(ledger); let ledger = Arc::new(ledger);
let entries_per_slot = 10; let entries_per_slot = 10;

View File

@ -25,7 +25,7 @@ use solana_sdk::timing::{duration_as_ms, timestamp};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{sleep, spawn, Result}; use std::thread::{sleep, spawn, Result};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -132,7 +132,6 @@ impl Fullnode {
entry_height, entry_height,
last_entry_id, last_entry_id,
blocktree, blocktree,
ledger_signal_sender,
ledger_signal_receiver, ledger_signal_receiver,
) = new_bank_from_ledger(ledger_path, &config.ledger_config(), &leader_scheduler); ) = new_bank_from_ledger(ledger_path, &config.ledger_config(), &leader_scheduler);
@ -269,7 +268,6 @@ impl Fullnode {
&rotation_sender, &rotation_sender,
&storage_state, &storage_state,
config.entry_stream.as_ref(), config.entry_stream.as_ref(),
ledger_signal_sender,
ledger_signal_receiver, ledger_signal_receiver,
leader_scheduler.clone(), leader_scheduler.clone(),
); );
@ -465,10 +463,9 @@ fn new_banks_from_blocktree(
u64, u64,
Hash, Hash,
Blocktree, Blocktree,
SyncSender<bool>,
Receiver<bool>, Receiver<bool>,
) { ) {
let (blocktree, ledger_signal_sender, ledger_signal_receiver) = let (blocktree, ledger_signal_receiver) =
Blocktree::open_with_config_signal(blocktree_path, blocktree_config) Blocktree::open_with_config_signal(blocktree_path, blocktree_config)
.expect("Expected to successfully open database ledger"); .expect("Expected to successfully open database ledger");
let genesis_block = let genesis_block =
@ -497,7 +494,6 @@ fn new_banks_from_blocktree(
entry_height, entry_height,
last_entry_id, last_entry_id,
blocktree, blocktree,
ledger_signal_sender,
ledger_signal_receiver, ledger_signal_receiver,
) )
} }
@ -512,7 +508,6 @@ pub fn new_bank_from_ledger(
u64, u64,
Hash, Hash,
Blocktree, Blocktree,
SyncSender<bool>,
Receiver<bool>, Receiver<bool>,
) { ) {
let ( let (
@ -520,7 +515,6 @@ pub fn new_bank_from_ledger(
entry_height, entry_height,
last_entry_id, last_entry_id,
blocktree, blocktree,
ledger_signal_sender,
ledger_signal_receiver, ledger_signal_receiver,
) = new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler); ) = new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler);
( (
@ -528,7 +522,6 @@ pub fn new_bank_from_ledger(
entry_height, entry_height,
last_entry_id, last_entry_id,
blocktree, blocktree,
ledger_signal_sender,
ledger_signal_receiver, ledger_signal_receiver,
) )
} }
@ -869,7 +862,7 @@ mod tests {
// Close the validator so that rocksdb has locks available // Close the validator so that rocksdb has locks available
validator_exit(); validator_exit();
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let (bank, entry_height, _, _, _, _) = new_bank_from_ledger( let (bank, entry_height, _, _, _) = new_bank_from_ledger(
&validator_ledger_path, &validator_ledger_path,
&BlocktreeConfig::default(), &BlocktreeConfig::default(),
&leader_scheduler, &leader_scheduler,

View File

@ -6,6 +6,7 @@ use crate::blocktree_processor;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::counter::Counter; use crate::counter::Counter;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use std::sync::mpsc::RecvTimeoutError;
use crate::leader_scheduler::LeaderScheduler; use crate::leader_scheduler::LeaderScheduler;
use crate::packet::BlobError; use crate::packet::BlobError;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
@ -19,12 +20,11 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::duration_as_ms;
use solana_sdk::vote_transaction::VoteTransaction; use solana_sdk::vote_transaction::VoteTransaction;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, SyncSender}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
#[cfg(test)] #[cfg(test)]
use std::thread::sleep; use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
#[cfg(test)]
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@ -51,7 +51,6 @@ impl Drop for Finalizer {
pub struct ReplayStage { pub struct ReplayStage {
t_replay: JoinHandle<()>, t_replay: JoinHandle<()>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
ledger_signal_sender: SyncSender<bool>,
#[cfg(test)] #[cfg(test)]
pause: Arc<AtomicBool>, pause: Arc<AtomicBool>,
} }
@ -181,7 +180,6 @@ impl ReplayStage {
mut current_blob_index: u64, mut current_blob_index: u64,
last_entry_id: Arc<RwLock<Hash>>, last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: &TvuRotationSender, to_leader_sender: &TvuRotationSender,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> (Self, EntryReceiver) { ) -> (Self, EntryReceiver) {
@ -225,6 +223,13 @@ impl ReplayStage {
while pause_.load(Ordering::Relaxed) { while pause_.load(Ordering::Relaxed) {
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
} }
let timer = Duration::from_millis(100);
let e = ledger_signal_receiver.recv_timeout(timer);
match e {
Err(RecvTimeoutError::Disconnected) => continue,
Err(_) => break,
Ok(_) => (),
};
if current_slot.is_none() { if current_slot.is_none() {
let new_slot = Self::get_next_slot( let new_slot = Self::get_next_slot(
@ -260,7 +265,6 @@ impl ReplayStage {
} }
}; };
let entry_len = entries.len();
// Fetch the next entries from the database // Fetch the next entries from the database
if !entries.is_empty() { if !entries.is_empty() {
if let Err(e) = Self::process_entries( if let Err(e) = Self::process_entries(
@ -303,13 +307,6 @@ impl ReplayStage {
continue; continue;
} }
} }
// Block until there are updates again
if entry_len < MAX_ENTRY_RECV_PER_ITER && ledger_signal_receiver.recv().is_err()
{
// Update disconnected, exit
break;
}
} }
}) })
.unwrap(); .unwrap();
@ -318,7 +315,6 @@ impl ReplayStage {
Self { Self {
t_replay, t_replay,
exit, exit,
ledger_signal_sender,
#[cfg(test)] #[cfg(test)]
pause, pause,
}, },
@ -338,7 +334,6 @@ impl ReplayStage {
pub fn exit(&self) { pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
let _ = self.ledger_signal_sender.send(true);
} }
fn get_leader_for_next_tick( fn get_leader_for_next_tick(
@ -460,7 +455,7 @@ mod test {
{ {
// Set up the bank // Set up the bank
let blocktree_config = BlocktreeConfig::new(ticks_per_slot); let blocktree_config = BlocktreeConfig::new(ticks_per_slot);
let (bank, _entry_height, last_entry_id, blocktree, l_sender, l_receiver) = let (bank, _entry_height, last_entry_id, blocktree, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler); new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler);
// Set up the replay stage // Set up the replay stage
@ -478,7 +473,6 @@ mod test {
meta.consumed, meta.consumed,
Arc::new(RwLock::new(last_entry_id)), Arc::new(RwLock::new(last_entry_id)),
&rotation_sender, &rotation_sender,
l_sender,
l_receiver, l_receiver,
&leader_scheduler, &leader_scheduler,
); );
@ -565,7 +559,7 @@ mod test {
let (to_leader_sender, _) = channel(); let (to_leader_sender, _) = channel();
{ {
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default())); let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let (bank, entry_height, last_entry_id, blocktree, l_sender, l_receiver) = let (bank, entry_height, last_entry_id, blocktree, l_receiver) =
new_bank_from_ledger( new_bank_from_ledger(
&my_ledger_path, &my_ledger_path,
&BlocktreeConfig::default(), &BlocktreeConfig::default(),
@ -583,7 +577,6 @@ mod test {
entry_height, entry_height,
Arc::new(RwLock::new(last_entry_id)), Arc::new(RwLock::new(last_entry_id)),
&to_leader_sender, &to_leader_sender,
l_sender,
l_receiver, l_receiver,
&leader_scheduler, &leader_scheduler,
); );
@ -689,7 +682,7 @@ mod test {
let (rotation_tx, rotation_rx) = channel(); let (rotation_tx, rotation_rx) = channel();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
{ {
let (bank, _entry_height, last_entry_id, blocktree, l_sender, l_receiver) = let (bank, _entry_height, last_entry_id, blocktree, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler); new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler);
let meta = blocktree let meta = blocktree
@ -709,7 +702,6 @@ mod test {
meta.consumed, meta.consumed,
Arc::new(RwLock::new(last_entry_id)), Arc::new(RwLock::new(last_entry_id)),
&rotation_tx, &rotation_tx,
l_sender,
l_receiver, l_receiver,
&leader_scheduler, &leader_scheduler,
); );

View File

@ -28,7 +28,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, SyncSender}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
@ -77,7 +77,6 @@ impl Tvu {
to_leader_sender: &TvuRotationSender, to_leader_sender: &TvuRotationSender,
storage_state: &StorageState, storage_state: &StorageState,
entry_stream: Option<&String>, entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>, leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> Self { ) -> Self {
@ -129,7 +128,6 @@ impl Tvu {
blob_index, blob_index,
l_last_entry_id.clone(), l_last_entry_id.clone(),
to_leader_sender, to_leader_sender,
ledger_signal_sender,
ledger_signal_receiver, ledger_signal_receiver,
&leader_scheduler, &leader_scheduler,
); );
@ -258,7 +256,7 @@ pub mod tests {
let cur_hash = Hash::default(); let cur_hash = Hash::default();
let blocktree_path = get_tmp_ledger_path("test_tvu_exit"); let blocktree_path = get_tmp_ledger_path("test_tvu_exit");
let (blocktree, l_sender, l_receiver) = Blocktree::open_with_signal(&blocktree_path) let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to successfully open ledger"); .expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new()); let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
@ -282,7 +280,6 @@ pub mod tests {
&sender, &sender,
&StorageState::default(), &StorageState::default(),
None, None,
l_sender,
l_receiver, l_receiver,
leader_scheduler, leader_scheduler,
); );
@ -356,7 +353,7 @@ pub mod tests {
let mut cur_hash = Hash::default(); let mut cur_hash = Hash::default();
let blocktree_path = get_tmp_ledger_path("test_replay"); let blocktree_path = get_tmp_ledger_path("test_replay");
let (blocktree, l_sender, l_receiver) = let (blocktree, l_receiver) =
Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config) Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config)
.expect("Expected to successfully open ledger"); .expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new()); let vote_account_keypair = Arc::new(Keypair::new());
@ -381,7 +378,6 @@ pub mod tests {
&sender, &sender,
&StorageState::default(), &StorageState::default(),
None, None,
l_sender,
l_receiver, l_receiver,
leader_scheduler, leader_scheduler,
); );