From e7383a7e6631fb04e2228cd805be6d2dc2bb9687 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 25 Sep 2018 15:41:29 -0700 Subject: [PATCH] Validator to leader (#1303) * Add check in window_service to exit in checks for leader rotation, and propagate that service exit up to fullnode * Added logic to shutdown Tvu once ReplicateStage finishes * Added test for successfully shutting down validator and starting up leader * Add test for leader validator interaction * fix streamer to check for exit signal before checking socket again to prevent busy leaders from never returning * PR comments - Rewrite make_consecutive_blobs() function, revert genesis function change --- src/broadcast_stage.rs | 6 ++ src/crdt.rs | 23 +++++- src/fullnode.rs | 158 ++++++++++++++++++++++++++++++++++++++-- src/ledger.rs | 32 +++++++- src/packet.rs | 23 ++++++ src/replicate_stage.rs | 53 ++++++++++---- src/replicator.rs | 4 +- src/retransmit_stage.rs | 32 +++++--- src/streamer.rs | 11 +-- src/tvu.rs | 38 +++++++--- src/window.rs | 74 ++++++++++++++----- src/window_service.rs | 158 ++++++++++++++++++++++++++++++---------- tests/multinode.rs | 126 +++++++++++++++++++++++++++++++- 13 files changed, 629 insertions(+), 109 deletions(-) diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 7b998af24c..78f3105333 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -27,6 +27,8 @@ pub enum BroadcastStageReturnType { } fn broadcast( + crdt: &Arc>, + leader_rotation_interval: u64, node_info: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, @@ -120,6 +122,8 @@ fn broadcast( // Send blobs out from the window Crdt::broadcast( + crdt, + leader_rotation_interval, &node_info, &broadcast_table, &window, @@ -202,6 +206,8 @@ impl BroadcastStage { let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); if let Err(e) = broadcast( + crdt, + leader_rotation_interval, &me, &broadcast_table, &window, diff --git a/src/crdt.rs b/src/crdt.rs index f024ee37b1..958e99b441 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -515,6 +515,8 @@ impl Crdt { /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( + crdt: &Arc>, + leader_rotation_interval: u64, me: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, @@ -538,8 +540,10 @@ impl Crdt { let old_transmit_index = transmit_index.data; // enumerate all the blobs in the window, those are the indices - // transmit them to nodes, starting from a different node - let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize); + // transmit them to nodes, starting from a different node. 
Add one + // to the capacity in case we want to send an extra blob notifying the + // next leader about the blob right before leader rotation + let mut orders = Vec::with_capacity((received_index - transmit_index.data + 1) as usize); let window_l = window.read().unwrap(); let mut br_idx = transmit_index.data as usize % broadcast_table.len(); @@ -554,6 +558,21 @@ impl Crdt { br_idx ); + // Make sure the next leader in line knows about the last entry before rotation + // so he can initiate repairs if necessary + let entry_height = idx + 1; + if entry_height % leader_rotation_interval == 0 { + let next_leader_id = crdt.read().unwrap().get_scheduled_leader(entry_height); + if next_leader_id.is_some() && next_leader_id != Some(me.id) { + let info_result = broadcast_table + .iter() + .position(|n| n.id == next_leader_id.unwrap()); + if let Some(index) = info_result { + orders.push((window_l[w_idx].data.clone(), &broadcast_table[index])); + } + } + } + orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx])); br_idx += 1; br_idx %= broadcast_table.len(); diff --git a/src/fullnode.rs b/src/fullnode.rs index 66cbe8567f..3c204e371b 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::Result; use tpu::{Tpu, TpuReturnType}; -use tvu::Tvu; +use tvu::{Tvu, TvuReturnType}; use untrusted::Input; use window; @@ -58,12 +58,12 @@ impl ValidatorServices { ValidatorServices { tvu } } - pub fn join(self) -> Result<()> { + pub fn join(self) -> Result> { self.tvu.join() } pub fn exit(&self) -> () { - //TODO: implement exit for Tvu + self.tvu.exit() } } @@ -81,10 +81,13 @@ pub struct Fullnode { bank: Arc, crdt: Arc>, ledger_path: String, + sigverify_disabled: bool, shared_window: window::SharedWindow, replicate_socket: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, + transaction_sockets: Vec, + broadcast_socket: UdpSocket, requests_socket: UdpSocket, respond_socket: UdpSocket, } @@ -307,7 +310,6 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), - exit.clone(), ); let validator_state = ValidatorServices::new(tvu); node_role = Some(NodeRole::Validator(validator_state)); @@ -353,6 +355,7 @@ impl Fullnode { crdt, shared_window, bank, + sigverify_disabled, rpu, ncp, rpc_service, @@ -362,6 +365,8 @@ impl Fullnode { replicate_socket: node.sockets.replicate, repair_socket: node.sockets.repair, retransmit_socket: node.sockets.retransmit, + transaction_sockets: node.sockets.transaction, + broadcast_socket: node.sockets.broadcast, requests_socket: node.sockets.requests, respond_socket: node.sockets.respond, } @@ -417,13 +422,45 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(&self.ledger_path), - self.exit.clone(), ); let validator_state = ValidatorServices::new(tvu); self.node_role = Some(NodeRole::Validator(validator_state)); Ok(()) } + fn validator_to_leader(&mut self, entry_height: u64) { + self.crdt.write().unwrap().set_leader(self.keypair.pubkey()); + let tick_duration = None; + // TODO: To light up PoH, uncomment the following line: + //let tick_duration = Some(Duration::from_millis(1000)); + let (tpu, blob_receiver, tpu_exit) = Tpu::new( + self.keypair.clone(), + &self.bank, + &self.crdt, + tick_duration, + self.transaction_sockets + .iter() + .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .collect(), + &self.ledger_path, + self.sigverify_disabled, + entry_height, + ); + + let 
broadcast_stage = BroadcastStage::new( + self.broadcast_socket + .try_clone() + .expect("Failed to clone broadcast socket"), + self.crdt.clone(), + self.shared_window.clone(), + entry_height, + blob_receiver, + tpu_exit, + ); + let leader_state = LeaderServices::new(tpu, broadcast_stage); + self.node_role = Some(NodeRole::Leader(leader_state)); + } + pub fn handle_role_transition(&mut self) -> Result> { let node_role = self.node_role.take(); match node_role { @@ -435,6 +472,10 @@ impl Fullnode { _ => Ok(None), }, Some(NodeRole::Validator(validator_services)) => match validator_services.join()? { + Some(TvuReturnType::LeaderRotation(entry_height)) => { + self.validator_to_leader(entry_height); + Ok(Some(FullnodeReturnType::LeaderRotation)) + } _ => Ok(None), }, None => Ok(None), @@ -494,7 +535,9 @@ impl Service for Fullnode { match self.node_role { Some(NodeRole::Validator(validator_service)) => { - validator_service.join()?; + if let Some(TvuReturnType::LeaderRotation(_)) = validator_service.join()? { + return Ok(Some(FullnodeReturnType::LeaderRotation)); + } } Some(NodeRole::Leader(leader_service)) => { if let Some(TpuReturnType::LeaderRotation) = leader_service.join()? { @@ -512,11 +555,17 @@ impl Service for Fullnode { mod tests { use bank::Bank; use crdt::Node; - use fullnode::Fullnode; + use fullnode::{Fullnode, FullnodeReturnType}; use ledger::genesis; + use packet::{make_consecutive_blobs, BlobRecycler}; use service::Service; use signature::{Keypair, KeypairUtil}; + use std::cmp; use std::fs::remove_dir_all; + use std::net::UdpSocket; + use std::sync::mpsc::channel; + use std::sync::Arc; + use streamer::responder; #[test] fn validator_exit() { @@ -540,6 +589,7 @@ mod tests { v.close().unwrap(); remove_dir_all(validator_ledger_path).unwrap(); } + #[test] fn validator_parallel_exit() { let mut ledger_paths = vec![]; @@ -578,4 +628,98 @@ mod tests { remove_dir_all(path).unwrap(); } } + + #[test] + fn test_validator_to_leader_transition() { + // Make a leader identity + let leader_keypair = Keypair::new(); + let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader_id = leader_node.info.id; + let leader_ncp = leader_node.info.contact_info.ncp; + + // Start the validator node + let leader_rotation_interval = 10; + let (mint, validator_ledger_path) = genesis("test_validator_to_leader_transition", 10_000); + let validator_keypair = Keypair::new(); + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + let validator_info = validator_node.info.clone(); + let mut validator = Fullnode::new( + validator_node, + &validator_ledger_path, + validator_keypair, + Some(leader_ncp), + false, + Some(leader_rotation_interval), + ); + + // Set the leader schedule for the validator + let my_leader_begin_epoch = 2; + for i in 0..my_leader_begin_epoch { + validator.set_scheduled_leader(leader_id, leader_rotation_interval * i); + } + validator.set_scheduled_leader( + validator_info.id, + my_leader_begin_epoch * leader_rotation_interval, + ); + + // Send blobs to the validator from our mock leader + let resp_recycler = BlobRecycler::default(); + let t_responder = { + let (s_responder, r_responder) = channel(); + let blob_sockets: Vec> = leader_node + .sockets + .replicate + .into_iter() + .map(Arc::new) + .collect(); + + let t_responder = responder( + "test_validator_to_leader_transition", + blob_sockets[0].clone(), + r_responder, + ); + + // Send the blobs out of order, in reverse. 
Also send an extra + // "extra_blobs" number of blobs to make sure the window stops in the right place. + let extra_blobs = cmp::max(leader_rotation_interval / 3, 1); + let total_blobs_to_send = + my_leader_begin_epoch * leader_rotation_interval + extra_blobs; + let genesis_entries = mint.create_entries(); + let last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + let tvu_address = &validator_info.contact_info.tvu; + let msgs = make_consecutive_blobs( + leader_id, + total_blobs_to_send, + last_id, + &tvu_address, + &resp_recycler, + ).into_iter() + .rev() + .collect(); + s_responder.send(msgs).expect("send"); + t_responder + }; + + // Wait for validator to shut down tvu and restart tpu + match validator.handle_role_transition().unwrap() { + Some(FullnodeReturnType::LeaderRotation) => (), + _ => panic!("Expected reason for exit to be leader rotation"), + } + + // Check the validator ledger to make sure it's the right height + let (_, entry_height, _) = Fullnode::new_bank_from_ledger(&validator_ledger_path); + + assert_eq!( + entry_height, + my_leader_begin_epoch * leader_rotation_interval + ); + + // Shut down + t_responder.join().expect("responder thread join"); + validator.close().unwrap(); + remove_dir_all(&validator_ledger_path).unwrap(); + } } diff --git a/src/ledger.rs b/src/ledger.rs index ccb713a5e0..00c637699b 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -19,6 +19,7 @@ use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::mem::size_of; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::Path; use transaction::Transaction; use window::WINDOW_SIZE; @@ -402,6 +403,13 @@ pub trait Block { /// Verifies the hashes and counts of a slice of transactions are all consistent. 
fn verify(&self, start_hash: &Hash) -> bool; fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec; + fn to_blobs_with_id( + &self, + blob_recycler: &packet::BlobRecycler, + id: Pubkey, + start_id: u64, + addr: &SocketAddr, + ) -> Vec; fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>; } @@ -422,10 +430,28 @@ impl Block for [Entry] { }) } - fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec { + fn to_blobs_with_id( + &self, + blob_recycler: &packet::BlobRecycler, + id: Pubkey, + start_idx: u64, + addr: &SocketAddr, + ) -> Vec { self.iter() - .map(|entry| entry.to_blob(blob_recycler, None, None, None)) - .collect() + .enumerate() + .map(|(i, entry)| { + entry.to_blob( + blob_recycler, + Some(start_idx + i as u64), + Some(id), + Some(&addr), + ) + }).collect() + } + + fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec { + let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + self.to_blobs_with_id(blob_recycler, Pubkey::default(), 0, &default_addr) } fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> { diff --git a/src/packet.rs b/src/packet.rs index a453293e37..6f3d706fe2 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -2,6 +2,10 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use counter::Counter; +#[cfg(test)] +use hash::Hash; +#[cfg(test)] +use ledger::{next_entries_mut, Block}; use log::Level; use recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use recycler; @@ -429,6 +433,25 @@ impl Blob { } } +#[cfg(test)] +pub fn make_consecutive_blobs( + me_id: Pubkey, + num_blobs_to_make: u64, + start_hash: Hash, + addr: &SocketAddr, + resp_recycler: &BlobRecycler, +) -> SharedBlobs { + let mut last_hash = start_hash; + let mut num_hashes = 0; + let mut all_entries = Vec::with_capacity(num_blobs_to_make as usize); + for _ in 0..num_blobs_to_make { + all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![])); + } + let mut new_blobs = all_entries.to_blobs_with_id(&resp_recycler, me_id, 0, addr); + new_blobs.truncate(num_blobs_to_make as usize); + new_blobs +} + #[cfg(test)] mod tests { use packet::{ diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index fcc71a152b..3010322576 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -11,7 +11,7 @@ use result::{Error, Result}; use service::Service; use signature::Keypair; use std::net::UdpSocket; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; @@ -20,6 +20,24 @@ use std::time::Duration; use streamer::{responder, BlobSender}; use vote_stage::send_validator_vote; +// Implement a destructor for the ReplicateStage thread to signal it exited +// even on panics +struct Finalizer { + exit_sender: Arc, +} + +impl Finalizer { + fn new(exit_sender: Arc) -> Self { + Finalizer { exit_sender } + } +} +// Implement a destructor for Finalizer. 
+impl Drop for Finalizer { + fn drop(&mut self) { + self.exit_sender.clone().store(true, Ordering::Relaxed); + } +} + pub struct ReplicateStage { thread_hdls: Vec>, } @@ -63,12 +81,14 @@ impl ReplicateStage { res?; Ok(()) } + pub fn new( keypair: Arc, bank: Arc, crdt: Arc>, window_receiver: EntryReceiver, ledger_path: Option<&str>, + exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -80,20 +100,23 @@ impl ReplicateStage { let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) - .spawn(move || loop { - if let Err(e) = Self::replicate_requests( - &bank, - &crdt, - &blob_recycler, - &window_receiver, - ledger_writer.as_mut(), - &keypair, - &vote_blob_sender, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), + .spawn(move || { + let _exit = Finalizer::new(exit);; + loop { + if let Err(e) = Self::replicate_requests( + &bank, + &crdt, + &blob_recycler, + &window_receiver, + ledger_writer.as_mut(), + &keypair, + &vote_blob_sender, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } } }).unwrap(); diff --git a/src/replicator.rs b/src/replicator.rs index cc5ccf7009..e3bcfb00a1 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -12,13 +12,13 @@ use std::time::Duration; use store_ledger_stage::StoreLedgerStage; use streamer::BlobReceiver; use window; -use window_service::window_service; +use window_service::{window_service, WindowServiceReturnType}; pub struct Replicator { ncp: Ncp, fetch_stage: BlobFetchStage, store_ledger_stage: StoreLedgerStage, - t_window: JoinHandle<()>, + t_window: JoinHandle>, pub retransmit_receiver: BlobReceiver, } diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index bd15dd3409..60ad922438 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -15,7 +15,12 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; use window::SharedWindow; -use window_service::window_service; +use window_service::{window_service, WindowServiceReturnType}; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum RetransmitStageReturnType { + LeaderRotation(u64), +} fn retransmit(crdt: &Arc>, r: &BlobReceiver, sock: &UdpSocket) -> Result<()> { let timer = Duration::new(1, 0); @@ -58,7 +63,8 @@ fn retransmitter(sock: Arc, crdt: Arc>, r: BlobReceiver) } pub struct RetransmitStage { - thread_hdls: Vec>, + t_retransmit: JoinHandle<()>, + t_window: JoinHandle>, } impl RetransmitStage { @@ -83,19 +89,27 @@ impl RetransmitStage { retransmit_sender, repair_socket, ); - let thread_hdls = vec![t_retransmit, t_window]; - (RetransmitStage { thread_hdls }, entry_receiver) + ( + RetransmitStage { + t_window, + t_retransmit, + }, + entry_receiver, + ) } } impl Service for RetransmitStage { - type JoinReturnType = (); + type JoinReturnType = Option; - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; + fn join(self) -> thread::Result> { + self.t_retransmit.join()?; + match self.t_window.join()? 
{ + Some(WindowServiceReturnType::LeaderRotation(entry_height)) => Ok(Some( + RetransmitStageReturnType::LeaderRotation(entry_height), + )), + _ => Ok(None), } - Ok(()) } } diff --git a/src/streamer.rs b/src/streamer.rs index 8783973c4e..22042ae218 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -27,6 +27,11 @@ fn recv_loop( loop { let msgs = re.allocate(); loop { + // Check for exit signal, even if socket is busy + // (for instance the leader trasaction socket) + if exit.load(Ordering::Relaxed) { + return Ok(()); + } let result = msgs.write().recv_from(sock); match result { Ok(()) => { @@ -39,11 +44,7 @@ fn recv_loop( channel.send(msgs)?; break; } - Err(_) => { - if exit.load(Ordering::Relaxed) { - return Ok(()); - } - } + Err(_) => (), } } } diff --git a/src/tvu.rs b/src/tvu.rs index 236176b50c..25d3086b75 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -40,19 +40,25 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use crdt::Crdt; use replicate_stage::ReplicateStage; -use retransmit_stage::RetransmitStage; +use retransmit_stage::{RetransmitStage, RetransmitStageReturnType}; use service::Service; use signature::Keypair; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; use window::SharedWindow; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum TvuReturnType { + LeaderRotation(u64), +} + pub struct Tvu { replicate_stage: ReplicateStage, fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, + exit: Arc, } impl Tvu { @@ -78,8 +84,9 @@ impl Tvu { repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, - exit: Arc, ) -> Self { + let exit = Arc::new(AtomicBool::new(false)); + let repair_socket = Arc::new(repair_socket); let mut blob_sockets: Vec> = replicate_sockets.into_iter().map(Arc::new).collect(); @@ -104,30 +111,39 @@ impl Tvu { crdt, blob_window_receiver, ledger_path, + exit.clone(), ); Tvu { replicate_stage, fetch_stage, retransmit_stage, + exit, } } - pub fn close(self) -> thread::Result<()> { + pub fn exit(&self) -> () { + self.exit.store(true, Ordering::Relaxed); + } + + pub fn close(self) -> thread::Result> { self.fetch_stage.close(); self.join() } } impl Service for Tvu { - type JoinReturnType = (); + type JoinReturnType = Option; - fn join(self) -> thread::Result<()> { + fn join(self) -> thread::Result> { self.replicate_stage.join()?; self.fetch_stage.join()?; - self.retransmit_stage.join()?; - - Ok(()) + match self.retransmit_stage.join()? 
{ + Some(RetransmitStageReturnType::LeaderRotation(entry_height)) => { + Ok(Some(TvuReturnType::LeaderRotation(entry_height))) + } + _ => Ok(None), + } } } @@ -145,7 +161,7 @@ pub mod tests { use service::Service; use signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; - use std::sync::atomic::AtomicBool; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -233,7 +249,6 @@ pub mod tests { target1.sockets.repair, target1.sockets.retransmit, None, - exit.clone(), ); let mut alice_ref_balance = starting_balance; @@ -297,6 +312,7 @@ pub mod tests { assert_eq!(bob_balance, starting_balance - alice_ref_balance); tvu.close().expect("close"); + exit.store(true, Ordering::Relaxed); dr_l.0.join().expect("join"); dr_2.0.join().expect("join"); dr_1.0.join().expect("join"); diff --git a/src/window.rs b/src/window.rs index 43cfd982b8..fe68fe957c 100644 --- a/src/window.rs +++ b/src/window.rs @@ -65,6 +65,7 @@ pub trait WindowUtil { fn process_blob( &mut self, id: &Pubkey, + crdt: &Arc>, blob: SharedBlob, pix: u64, consume_queue: &mut Vec, @@ -72,6 +73,7 @@ pub trait WindowUtil { consumed: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, + leader_rotation_interval: u64, ); } @@ -98,16 +100,25 @@ impl WindowUtil for Window { consumed: u64, received: u64, ) -> Vec<(SocketAddr, Vec)> { - let num_peers = crdt.read().unwrap().table.len() as u64; - let max_repair = calculate_max_repair(num_peers, consumed, received, times); + let rcrdt = crdt.read().unwrap(); + let leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + // Calculate the next leader rotation height and check if we are the leader + let next_leader_rotation = + consumed + leader_rotation_interval - (consumed % leader_rotation_interval); + let is_next_leader = rcrdt.get_scheduled_leader(next_leader_rotation) == Some(*id); + let num_peers = rcrdt.table.len() as u64; + let max_repair = calculate_max_repair(num_peers, consumed, received, times, is_next_leader); let idxs = self.clear_slots(consumed, max_repair); let reqs: Vec<_> = idxs .into_iter() - .filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok()) + .filter_map(|pix| rcrdt.window_index_request(pix).ok()) .collect(); + drop(rcrdt); + inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); + if log_enabled!(Level::Trace) { trace!( "{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}", @@ -178,6 +189,7 @@ impl WindowUtil for Window { fn process_blob( &mut self, id: &Pubkey, + crdt: &Arc>, blob: SharedBlob, pix: u64, consume_queue: &mut Vec, @@ -185,6 +197,7 @@ impl WindowUtil for Window { consumed: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, + leader_rotation_interval: u64, ) { let w = (pix % WINDOW_SIZE) as usize; @@ -251,6 +264,18 @@ impl WindowUtil for Window { // push all contiguous blobs into consumed queue, increment consumed loop { + if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 { + let rcrdt = crdt.read().unwrap(); + let my_id = rcrdt.my_data().id; + match rcrdt.get_scheduled_leader(*consumed) { + // If we are the next leader, exit + Some(id) if id == my_id => { + break; + } + _ => (), + } + } + let k = (*consumed % WINDOW_SIZE) as usize; trace!("{}: k: {} consumed: {}", id, k, *consumed,); @@ -286,13 +311,20 @@ impl WindowUtil for Window { } } -fn calculate_max_repair(num_peers: u64, consumed: u64, received: u64, times: usize) -> u64 { +fn calculate_max_repair( + 
num_peers: u64, + consumed: u64, + received: u64, + times: usize, + is_next_leader: bool, +) -> u64 { // Calculate the highest blob index that this node should have already received // via avalanche. The avalanche splits data stream into nodes and each node retransmits // the data to their peer nodes. So there's a possibility that a blob (with index lower // than current received index) is being retransmitted by a peer node. - let max_repair = if times >= 8 { - // if repair backoff is getting high, don't wait for avalanche + let max_repair = if times >= 8 || is_next_leader { + // if repair backoff is getting high, or if we are the next leader, + // don't wait for avalanche cmp::max(consumed, received) } else { cmp::max(consumed, received.saturating_sub(num_peers)) @@ -484,29 +516,37 @@ mod test { #[test] pub fn calculate_max_repair_test() { - assert_eq!(calculate_max_repair(0, 10, 90, 0), 90); - assert_eq!(calculate_max_repair(15, 10, 90, 32), 90); - assert_eq!(calculate_max_repair(15, 10, 90, 0), 75); - assert_eq!(calculate_max_repair(90, 10, 90, 0), 10); - assert_eq!(calculate_max_repair(90, 10, 50, 0), 10); - assert_eq!(calculate_max_repair(90, 10, 99, 0), 10); - assert_eq!(calculate_max_repair(90, 10, 101, 0), 11); + assert_eq!(calculate_max_repair(0, 10, 90, 0, false), 90); + assert_eq!(calculate_max_repair(15, 10, 90, 32, false), 90); + assert_eq!(calculate_max_repair(15, 10, 90, 0, false), 75); + assert_eq!(calculate_max_repair(90, 10, 90, 0, false), 10); + assert_eq!(calculate_max_repair(90, 10, 50, 0, false), 10); + assert_eq!(calculate_max_repair(90, 10, 99, 0, false), 10); + assert_eq!(calculate_max_repair(90, 10, 101, 0, false), 11); assert_eq!( - calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0), + calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false), WINDOW_SIZE + 5 ); assert_eq!( - calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0), + calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false), WINDOW_SIZE + 9 ); assert_eq!( - calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0), + calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false), WINDOW_SIZE + 9 ); assert_eq!( - calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0), + calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false), WINDOW_SIZE + 9 ); + assert_eq!( + calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false), + WINDOW_SIZE + ); + assert_eq!( + calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true), + 50 + WINDOW_SIZE + ); } fn wrap_blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: u64) -> (bool, u64) { diff --git a/src/window_service.rs b/src/window_service.rs index 7efcf3ab6e..6c50d913f9 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -20,6 +20,11 @@ use window::{blob_idx_in_window, SharedWindow, WindowUtil}; pub const MAX_REPAIR_BACKOFF: usize = 128; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum WindowServiceReturnType { + LeaderRotation(u64), +} + fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { //exponential backoff if *last != consumed { @@ -148,6 +153,7 @@ fn recv_window( s: &EntrySender, retransmit: &BlobSender, pending_retransmits: &mut bool, + leader_rotation_interval: u64, ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; @@ -200,6 +206,7 @@ fn recv_window( window.write().unwrap().process_blob( id, + crdt, b, pix, &mut consume_queue, @@ -207,6 +214,7 @@ fn recv_window( consumed, leader_unknown, pending_retransmits, + leader_rotation_interval, ); } if log_enabled!(Level::Trace) { @@ -236,7 
+244,7 @@ pub fn window_service( s: EntrySender, retransmit: BlobSender, repair_socket: Arc, -) -> JoinHandle<()> { +) -> JoinHandle> { Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -244,11 +252,31 @@ pub fn window_service( let mut received = entry_height; let mut last = entry_height; let mut times = 0; - let id = crdt.read().unwrap().id; + let id; + let leader_rotation_interval; + { + let rcrdt = crdt.read().unwrap(); + id = rcrdt.id; + leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + } let mut pending_retransmits = false; let recycler = BlobRecycler::default(); trace!("{}: RECV_WINDOW started", id); loop { + if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 { + match crdt.read().unwrap().get_scheduled_leader(consumed) { + // If we are the next leader, exit + Some(next_leader_id) if id == next_leader_id => { + return Some(WindowServiceReturnType::LeaderRotation(consumed)); + } + // TODO: Figure out where to set the new leader in the crdt for + // validator -> validator transition (once we have real leader scheduling, + // this decision will be clearer). Also make sure new blobs to window actually + // originate from new leader + _ => (), + } + } + if let Err(e) = recv_window( &window, &id, @@ -260,6 +288,7 @@ pub fn window_service( &s, &retransmit, &mut pending_retransmits, + leader_rotation_interval, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -297,6 +326,7 @@ pub fn window_service( }); } } + None }).unwrap() } @@ -305,47 +335,17 @@ mod test { use crdt::{Crdt, Node}; use entry::Entry; use hash::Hash; - use ledger::next_entries_mut; use logger; - use packet::{BlobRecycler, SharedBlobs, PACKET_DATA_SIZE}; - use signature::Pubkey; - use std::net::{SocketAddr, UdpSocket}; + use packet::{make_consecutive_blobs, BlobRecycler, PACKET_DATA_SIZE}; + use signature::{Keypair, KeypairUtil}; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer::{blob_receiver, responder}; use window::default_window; - use window_service::{repair_backoff, window_service}; - - fn make_consecutive_blobs( - me_id: Pubkey, - mut num_blobs_to_make: u64, - start_hash: Hash, - addr: &SocketAddr, - resp_recycler: &BlobRecycler, - ) -> SharedBlobs { - let mut msgs = Vec::new(); - let mut last_hash = start_hash; - let mut num_hashes = 0; - while num_blobs_to_make != 0 { - let new_entries = next_entries_mut(&mut last_hash, &mut num_hashes, vec![]); - let mut new_blobs: SharedBlobs = new_entries - .iter() - .enumerate() - .map(|(i, e)| { - let blob_index = num_blobs_to_make - i as u64 - 1; - let new_blob = - e.to_blob(&resp_recycler, Some(blob_index), Some(me_id), Some(addr)); - assert_eq!(blob_index, new_blob.read().get_index().unwrap()); - new_blob - }).collect(); - new_blobs.truncate(num_blobs_to_make as usize); - num_blobs_to_make -= new_blobs.len() as u64; - msgs.extend(new_blobs); - } - msgs - } + use window_service::{repair_backoff, window_service, WindowServiceReturnType}; fn get_entries(r: Receiver>, num: &mut usize) { for _t in 0..5 { @@ -401,7 +401,9 @@ mod test { Hash::default(), &gossip_address, &resp_recycler, - ); + ).into_iter() + .rev() + .collect();; s_responder.send(msgs).expect("send"); t_responder }; @@ -577,4 +579,86 @@ mod test { assert!(avg >= 3); assert!(avg <= 5); } + + #[test] + pub fn test_window_leader_rotation_exit() { + logger::setup(); + let leader_rotation_interval = 
10; + // Height at which this node becomes the leader = + // my_leader_begin_epoch * leader_rotation_interval + let my_leader_begin_epoch = 2; + let tn = Node::new_localhost(); + let exit = Arc::new(AtomicBool::new(false)); + let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new"); + let me_id = crdt_me.my_data().id; + + // Set myself in an upcoming epoch, but set the old_leader_id as the + // leader for all epochs before that + let old_leader_id = Keypair::new().pubkey(); + crdt_me.set_leader(me_id); + crdt_me.set_leader_rotation_interval(leader_rotation_interval); + for i in 0..my_leader_begin_epoch { + crdt_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id); + } + crdt_me.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id); + + let subs = Arc::new(RwLock::new(crdt_me)); + + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); + let (s_window, _r_window) = channel(); + let (s_retransmit, _r_retransmit) = channel(); + let win = Arc::new(RwLock::new(default_window())); + let t_window = window_service( + subs, + win, + 0, + r_reader, + s_window, + s_retransmit, + Arc::new(tn.sockets.repair), + ); + + let t_responder = { + let (s_responder, r_responder) = channel(); + let blob_sockets: Vec> = + tn.sockets.replicate.into_iter().map(Arc::new).collect(); + + let t_responder = responder( + "test_window_leader_rotation_exit", + blob_sockets[0].clone(), + r_responder, + ); + + let ncp_address = &tn.info.contact_info.ncp; + // Send the blobs out of order, in reverse. Also send an extra leader_rotation_interval + // number of blobs to make sure the window stops in the right place. + let extra_blobs = leader_rotation_interval; + let total_blobs_to_send = + my_leader_begin_epoch * leader_rotation_interval + extra_blobs; + let msgs = make_consecutive_blobs( + me_id, + total_blobs_to_send, + Hash::default(), + &ncp_address, + &resp_recycler, + ).into_iter() + .rev() + .collect();; + s_responder.send(msgs).expect("send"); + t_responder + }; + + assert_eq!( + Some(WindowServiceReturnType::LeaderRotation( + my_leader_begin_epoch * leader_rotation_interval + )), + t_window.join().expect("window service join") + ); + + t_responder.join().expect("responder thread join"); + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("receiver thread join"); + } } diff --git a/tests/multinode.rs b/tests/multinode.rs index 28997540bd..750ae363f1 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -9,7 +9,7 @@ use solana::crdt::{Crdt, Node, NodeInfo}; use solana::entry::Entry; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::hash::Hash; -use solana::ledger::LedgerWriter; +use solana::ledger::{read_ledger, LedgerWriter}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; @@ -881,6 +881,130 @@ fn test_leader_to_validator_transition() { remove_dir_all(leader_ledger_path).unwrap(); } +#[test] +#[ignore] +fn test_leader_validator_basic() { + logger::setup(); + let leader_rotation_interval = 10; + + // Account that will be the sink for all the test's transactions + let bob_pubkey = Keypair::new().pubkey(); + + // Make a mint and a genesis entry in the leader ledger + let (mint, leader_ledger_path, genesis_entries) = + genesis("test_leader_validator_basic", 10_000); + let genesis_height = genesis_entries.len(); + + // Initialize the leader ledger + let mut ledger_paths = Vec::new(); + 
ledger_paths.push(leader_ledger_path.clone()); + + // Create the leader fullnode + let leader_keypair = Keypair::new(); + let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + let leader_info = leader_node.info.clone(); + + let mut leader = Fullnode::new( + leader_node, + &leader_ledger_path, + leader_keypair, + None, + false, + Some(leader_rotation_interval), + ); + + // Send leader some tokens to vote + send_tx_and_retry_get_balance(&leader_info, &mint, &leader_info.id, 500, None).unwrap(); + + // Start the validator node + let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic"); + let validator_keypair = Keypair::new(); + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + let validator_info = validator_node.info.clone(); + let mut validator = Fullnode::new( + validator_node, + &validator_ledger_path, + validator_keypair, + Some(leader_info.contact_info.ncp), + false, + Some(leader_rotation_interval), + ); + + ledger_paths.push(validator_ledger_path.clone()); + + // Set the leader schedule for the validator and leader + let my_leader_begin_epoch = 2; + for i in 0..my_leader_begin_epoch { + validator.set_scheduled_leader(leader_info.id, leader_rotation_interval * i); + leader.set_scheduled_leader(leader_info.id, leader_rotation_interval * i); + } + validator.set_scheduled_leader( + validator_info.id, + my_leader_begin_epoch * leader_rotation_interval, + ); + leader.set_scheduled_leader( + validator_info.id, + my_leader_begin_epoch * leader_rotation_interval, + ); + + // Wait for convergence + let servers = converge(&leader_info, 2); + assert_eq!(servers.len(), 2); + + // Send transactions to the leader + let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1); + let total_transactions_to_send = + my_leader_begin_epoch * leader_rotation_interval + extra_transactions; + + // Push "extra_transactions" past leader_rotation_interval entry height, + // make sure the validator stops. + for _ in genesis_height as u64..total_transactions_to_send { + send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None); + } + + // Wait for validator to shut down tvu and restart tpu + match validator.handle_role_transition().unwrap() { + Some(FullnodeReturnType::LeaderRotation) => (), + _ => panic!("Expected reason for exit to be leader rotation"), + } + + // TODO: We ignore this test for now b/c there's a chance here that the crdt + // in the new leader calls the dummy sequence of update_leader -> top_leader() + // (see the TODOs in those functions) during gossip and sets the leader back + // to the old leader, which causes a panic from an assertion failure in crdt broadcast(), + // specifically: assert!(me.leader_id != v.id). 
We can enable this test once we have real
+    // leader scheduling
+
+    // Wait for the leader to shut down tpu and restart tvu
+    match leader.handle_role_transition().unwrap() {
+        Some(FullnodeReturnType::LeaderRotation) => (),
+        _ => panic!("Expected reason for exit to be leader rotation"),
+    }
+
+    // Shut down
+    validator.close().unwrap();
+    leader.close().unwrap();
+
+    // Check the ledger of the validator to make sure the entry height is correct
+    // and that the old leader and the new leader's ledgers agree up to the point
+    // of leader rotation
+    let validator_entries =
+        read_ledger(&validator_ledger_path, true).expect("Expected parsing of validator ledger");
+    let leader_entries =
+        read_ledger(&leader_ledger_path, true).expect("Expected parsing of leader ledger");
+
+    for (v, l) in validator_entries.zip(leader_entries) {
+        assert_eq!(
+            v.expect("expected valid validator entry"),
+            l.expect("expected valid leader entry")
+        );
+    }
+
+    for path in ledger_paths {
+        remove_dir_all(path).unwrap();
+    }
+}
+
 fn mk_client(leader: &NodeInfo) -> ThinClient {
     let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
     requests_socket
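
A minimal sketch, not part of this patch, of how a caller might drive the role transitions this change makes observable. Fullnode, FullnodeReturnType, close(), and handle_role_transition() are the items touched in src/fullnode.rs above; run_until_exit and the surrounding loop are assumed for illustration only.

// Hypothetical wrapper around the Fullnode role-transition API added above.
fn run_until_exit(mut node: Fullnode) {
    // Keep the node alive across validator <-> leader rotations reported by
    // handle_role_transition(); any other outcome means its services exited.
    while let Ok(Some(FullnodeReturnType::LeaderRotation)) = node.handle_role_transition() {
        // handle_role_transition() has already joined the old role's services
        // and started the new role's services, so there is nothing to do here.
    }
    // Signal any remaining services to exit and join them, as the tests do.
    node.close().unwrap();
}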