fix tpu tvu bank race (#2707)

* Fix tpu tvu bank race

* Test highlighting race between tvu and tpu banks during leader to leader transitions
carllin 2019-02-10 16:28:52 -08:00, committed by GitHub
parent 02c0098d57
commit 4b38ecd916
5 changed files with 203 additions and 111 deletions
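
The heart of the change is a synchronization point between the two pipelines: during a leader-to-leader transition the TPU (banking) side can reach the rotation tick height before the TVU (replay) side has replayed up to it, so leader_to_validator() now waits for the bank's tick height to catch up, and the new test freezes the TVU through a test-only pause flag to provoke exactly that window. A minimal sketch of the wait, with Bank reduced to a stub (illustrative names, not solana's real types):

use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::sleep;
use std::time::Duration;

// Stub standing in for the shared bank; only the tick counter matters here.
struct Bank {
    ticks: AtomicU64,
}

impl Bank {
    fn tick_height(&self) -> u64 {
        self.ticks.load(Ordering::Relaxed)
    }
}

// Mirrors the loop added to leader_to_validator(): refuse to act on a
// rotation until the TVU-side replay has advanced the bank to the rotation
// tick, so the TPU and TVU banks agree on state at the slot boundary.
fn wait_for_tvu_bank(bank: &Bank, rotation_tick_height: u64) {
    while bank.tick_height() < rotation_tick_height {
        sleep(Duration::from_millis(10));
    }
}

fn main() {
    let bank = Bank { ticks: AtomicU64::new(5) };
    wait_for_tvu_bank(&bank, 5); // already caught up: returns immediately
}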

--- a/src/fullnode.rs
+++ b/src/fullnode.rs

@@ -26,7 +26,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, SyncSender};
 use std::sync::{Arc, RwLock};
-use std::thread::{spawn, Result};
+use std::thread::{sleep, spawn, Result};
 use std::time::{Duration, Instant};

 struct NodeServices {
@@ -66,6 +66,7 @@ pub struct FullnodeConfig {
     pub storage_rotate_count: u64,
     pub leader_scheduler_config: LeaderSchedulerConfig,
     pub ledger_config: BlocktreeConfig,
+    pub tick_config: PohServiceConfig,
 }

 impl Default for FullnodeConfig {
     fn default() -> Self {
@@ -80,6 +81,7 @@ impl Default for FullnodeConfig {
             storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
             leader_scheduler_config: LeaderSchedulerConfig::default(),
             ledger_config: BlocktreeConfig::default(),
+            tick_config: PohServiceConfig::default(),
         }
     }
 }
@@ -315,11 +317,16 @@ impl Fullnode {
     fn leader_to_validator(&mut self, tick_height: u64) -> FullnodeReturnType {
         trace!(
-            "leader_to_validator({:?}): tick_height={}",
+            "leader_to_validator({:?}): tick_height={}, bt: {}",
             self.id,
             tick_height,
+            self.bank.tick_height()
         );

+        while self.bank.tick_height() < tick_height {
+            sleep(Duration::from_millis(10));
+        }
+
         let scheduled_leader = {
             let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap();
@@ -338,7 +345,7 @@
         if scheduled_leader == self.id {
             debug!("node is still the leader");
-            let last_entry_id = self.node_services.tvu.get_state();
+            let last_entry_id = self.bank.last_id();
             self.validator_to_leader(tick_height, last_entry_id);
             FullnodeReturnType::LeaderToLeaderRotation
         } else {
@@ -554,11 +561,17 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::blob_fetch_stage::BlobFetchStage;
     use crate::blocktree::{create_tmp_sample_ledger, tmp_copy_ledger};
     use crate::entry::make_consecutive_blobs;
+    use crate::entry::EntrySlice;
+    use crate::gossip_service::{converge, make_listening_node};
     use crate::leader_scheduler::make_active_set_entries;
     use crate::streamer::responder;
+    use std::cmp::min;
     use std::fs::remove_dir_all;
+    use std::sync::atomic::Ordering;
+    use std::thread::sleep;

     #[test]
     fn validator_exit() {
@@ -854,13 +867,12 @@
     }

     #[test]
-    #[ignore] // TODO: Make this test less hacky
     fn test_tvu_behind() {
         solana_logger::setup();

         // Make leader node
         let ticks_per_slot = 5;
-        let slots_per_epoch = 2;
+        let slots_per_epoch = 1;
         let leader_keypair = Arc::new(Keypair::new());
         let validator_keypair = Arc::new(Keypair::new());
@@ -885,6 +897,18 @@
             slots_per_epoch,
             ticks_per_slot * slots_per_epoch,
         ));
+        let config = PohServiceConfig::Sleep(Duration::from_millis(200));
+        fullnode_config.tick_config = config;
+
+        info!("Start up a listener");
+        let blob_receiver_exit = Arc::new(AtomicBool::new(false));
+        let (_, _, mut listening_node, _) = make_listening_node(&leader_node.info);
+        let (blob_fetch_sender, blob_fetch_receiver) = channel();
+        let blob_fetch_stage = BlobFetchStage::new(
+            Arc::new(listening_node.sockets.tvu.pop().unwrap()),
+            &blob_fetch_sender,
+            blob_receiver_exit.clone(),
+        );
         let voting_keypair = VotingKeypair::new_local(&leader_keypair);

         info!("Start the bootstrap leader");
@@ -897,9 +921,14 @@
             &fullnode_config,
         );

-        info!("Hold Tvu bank lock to prevent tvu from making progress");
-        {
-            let w_last_ids = leader.bank.last_ids().write().unwrap();
+        let (rotation_sender, rotation_receiver) = channel();
+
+        info!("Pause the Tvu");
+        let pause_tvu = leader.node_services.tvu.get_pause();
+        pause_tvu.store(true, Ordering::Relaxed);
+
+        // Wait for convergence
+        converge(&leader_node_info, 2);

         info!("Wait for leader -> validator transition");
         let signal = leader
@@ -910,25 +939,32 @@
         rn_sender.send(signal).expect("send");
         leader.to_validator_receiver = rn_receiver;

-        info!("Make sure the tvu bank is behind");
-        assert_eq!(w_last_ids.tick_height, 2);
+        info!("Make sure the tvu bank has not reached the last tick for the slot (the last tick is ticks_per_slot - 1)");
+        {
+            let w_last_ids = leader.bank.last_ids().write().unwrap();
+            assert!(w_last_ids.tick_height < ticks_per_slot - 1);
         }

-        // Release tvu bank lock, tvu should start making progress again and should signal a
-        // rotate. After rotation it will still be the slot leader as a new leader schedule has
-        // not been computed yet (still in epoch 0)
-        info!("Release tvu bank lock");
-        let (rotation_sender, rotation_receiver) = channel();
+        // Clear the blobs we've received so far. After this rotation, we should
+        // no longer receive blobs from slot 0
+        while let Ok(_) = blob_fetch_receiver.try_recv() {}
+
         let leader_exit = leader.run(Some(rotation_sender));

+        // Wait for leader_to_validator() function execution to trigger a leader to leader rotation
+        sleep(Duration::from_millis(1000));
+
+        // The Tvu is unpaused here, so it should start making progress again and should signal a
+        // rotation. After rotation it will still be the slot leader as a new leader schedule has
+        // not been computed yet (still in epoch 0). In the next epoch (epoch 1), the node will
+        // transition to a validator.
+        info!("Unpause the Tvu");
+        pause_tvu.store(false, Ordering::Relaxed);
+
         let expected_rotations = vec![
             (FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot),
-            (
-                FullnodeReturnType::LeaderToLeaderRotation,
-                2 * ticks_per_slot,
-            ),
             (
                 FullnodeReturnType::LeaderToValidatorRotation,
-                3 * ticks_per_slot,
+                2 * ticks_per_slot,
             ),
         ];
@@ -943,6 +979,28 @@
         info!("Shut down");
         leader_exit();

+        // Make sure that after rotation we don't receive any blobs from slot 0 (make sure
+        // broadcast started again at the correct place)
+        while let Ok(new_blobs) = blob_fetch_receiver.try_recv() {
+            for blob in new_blobs {
+                assert_ne!(blob.read().unwrap().slot(), 0);
+            }
+        }
+
+        // Check the ledger to make sure the PoH chains
+        {
+            let blocktree = Blocktree::open(&leader_ledger_path).unwrap();
+            let entries: Vec<_> = (0..3)
+                .flat_map(|slot_height| blocktree.get_slot_entries(slot_height, 0, None).unwrap())
+                .collect();
+            assert!(entries[1..].verify(&entries[0].id))
+        }
+
+        blob_receiver_exit.store(true, Ordering::Relaxed);
+        blob_fetch_stage.join().unwrap();
+
         Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction");
         let _ignored = remove_dir_all(&leader_ledger_path).unwrap();
     }
@@ -989,8 +1047,10 @@
         let non_tick_active_entries_len = active_set_entries.len() - num_ending_ticks as usize;
         let remaining_ticks_in_zeroth_slot = ticks_per_block - num_genesis_ticks;
-        let entries_for_zeroth_slot =
-            non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize;
+        let entries_for_zeroth_slot = min(
+            active_set_entries.len(),
+            non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize,
+        );
         let entry_chunks: Vec<_> = active_set_entries[entries_for_zeroth_slot..]
             .chunks(ticks_per_block as usize)
             .collect();
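
The min clamp in the final hunk above appears to be a bounds guard: the computed entries_for_zeroth_slot can exceed active_set_entries.len(), and slicing past the end of a Vec panics. A tiny standalone illustration of the guard (hypothetical numbers):

use std::cmp::min;

fn main() {
    let active_set_entries = vec!["e0", "e1", "e2", "e3", "e4"];
    let non_tick_active_entries_len = 4;
    let remaining_ticks_in_zeroth_slot = 3; // 4 + 3 > 5: would panic unclamped
    let entries_for_zeroth_slot = min(
        active_set_entries.len(),
        non_tick_active_entries_len + remaining_ticks_in_zeroth_slot,
    );
    // Clamped to 5, so the tail slice is merely empty instead of out of bounds.
    assert!(active_set_entries[entries_for_zeroth_slot..].is_empty());
}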

--- a/src/gossip_service.rs
+++ b/src/gossip_service.rs

@@ -1,14 +1,18 @@
 //! The `gossip_service` module implements the network control plane.

 use crate::blocktree::Blocktree;
-use crate::cluster_info::ClusterInfo;
+use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
 use crate::service::Service;
 use crate::streamer;
+use solana_sdk::pubkey::Pubkey;
+use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
+use std::thread::sleep;
 use std::thread::{self, JoinHandle};
+use std::time::Duration;

 pub struct GossipService {
     exit: Arc<AtomicBool>,
@@ -51,6 +55,82 @@
     }
 }
+pub fn make_listening_node(
+    leader: &NodeInfo,
+) -> (GossipService, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
+    let keypair = Keypair::new();
+    let exit = Arc::new(AtomicBool::new(false));
+    let new_node = Node::new_localhost_with_pubkey(keypair.pubkey());
+    let new_node_info = new_node.info.clone();
+    let id = new_node.info.id;
+    let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair));
+    new_node_cluster_info.insert_info(leader.clone());
+    new_node_cluster_info.set_leader(leader.id);
+    let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
+    let gossip_service = GossipService::new(
+        &new_node_cluster_info_ref,
+        None,
+        new_node
+            .sockets
+            .gossip
+            .try_clone()
+            .expect("Failed to clone gossip"),
+        exit.clone(),
+    );
+
+    (gossip_service, new_node_cluster_info_ref, new_node, id)
+}
+
+pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
+    info!("Wait for convergence with {} nodes", num_nodes);
+    // Let's spy on the network
+    let (gossip_service, spy_ref, id) = make_spy_node(node);
+    trace!(
+        "converge spy_node {} looking for at least {} nodes",
+        id,
+        num_nodes
+    );
+
+    // Wait for the cluster to converge
+    for _ in 0..15 {
+        let rpc_peers = spy_ref.read().unwrap().rpc_peers();
+        if rpc_peers.len() == num_nodes {
+            debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers);
+            gossip_service.close().unwrap();
+            return rpc_peers;
+        }
+        debug!(
+            "converge found {} nodes, need {} more",
+            rpc_peers.len(),
+            num_nodes - rpc_peers.len()
+        );
+        sleep(Duration::new(1, 0));
+    }
+    panic!("Failed to converge");
+}
+
+pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInfo>>, Pubkey) {
+    let keypair = Keypair::new();
+    let exit = Arc::new(AtomicBool::new(false));
+    let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey());
+    let id = spy.info.id;
+    let daddr = "0.0.0.0:0".parse().unwrap();
+    spy.info.tvu = daddr;
+    spy.info.rpc = daddr;
+    let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair));
+    spy_cluster_info.insert_info(leader.clone());
+    spy_cluster_info.set_leader(leader.id);
+    let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
+    let gossip_service = GossipService::new(
+        &spy_cluster_info_ref,
+        None,
+        spy.sockets.gossip,
+        exit.clone(),
+    );
+
+    (gossip_service, spy_cluster_info_ref, id)
+}

 impl Service for GossipService {
     type JoinReturnType = ();
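
make_spy_node, make_listening_node, and converge were promoted from the integration tests into gossip_service so that unit tests (like test_tvu_behind above) can reuse them. A hedged usage sketch of the new helpers, mirroring the call sites in test_tvu_behind; it assumes the solana crate and an already-running leader whose gossip info is in leader_info, so it is a fragment rather than a standalone program:

// Not standalone: relies on the solana crate and a running leader whose
// NodeInfo is `leader_info`.
let (_gossip_service, _cluster_info, mut listening_node, _id) =
    make_listening_node(&leader_info);
// Spy on gossip until both the leader and the listener are visible.
let peers = converge(&leader_info, 2);
assert_eq!(peers.len(), 2);
// The listener's TVU socket can now be handed to a BlobFetchStage to
// observe the blobs the leader broadcasts.
let tvu_socket = listening_node.sockets.tvu.pop().unwrap();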

--- a/src/replay_stage.rs
+++ b/src/replay_stage.rs

@@ -24,7 +24,11 @@ use solana_sdk::vote_transaction::VoteTransaction;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::{channel, Receiver, SyncSender};
 use std::sync::{Arc, RwLock};
+#[cfg(test)]
+use std::thread::sleep;
 use std::thread::{self, Builder, JoinHandle};
+#[cfg(test)]
+use std::time::Duration;
 use std::time::Instant;

 pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
@@ -51,6 +55,8 @@ pub struct ReplayStage {
     t_replay: JoinHandle<()>,
     exit: Arc<AtomicBool>,
     ledger_signal_sender: SyncSender<bool>,
+    #[cfg(test)]
+    pause: Arc<AtomicBool>,
 }

 impl ReplayStage {
@@ -90,11 +96,12 @@
             duration_as_ms(&now.elapsed()) as usize
         );

+        let num_ticks = bank.tick_height();
         let mut num_ticks_to_next_vote = bank
             .leader_scheduler
             .read()
             .unwrap()
-            .num_ticks_left_in_slot(bank.tick_height());
+            .num_ticks_left_in_slot(num_ticks);

         for (i, entry) in entries.iter().enumerate() {
             inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize);
@@ -188,7 +195,12 @@
     ) -> (Self, EntryReceiver) {
         let (ledger_entry_sender, ledger_entry_receiver) = channel();
         let mut entry_stream = entry_stream.cloned().map(EntryStream::new);
+        #[cfg(test)]
+        let (pause, pause_) = {
+            let pause = Arc::new(AtomicBool::new(false));
+            let pause_ = pause.clone();
+            (pause, pause_)
+        };
         let exit_ = exit.clone();
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
@@ -215,6 +227,10 @@
                 if exit_.load(Ordering::Relaxed) {
                     break;
                 }
+                #[cfg(test)]
+                while pause_.load(Ordering::Relaxed) {
+                    sleep(Duration::from_millis(200));
+                }

                 if current_slot.is_none() {
                     let new_slot = Self::get_next_slot(
@@ -309,11 +325,18 @@
                 t_replay,
                 exit,
                 ledger_signal_sender,
+                #[cfg(test)]
+                pause,
             },
             ledger_entry_receiver,
         )
     }

+    #[cfg(test)]
+    pub fn get_pause(&self) -> Arc<AtomicBool> {
+        self.pause.clone()
+    }
+
     pub fn close(self) -> thread::Result<()> {
         self.exit();
         self.join()

--- a/src/tvu.rs
+++ b/src/tvu.rs

@@ -158,6 +158,11 @@ impl Tvu {
         )
     }

+    #[cfg(test)]
+    pub fn get_pause(&self) -> Arc<AtomicBool> {
+        self.replay_stage.get_pause()
+    }
+
     pub fn get_state(&self) -> Hash {
         *self.last_entry_id.read().unwrap()
     }

--- a/tests/multinode.rs
+++ b/tests/multinode.rs

@@ -3,10 +3,10 @@ use solana::blob_fetch_stage::BlobFetchStage;
 use solana::blocktree::{create_tmp_sample_ledger, tmp_copy_ledger};
 use solana::blocktree::{Blocktree, BlocktreeConfig, DEFAULT_SLOT_HEIGHT};
 use solana::client::mk_client;
-use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
+use solana::cluster_info::{Node, NodeInfo};
 use solana::entry::{reconstruct_entries_from_blobs, Entry};
 use solana::fullnode::{new_bank_from_ledger, Fullnode, FullnodeConfig, FullnodeReturnType};
-use solana::gossip_service::GossipService;
+use solana::gossip_service::{converge, make_listening_node};
 use solana::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig};
 use solana::packet::SharedBlob;
 use solana::result;
@@ -34,82 +34,6 @@ fn read_ledger(ledger_path: &str) -> Vec<Entry> {
         .collect()
 }
-fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInfo>>, Pubkey) {
-    let keypair = Keypair::new();
-    let exit = Arc::new(AtomicBool::new(false));
-    let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey());
-    let id = spy.info.id.clone();
-    let daddr = "0.0.0.0:0".parse().unwrap();
-    spy.info.tvu = daddr;
-    spy.info.rpc = daddr;
-    let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair));
-    spy_cluster_info.insert_info(leader.clone());
-    spy_cluster_info.set_leader(leader.id);
-    let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
-    let gossip_service = GossipService::new(
-        &spy_cluster_info_ref,
-        None,
-        spy.sockets.gossip,
-        exit.clone(),
-    );
-
-    (gossip_service, spy_cluster_info_ref, id)
-}
-
-fn make_listening_node(
-    leader: &NodeInfo,
-) -> (GossipService, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
-    let keypair = Keypair::new();
-    let exit = Arc::new(AtomicBool::new(false));
-    let new_node = Node::new_localhost_with_pubkey(keypair.pubkey());
-    let new_node_info = new_node.info.clone();
-    let id = new_node.info.id.clone();
-    let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair));
-    new_node_cluster_info.insert_info(leader.clone());
-    new_node_cluster_info.set_leader(leader.id);
-    let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
-    let gossip_service = GossipService::new(
-        &new_node_cluster_info_ref,
-        None,
-        new_node
-            .sockets
-            .gossip
-            .try_clone()
-            .expect("Failed to clone gossip"),
-        exit.clone(),
-    );
-
-    (gossip_service, new_node_cluster_info_ref, new_node, id)
-}
-
-fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
-    info!("Wait for convergence with {} nodes", num_nodes);
-    // Let's spy on the network
-    let (gossip_service, spy_ref, id) = make_spy_node(node);
-    trace!(
-        "converge spy_node {} looking for at least {} nodes",
-        id,
-        num_nodes
-    );
-
-    // Wait for the cluster to converge
-    for _ in 0..15 {
-        let rpc_peers = spy_ref.read().unwrap().rpc_peers();
-        if rpc_peers.len() == num_nodes {
-            debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers);
-            gossip_service.close().unwrap();
-            return rpc_peers;
-        }
-        debug!(
-            "converge found {} nodes, need {} more",
-            rpc_peers.len(),
-            num_nodes - rpc_peers.len()
-        );
-        sleep(Duration::new(1, 0));
-    }
-    panic!("Failed to converge");
-}

 #[test]
 fn test_multi_node_ledger_window() -> result::Result<()> {
     solana_logger::setup();