Validator to leader (#1303)

* Add check in window_service to exit in checks for leader rotation, and propagate that service exit up to fullnode

* Added logic to shutdown Tvu once ReplicateStage finishes

* Added test for successfully shutting down validator and starting up leader

* Add test for leader validator interaction

* fix streamer to check for exit signal before checking socket again to prevent busy leaders from never returning

* PR comments - Rewrite make_consecutive_blobs() function, revert genesis function change
This commit is contained in:
carllin 2018-09-25 15:41:29 -07:00 committed by GitHub
parent 8a7545197f
commit e7383a7e66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 629 additions and 109 deletions

View File

@ -27,6 +27,8 @@ pub enum BroadcastStageReturnType {
}
fn broadcast(
crdt: &Arc<RwLock<Crdt>>,
leader_rotation_interval: u64,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -120,6 +122,8 @@ fn broadcast(
// Send blobs out from the window
Crdt::broadcast(
crdt,
leader_rotation_interval,
&node_info,
&broadcast_table,
&window,
@ -202,6 +206,8 @@ impl BroadcastStage {
let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
crdt,
leader_rotation_interval,
&me,
&broadcast_table,
&window,

View File

@ -515,6 +515,8 @@ impl Crdt {
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast(
crdt: &Arc<RwLock<Crdt>>,
leader_rotation_interval: u64,
me: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -538,8 +540,10 @@ impl Crdt {
let old_transmit_index = transmit_index.data;
// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node
let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
// transmit them to nodes, starting from a different node. Add one
// to the capacity in case we want to send an extra blob notifying the
// next leader about the blob right before leader rotation
let mut orders = Vec::with_capacity((received_index - transmit_index.data + 1) as usize);
let window_l = window.read().unwrap();
let mut br_idx = transmit_index.data as usize % broadcast_table.len();
@ -554,6 +558,21 @@ impl Crdt {
br_idx
);
// Make sure the next leader in line knows about the last entry before rotation
// so he can initiate repairs if necessary
let entry_height = idx + 1;
if entry_height % leader_rotation_interval == 0 {
let next_leader_id = crdt.read().unwrap().get_scheduled_leader(entry_height);
if next_leader_id.is_some() && next_leader_id != Some(me.id) {
let info_result = broadcast_table
.iter()
.position(|n| n.id == next_leader_id.unwrap());
if let Some(index) = info_result {
orders.push((window_l[w_idx].data.clone(), &broadcast_table[index]));
}
}
}
orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();

View File

@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::Result;
use tpu::{Tpu, TpuReturnType};
use tvu::Tvu;
use tvu::{Tvu, TvuReturnType};
use untrusted::Input;
use window;
@ -58,12 +58,12 @@ impl ValidatorServices {
ValidatorServices { tvu }
}
pub fn join(self) -> Result<()> {
pub fn join(self) -> Result<Option<TvuReturnType>> {
self.tvu.join()
}
pub fn exit(&self) -> () {
//TODO: implement exit for Tvu
self.tvu.exit()
}
}
@ -81,10 +81,13 @@ pub struct Fullnode {
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
ledger_path: String,
sigverify_disabled: bool,
shared_window: window::SharedWindow,
replicate_socket: Vec<UdpSocket>,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
transaction_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
requests_socket: UdpSocket,
respond_socket: UdpSocket,
}
@ -307,7 +310,6 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(ledger_path),
exit.clone(),
);
let validator_state = ValidatorServices::new(tvu);
node_role = Some(NodeRole::Validator(validator_state));
@ -353,6 +355,7 @@ impl Fullnode {
crdt,
shared_window,
bank,
sigverify_disabled,
rpu,
ncp,
rpc_service,
@ -362,6 +365,8 @@ impl Fullnode {
replicate_socket: node.sockets.replicate,
repair_socket: node.sockets.repair,
retransmit_socket: node.sockets.retransmit,
transaction_sockets: node.sockets.transaction,
broadcast_socket: node.sockets.broadcast,
requests_socket: node.sockets.requests,
respond_socket: node.sockets.respond,
}
@ -417,13 +422,45 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(&self.ledger_path),
self.exit.clone(),
);
let validator_state = ValidatorServices::new(tvu);
self.node_role = Some(NodeRole::Validator(validator_state));
Ok(())
}
fn validator_to_leader(&mut self, entry_height: u64) {
self.crdt.write().unwrap().set_leader(self.keypair.pubkey());
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let (tpu, blob_receiver, tpu_exit) = Tpu::new(
self.keypair.clone(),
&self.bank,
&self.crdt,
tick_duration,
self.transaction_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
.collect(),
&self.ledger_path,
self.sigverify_disabled,
entry_height,
);
let broadcast_stage = BroadcastStage::new(
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),
self.crdt.clone(),
self.shared_window.clone(),
entry_height,
blob_receiver,
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
self.node_role = Some(NodeRole::Leader(leader_state));
}
pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> {
let node_role = self.node_role.take();
match node_role {
@ -435,6 +472,10 @@ impl Fullnode {
_ => Ok(None),
},
Some(NodeRole::Validator(validator_services)) => match validator_services.join()? {
Some(TvuReturnType::LeaderRotation(entry_height)) => {
self.validator_to_leader(entry_height);
Ok(Some(FullnodeReturnType::LeaderRotation))
}
_ => Ok(None),
},
None => Ok(None),
@ -494,7 +535,9 @@ impl Service for Fullnode {
match self.node_role {
Some(NodeRole::Validator(validator_service)) => {
validator_service.join()?;
if let Some(TvuReturnType::LeaderRotation(_)) = validator_service.join()? {
return Ok(Some(FullnodeReturnType::LeaderRotation));
}
}
Some(NodeRole::Leader(leader_service)) => {
if let Some(TpuReturnType::LeaderRotation) = leader_service.join()? {
@ -512,11 +555,17 @@ impl Service for Fullnode {
mod tests {
use bank::Bank;
use crdt::Node;
use fullnode::Fullnode;
use fullnode::{Fullnode, FullnodeReturnType};
use ledger::genesis;
use packet::{make_consecutive_blobs, BlobRecycler};
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::cmp;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::mpsc::channel;
use std::sync::Arc;
use streamer::responder;
#[test]
fn validator_exit() {
@ -540,6 +589,7 @@ mod tests {
v.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap();
}
#[test]
fn validator_parallel_exit() {
let mut ledger_paths = vec![];
@ -578,4 +628,98 @@ mod tests {
remove_dir_all(path).unwrap();
}
}
#[test]
fn test_validator_to_leader_transition() {
// Make a leader identity
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_id = leader_node.info.id;
let leader_ncp = leader_node.info.contact_info.ncp;
// Start the validator node
let leader_rotation_interval = 10;
let (mint, validator_ledger_path) = genesis("test_validator_to_leader_transition", 10_000);
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let validator_info = validator_node.info.clone();
let mut validator = Fullnode::new(
validator_node,
&validator_ledger_path,
validator_keypair,
Some(leader_ncp),
false,
Some(leader_rotation_interval),
);
// Set the leader schedule for the validator
let my_leader_begin_epoch = 2;
for i in 0..my_leader_begin_epoch {
validator.set_scheduled_leader(leader_id, leader_rotation_interval * i);
}
validator.set_scheduled_leader(
validator_info.id,
my_leader_begin_epoch * leader_rotation_interval,
);
// Send blobs to the validator from our mock leader
let resp_recycler = BlobRecycler::default();
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> = leader_node
.sockets
.replicate
.into_iter()
.map(Arc::new)
.collect();
let t_responder = responder(
"test_validator_to_leader_transition",
blob_sockets[0].clone(),
r_responder,
);
// Send the blobs out of order, in reverse. Also send an extra
// "extra_blobs" number of blobs to make sure the window stops in the right place.
let extra_blobs = cmp::max(leader_rotation_interval / 3, 1);
let total_blobs_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_blobs;
let genesis_entries = mint.create_entries();
let last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
let tvu_address = &validator_info.contact_info.tvu;
let msgs = make_consecutive_blobs(
leader_id,
total_blobs_to_send,
last_id,
&tvu_address,
&resp_recycler,
).into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};
// Wait for validator to shut down tvu and restart tpu
match validator.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Check the validator ledger to make sure it's the right height
let (_, entry_height, _) = Fullnode::new_bank_from_ledger(&validator_ledger_path);
assert_eq!(
entry_height,
my_leader_begin_epoch * leader_rotation_interval
);
// Shut down
t_responder.join().expect("responder thread join");
validator.close().unwrap();
remove_dir_all(&validator_ledger_path).unwrap();
}
}

View File

@ -19,6 +19,7 @@ use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::Path;
use transaction::Transaction;
use window::WINDOW_SIZE;
@ -402,6 +403,13 @@ pub trait Block {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify(&self, start_hash: &Hash) -> bool;
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec<SharedBlob>;
fn to_blobs_with_id(
&self,
blob_recycler: &packet::BlobRecycler,
id: Pubkey,
start_id: u64,
addr: &SocketAddr,
) -> Vec<SharedBlob>;
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>;
}
@ -422,10 +430,28 @@ impl Block for [Entry] {
})
}
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec<SharedBlob> {
fn to_blobs_with_id(
&self,
blob_recycler: &packet::BlobRecycler,
id: Pubkey,
start_idx: u64,
addr: &SocketAddr,
) -> Vec<SharedBlob> {
self.iter()
.map(|entry| entry.to_blob(blob_recycler, None, None, None))
.collect()
.enumerate()
.map(|(i, entry)| {
entry.to_blob(
blob_recycler,
Some(start_idx + i as u64),
Some(id),
Some(&addr),
)
}).collect()
}
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec<SharedBlob> {
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
self.to_blobs_with_id(blob_recycler, Pubkey::default(), 0, &default_addr)
}
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> {

View File

@ -2,6 +2,10 @@
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use counter::Counter;
#[cfg(test)]
use hash::Hash;
#[cfg(test)]
use ledger::{next_entries_mut, Block};
use log::Level;
use recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use recycler;
@ -429,6 +433,25 @@ impl Blob {
}
}
#[cfg(test)]
pub fn make_consecutive_blobs(
me_id: Pubkey,
num_blobs_to_make: u64,
start_hash: Hash,
addr: &SocketAddr,
resp_recycler: &BlobRecycler,
) -> SharedBlobs {
let mut last_hash = start_hash;
let mut num_hashes = 0;
let mut all_entries = Vec::with_capacity(num_blobs_to_make as usize);
for _ in 0..num_blobs_to_make {
all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![]));
}
let mut new_blobs = all_entries.to_blobs_with_id(&resp_recycler, me_id, 0, addr);
new_blobs.truncate(num_blobs_to_make as usize);
new_blobs
}
#[cfg(test)]
mod tests {
use packet::{

View File

@ -11,7 +11,7 @@ use result::{Error, Result};
use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
@ -20,6 +20,24 @@ use std::time::Duration;
use streamer::{responder, BlobSender};
use vote_stage::send_validator_vote;
// Implement a destructor for the ReplicateStage thread to signal it exited
// even on panics
struct Finalizer {
exit_sender: Arc<AtomicBool>,
}
impl Finalizer {
fn new(exit_sender: Arc<AtomicBool>) -> Self {
Finalizer { exit_sender }
}
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
fn drop(&mut self) {
self.exit_sender.clone().store(true, Ordering::Relaxed);
}
}
pub struct ReplicateStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@ -63,12 +81,14 @@ impl ReplicateStage {
res?;
Ok(())
}
pub fn new(
keypair: Arc<Keypair>,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
window_receiver: EntryReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -80,20 +100,23 @@ impl ReplicateStage {
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::replicate_requests(
&bank,
&crdt,
&blob_recycler,
&window_receiver,
ledger_writer.as_mut(),
&keypair,
&vote_blob_sender,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
.spawn(move || {
let _exit = Finalizer::new(exit);;
loop {
if let Err(e) = Self::replicate_requests(
&bank,
&crdt,
&blob_recycler,
&window_receiver,
ledger_writer.as_mut(),
&keypair,
&vote_blob_sender,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
}
}).unwrap();

View File

@ -12,13 +12,13 @@ use std::time::Duration;
use store_ledger_stage::StoreLedgerStage;
use streamer::BlobReceiver;
use window;
use window_service::window_service;
use window_service::{window_service, WindowServiceReturnType};
pub struct Replicator {
ncp: Ncp,
fetch_stage: BlobFetchStage,
store_ledger_stage: StoreLedgerStage,
t_window: JoinHandle<()>,
t_window: JoinHandle<Option<WindowServiceReturnType>>,
pub retransmit_receiver: BlobReceiver,
}

View File

@ -15,7 +15,12 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::SharedWindow;
use window_service::window_service;
use window_service::{window_service, WindowServiceReturnType};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum RetransmitStageReturnType {
LeaderRotation(u64),
}
fn retransmit(crdt: &Arc<RwLock<Crdt>>, r: &BlobReceiver, sock: &UdpSocket) -> Result<()> {
let timer = Duration::new(1, 0);
@ -58,7 +63,8 @@ fn retransmitter(sock: Arc<UdpSocket>, crdt: Arc<RwLock<Crdt>>, r: BlobReceiver)
}
pub struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
t_retransmit: JoinHandle<()>,
t_window: JoinHandle<Option<WindowServiceReturnType>>,
}
impl RetransmitStage {
@ -83,19 +89,27 @@ impl RetransmitStage {
retransmit_sender,
repair_socket,
);
let thread_hdls = vec![t_retransmit, t_window];
(RetransmitStage { thread_hdls }, entry_receiver)
(
RetransmitStage {
t_window,
t_retransmit,
},
entry_receiver,
)
}
}
impl Service for RetransmitStage {
type JoinReturnType = ();
type JoinReturnType = Option<RetransmitStageReturnType>;
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
fn join(self) -> thread::Result<Option<RetransmitStageReturnType>> {
self.t_retransmit.join()?;
match self.t_window.join()? {
Some(WindowServiceReturnType::LeaderRotation(entry_height)) => Ok(Some(
RetransmitStageReturnType::LeaderRotation(entry_height),
)),
_ => Ok(None),
}
Ok(())
}
}

View File

@ -27,6 +27,11 @@ fn recv_loop(
loop {
let msgs = re.allocate();
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader trasaction socket)
if exit.load(Ordering::Relaxed) {
return Ok(());
}
let result = msgs.write().recv_from(sock);
match result {
Ok(()) => {
@ -39,11 +44,7 @@ fn recv_loop(
channel.send(msgs)?;
break;
}
Err(_) => {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
}
Err(_) => (),
}
}
}

View File

@ -40,19 +40,25 @@ use bank::Bank;
use blob_fetch_stage::BlobFetchStage;
use crdt::Crdt;
use replicate_stage::ReplicateStage;
use retransmit_stage::RetransmitStage;
use retransmit_stage::{RetransmitStage, RetransmitStageReturnType};
use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use window::SharedWindow;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType {
LeaderRotation(u64),
}
pub struct Tvu {
replicate_stage: ReplicateStage,
fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage,
exit: Arc<AtomicBool>,
}
impl Tvu {
@ -78,8 +84,9 @@ impl Tvu {
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let exit = Arc::new(AtomicBool::new(false));
let repair_socket = Arc::new(repair_socket);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
replicate_sockets.into_iter().map(Arc::new).collect();
@ -104,30 +111,39 @@ impl Tvu {
crdt,
blob_window_receiver,
ledger_path,
exit.clone(),
);
Tvu {
replicate_stage,
fetch_stage,
retransmit_stage,
exit,
}
}
pub fn close(self) -> thread::Result<()> {
pub fn exit(&self) -> () {
self.exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> thread::Result<Option<TvuReturnType>> {
self.fetch_stage.close();
self.join()
}
}
impl Service for Tvu {
type JoinReturnType = ();
type JoinReturnType = Option<TvuReturnType>;
fn join(self) -> thread::Result<()> {
fn join(self) -> thread::Result<Option<TvuReturnType>> {
self.replicate_stage.join()?;
self.fetch_stage.join()?;
self.retransmit_stage.join()?;
Ok(())
match self.retransmit_stage.join()? {
Some(RetransmitStageReturnType::LeaderRotation(entry_height)) => {
Ok(Some(TvuReturnType::LeaderRotation(entry_height)))
}
_ => Ok(None),
}
}
}
@ -145,7 +161,7 @@ pub mod tests {
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
@ -233,7 +249,6 @@ pub mod tests {
target1.sockets.repair,
target1.sockets.retransmit,
None,
exit.clone(),
);
let mut alice_ref_balance = starting_balance;
@ -297,6 +312,7 @@ pub mod tests {
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
tvu.close().expect("close");
exit.store(true, Ordering::Relaxed);
dr_l.0.join().expect("join");
dr_2.0.join().expect("join");
dr_1.0.join().expect("join");

View File

@ -65,6 +65,7 @@ pub trait WindowUtil {
fn process_blob(
&mut self,
id: &Pubkey,
crdt: &Arc<RwLock<Crdt>>,
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
@ -72,6 +73,7 @@ pub trait WindowUtil {
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
);
}
@ -98,16 +100,25 @@ impl WindowUtil for Window {
consumed: u64,
received: u64,
) -> Vec<(SocketAddr, Vec<u8>)> {
let num_peers = crdt.read().unwrap().table.len() as u64;
let max_repair = calculate_max_repair(num_peers, consumed, received, times);
let rcrdt = crdt.read().unwrap();
let leader_rotation_interval = rcrdt.get_leader_rotation_interval();
// Calculate the next leader rotation height and check if we are the leader
let next_leader_rotation =
consumed + leader_rotation_interval - (consumed % leader_rotation_interval);
let is_next_leader = rcrdt.get_scheduled_leader(next_leader_rotation) == Some(*id);
let num_peers = rcrdt.table.len() as u64;
let max_repair = calculate_max_repair(num_peers, consumed, received, times, is_next_leader);
let idxs = self.clear_slots(consumed, max_repair);
let reqs: Vec<_> = idxs
.into_iter()
.filter_map(|pix| crdt.read().unwrap().window_index_request(pix).ok())
.filter_map(|pix| rcrdt.window_index_request(pix).ok())
.collect();
drop(rcrdt);
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
if log_enabled!(Level::Trace) {
trace!(
"{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}",
@ -178,6 +189,7 @@ impl WindowUtil for Window {
fn process_blob(
&mut self,
id: &Pubkey,
crdt: &Arc<RwLock<Crdt>>,
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
@ -185,6 +197,7 @@ impl WindowUtil for Window {
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
) {
let w = (pix % WINDOW_SIZE) as usize;
@ -251,6 +264,18 @@ impl WindowUtil for Window {
// push all contiguous blobs into consumed queue, increment consumed
loop {
if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
match rcrdt.get_scheduled_leader(*consumed) {
// If we are the next leader, exit
Some(id) if id == my_id => {
break;
}
_ => (),
}
}
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);
@ -286,13 +311,20 @@ impl WindowUtil for Window {
}
}
fn calculate_max_repair(num_peers: u64, consumed: u64, received: u64, times: usize) -> u64 {
fn calculate_max_repair(
num_peers: u64,
consumed: u64,
received: u64,
times: usize,
is_next_leader: bool,
) -> u64 {
// Calculate the highest blob index that this node should have already received
// via avalanche. The avalanche splits data stream into nodes and each node retransmits
// the data to their peer nodes. So there's a possibility that a blob (with index lower
// than current received index) is being retransmitted by a peer node.
let max_repair = if times >= 8 {
// if repair backoff is getting high, don't wait for avalanche
let max_repair = if times >= 8 || is_next_leader {
// if repair backoff is getting high, or if we are the next leader,
// don't wait for avalanche
cmp::max(consumed, received)
} else {
cmp::max(consumed, received.saturating_sub(num_peers))
@ -484,29 +516,37 @@ mod test {
#[test]
pub fn calculate_max_repair_test() {
assert_eq!(calculate_max_repair(0, 10, 90, 0), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 32), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 0), 75);
assert_eq!(calculate_max_repair(90, 10, 90, 0), 10);
assert_eq!(calculate_max_repair(90, 10, 50, 0), 10);
assert_eq!(calculate_max_repair(90, 10, 99, 0), 10);
assert_eq!(calculate_max_repair(90, 10, 101, 0), 11);
assert_eq!(calculate_max_repair(0, 10, 90, 0, false), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 32, false), 90);
assert_eq!(calculate_max_repair(15, 10, 90, 0, false), 75);
assert_eq!(calculate_max_repair(90, 10, 90, 0, false), 10);
assert_eq!(calculate_max_repair(90, 10, 50, 0, false), 10);
assert_eq!(calculate_max_repair(90, 10, 99, 0, false), 10);
assert_eq!(calculate_max_repair(90, 10, 101, 0, false), 11);
assert_eq!(
calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0),
calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false),
WINDOW_SIZE + 5
);
assert_eq!(
calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0),
calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0),
calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0),
calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false),
WINDOW_SIZE + 9
);
assert_eq!(
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false),
WINDOW_SIZE
);
assert_eq!(
calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true),
50 + WINDOW_SIZE
);
}
fn wrap_blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: u64) -> (bool, u64) {

View File

@ -20,6 +20,11 @@ use window::{blob_idx_in_window, SharedWindow, WindowUtil};
pub const MAX_REPAIR_BACKOFF: usize = 128;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum WindowServiceReturnType {
LeaderRotation(u64),
}
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
//exponential backoff
if *last != consumed {
@ -148,6 +153,7 @@ fn recv_window(
s: &EntrySender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
@ -200,6 +206,7 @@ fn recv_window(
window.write().unwrap().process_blob(
id,
crdt,
b,
pix,
&mut consume_queue,
@ -207,6 +214,7 @@ fn recv_window(
consumed,
leader_unknown,
pending_retransmits,
leader_rotation_interval,
);
}
if log_enabled!(Level::Trace) {
@ -236,7 +244,7 @@ pub fn window_service(
s: EntrySender,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
) -> JoinHandle<()> {
) -> JoinHandle<Option<WindowServiceReturnType>> {
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@ -244,11 +252,31 @@ pub fn window_service(
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
let id = crdt.read().unwrap().id;
let id;
let leader_rotation_interval;
{
let rcrdt = crdt.read().unwrap();
id = rcrdt.id;
leader_rotation_interval = rcrdt.get_leader_rotation_interval();
}
let mut pending_retransmits = false;
let recycler = BlobRecycler::default();
trace!("{}: RECV_WINDOW started", id);
loop {
if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 {
match crdt.read().unwrap().get_scheduled_leader(consumed) {
// If we are the next leader, exit
Some(next_leader_id) if id == next_leader_id => {
return Some(WindowServiceReturnType::LeaderRotation(consumed));
}
// TODO: Figure out where to set the new leader in the crdt for
// validator -> validator transition (once we have real leader scheduling,
// this decision will be clearer). Also make sure new blobs to window actually
// originate from new leader
_ => (),
}
}
if let Err(e) = recv_window(
&window,
&id,
@ -260,6 +288,7 @@ pub fn window_service(
&s,
&retransmit,
&mut pending_retransmits,
leader_rotation_interval,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -297,6 +326,7 @@ pub fn window_service(
});
}
}
None
}).unwrap()
}
@ -305,47 +335,17 @@ mod test {
use crdt::{Crdt, Node};
use entry::Entry;
use hash::Hash;
use ledger::next_entries_mut;
use logger;
use packet::{BlobRecycler, SharedBlobs, PACKET_DATA_SIZE};
use signature::Pubkey;
use std::net::{SocketAddr, UdpSocket};
use packet::{make_consecutive_blobs, BlobRecycler, PACKET_DATA_SIZE};
use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{blob_receiver, responder};
use window::default_window;
use window_service::{repair_backoff, window_service};
fn make_consecutive_blobs(
me_id: Pubkey,
mut num_blobs_to_make: u64,
start_hash: Hash,
addr: &SocketAddr,
resp_recycler: &BlobRecycler,
) -> SharedBlobs {
let mut msgs = Vec::new();
let mut last_hash = start_hash;
let mut num_hashes = 0;
while num_blobs_to_make != 0 {
let new_entries = next_entries_mut(&mut last_hash, &mut num_hashes, vec![]);
let mut new_blobs: SharedBlobs = new_entries
.iter()
.enumerate()
.map(|(i, e)| {
let blob_index = num_blobs_to_make - i as u64 - 1;
let new_blob =
e.to_blob(&resp_recycler, Some(blob_index), Some(me_id), Some(addr));
assert_eq!(blob_index, new_blob.read().get_index().unwrap());
new_blob
}).collect();
new_blobs.truncate(num_blobs_to_make as usize);
num_blobs_to_make -= new_blobs.len() as u64;
msgs.extend(new_blobs);
}
msgs
}
use window_service::{repair_backoff, window_service, WindowServiceReturnType};
fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
for _t in 0..5 {
@ -401,7 +401,9 @@ mod test {
Hash::default(),
&gossip_address,
&resp_recycler,
);
).into_iter()
.rev()
.collect();;
s_responder.send(msgs).expect("send");
t_responder
};
@ -577,4 +579,86 @@ mod test {
assert!(avg >= 3);
assert!(avg <= 5);
}
#[test]
pub fn test_window_leader_rotation_exit() {
logger::setup();
let leader_rotation_interval = 10;
// Height at which this node becomes the leader =
// my_leader_begin_epoch * leader_rotation_interval
let my_leader_begin_epoch = 2;
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let mut crdt_me = Crdt::new(tn.info.clone()).expect("Crdt::new");
let me_id = crdt_me.my_data().id;
// Set myself in an upcoming epoch, but set the old_leader_id as the
// leader for all epochs before that
let old_leader_id = Keypair::new().pubkey();
crdt_me.set_leader(me_id);
crdt_me.set_leader_rotation_interval(leader_rotation_interval);
for i in 0..my_leader_begin_epoch {
crdt_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id);
}
crdt_me.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id);
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
let (s_retransmit, _r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let t_window = window_service(
subs,
win,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
);
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder(
"test_window_leader_rotation_exit",
blob_sockets[0].clone(),
r_responder,
);
let ncp_address = &tn.info.contact_info.ncp;
// Send the blobs out of order, in reverse. Also send an extra leader_rotation_interval
// number of blobs to make sure the window stops in the right place.
let extra_blobs = leader_rotation_interval;
let total_blobs_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_blobs;
let msgs = make_consecutive_blobs(
me_id,
total_blobs_to_send,
Hash::default(),
&ncp_address,
&resp_recycler,
).into_iter()
.rev()
.collect();;
s_responder.send(msgs).expect("send");
t_responder
};
assert_eq!(
Some(WindowServiceReturnType::LeaderRotation(
my_leader_begin_epoch * leader_rotation_interval
)),
t_window.join().expect("window service join")
);
t_responder.join().expect("responder thread join");
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("receiver thread join");
}
}

View File

@ -9,7 +9,7 @@ use solana::crdt::{Crdt, Node, NodeInfo};
use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::hash::Hash;
use solana::ledger::LedgerWriter;
use solana::ledger::{read_ledger, LedgerWriter};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
@ -881,6 +881,130 @@ fn test_leader_to_validator_transition() {
remove_dir_all(leader_ledger_path).unwrap();
}
#[test]
#[ignore]
fn test_leader_validator_basic() {
logger::setup();
let leader_rotation_interval = 10;
// Account that will be the sink for all the test's transactions
let bob_pubkey = Keypair::new().pubkey();
// Make a mint and a genesis entry in the leader ledger
let (mint, leader_ledger_path, genesis_entries) =
genesis("test_leader_validator_basic", 10_000);
let genesis_height = genesis_entries.len();
// Initialize the leader ledger
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
// Create the leader fullnode
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_info = leader_node.info.clone();
let mut leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
None,
false,
Some(leader_rotation_interval),
);
// Send leader some tokens to vote
send_tx_and_retry_get_balance(&leader_info, &mint, &leader_info.id, 500, None).unwrap();
// Start the validator node
let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic");
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let validator_info = validator_node.info.clone();
let mut validator = Fullnode::new(
validator_node,
&validator_ledger_path,
validator_keypair,
Some(leader_info.contact_info.ncp),
false,
Some(leader_rotation_interval),
);
ledger_paths.push(validator_ledger_path.clone());
// Set the leader schedule for the validator and leader
let my_leader_begin_epoch = 2;
for i in 0..my_leader_begin_epoch {
validator.set_scheduled_leader(leader_info.id, leader_rotation_interval * i);
leader.set_scheduled_leader(leader_info.id, leader_rotation_interval * i);
}
validator.set_scheduled_leader(
validator_info.id,
my_leader_begin_epoch * leader_rotation_interval,
);
leader.set_scheduled_leader(
validator_info.id,
my_leader_begin_epoch * leader_rotation_interval,
);
// Wait for convergence
let servers = converge(&leader_info, 2);
assert_eq!(servers.len(), 2);
// Send transactions to the leader
let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1);
let total_transactions_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_transactions;
// Push "extra_transactions" past leader_rotation_interval entry height,
// make sure the validator stops.
for _ in genesis_height as u64..total_transactions_to_send {
send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
}
// Wait for validator to shut down tvu and restart tpu
match validator.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// TODO: We ignore this test for now b/c there's a chance here that the crdt
// in the new leader calls the dummy sequence of update_leader -> top_leader()
// (see the TODOs in those functions) during gossip and sets the leader back
// to the old leader, which causes a panic from an assertion failure in crdt broadcast(),
// specifically: assert!(me.leader_id != v.id). We can enable this test once we have real
// leader scheduling
// Wait for the leader to shut down tpu and restart tvu
match leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Shut down
validator.close().unwrap();
leader.close().unwrap();
// Check the ledger of the validator to make sure the entry height is correct
// and that the old leader and the new leader's ledgers agree up to the point
// of leader rotation
let validator_entries =
read_ledger(&validator_ledger_path, true).expect("Expected parsing of validator ledger");
let leader_entries =
read_ledger(&validator_ledger_path, true).expect("Expected parsing of leader ledger");
for (v, l) in validator_entries.zip(leader_entries) {
assert_eq!(
v.expect("expected valid validator entry"),
l.expect("expected valid leader entry")
);
}
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
fn mk_client(leader: &NodeInfo) -> ThinClient {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket