Broadcast last tick before leader rotation (#1766)

* Broadcast last tick before leader rotation to everybody on the network

* Add test

* Refactor broadcast
carllin 2018-11-13 02:21:37 -08:00 committed by GitHub
parent a77b1ff767
commit 6335be803c
4 changed files with 306 additions and 74 deletions
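At a high level, the broadcast loop now tracks its own tick height and, once that height reaches the configured max_tick_height, fans the final blob of the batch out to every node in the broadcast table instead of a single round-robin peer. A minimal sketch of that decision, assuming the simplified free function below (the real logic is threaded through BroadcastStage and ClusterInfo::broadcast):

// Hedged sketch: advance the running tick count for a batch of entries and
// report whether the batch ends on the rotation tick. `ticks_in_batch` is the
// number of tick entries seen in the current batch.
fn batch_ends_on_last_tick(
    tick_height: &mut u64,
    ticks_in_batch: u64,
    max_tick_height: Option<u64>,
) -> bool {
    *tick_height += ticks_in_batch;
    // None means no rotation height is configured, so this never fires.
    Some(*tick_height) == max_tick_height
}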

View File

@ -28,7 +28,10 @@ pub enum BroadcastStageReturnType {
ChannelDisconnected,
}
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn broadcast(
max_tick_height: Option<u64>,
tick_height: &mut u64,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -52,6 +55,14 @@ fn broadcast(
inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
let to_blobs_start = Instant::now();
let num_ticks: u64 = ventries
.iter()
.flatten()
.map(|entry| (entry.is_tick()) as u64)
.sum();
*tick_height += num_ticks;
let dq: SharedBlobs = ventries
.into_par_iter()
.flat_map(|p| p.to_blobs())
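For reference, the tick-counting fold above as a standalone snippet, using a stub Entry type (hypothetical; the real Entry comes from solana::entry and also carries transactions):

// Stub entry type, for illustration only.
struct Entry {
    tick: bool,
}

impl Entry {
    fn is_tick(&self) -> bool {
        self.tick
    }
}

// Count the tick entries across all batches, the same fold used to advance
// tick_height in broadcast().
fn count_ticks(ventries: &[Vec<Entry>]) -> u64 {
    ventries
        .iter()
        .flatten()
        .map(|entry| entry.is_tick() as u64)
        .sum()
}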
@ -128,6 +139,7 @@ fn broadcast(
// Send blobs out from the window
ClusterInfo::broadcast(
Some(*tick_height) == max_tick_height,
&node_info,
&broadcast_table,
&window,
@ -188,6 +200,8 @@ impl BroadcastStage {
entry_height: u64,
leader_slot: u64,
receiver: &Receiver<Vec<Entry>>,
max_tick_height: Option<u64>,
tick_height: u64,
) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex {
data: entry_height,
@ -195,9 +209,12 @@ impl BroadcastStage {
};
let mut receive_index = entry_height;
let me = cluster_info.read().unwrap().my_data().clone();
let mut tick_height_ = tick_height;
loop {
let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
max_tick_height,
&mut tick_height_,
&me,
&broadcast_table,
&window,
@ -244,6 +261,8 @@ impl BroadcastStage {
entry_height: u64,
leader_slot: u64,
receiver: Receiver<Vec<Entry>>,
max_tick_height: Option<u64>,
tick_height: u64,
exit_sender: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
@ -257,6 +276,8 @@ impl BroadcastStage {
entry_height,
leader_slot,
&receiver,
max_tick_height,
tick_height,
)
}).unwrap();

View File

@ -28,6 +28,7 @@ use signature::{Keypair, KeypairUtil};
use solana_sdk::pubkey::Pubkey;
use std;
use std::collections::HashMap;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
@ -461,6 +462,7 @@ impl ClusterInfo {
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast(
contains_last_tick: bool,
me: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -483,81 +485,18 @@ impl ClusterInfo {
let old_transmit_index = transmit_index.data;
// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node. Add one
// to the capacity in case we want to send an extra blob notifying the
// next leader about the blob right before leader rotation
let mut orders = Vec::with_capacity((received_index - transmit_index.data + 1) as usize);
let window_l = window.read().unwrap();
let mut br_idx = transmit_index.data as usize % broadcast_table.len();
for idx in transmit_index.data..received_index {
let w_idx = idx as usize % window_l.len();
trace!(
"{} broadcast order data w_idx {} br_idx {}",
me.id,
w_idx,
br_idx
);
orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
}
for idx in transmit_index.coding..received_index {
let w_idx = idx as usize % window_l.len();
// skip over empty slots
if window_l[w_idx].coding.is_none() {
continue;
}
trace!(
"{} broadcast order coding w_idx: {} br_idx :{}",
me.id,
w_idx,
br_idx,
);
orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
br_idx += 1;
br_idx %= broadcast_table.len();
}
let orders = Self::create_broadcast_orders(
contains_last_tick,
window,
broadcast_table,
transmit_index,
received_index,
me,
);
trace!("broadcast orders table {}", orders.len());
let errs: Vec<_> = orders
.into_iter()
.map(|(b, v)| {
// only leader should be broadcasting
assert!(me.leader_id != v.id);
let bl = b.unwrap();
let blob = bl.read().unwrap();
//TODO profile this, may need multiple sockets for par_iter
trace!(
"{}: BROADCAST idx: {} sz: {} to {},{} coding: {}",
me.id,
blob.index().unwrap(),
blob.meta.size,
v.id,
v.contact_info.tvu,
blob.is_coding()
);
assert!(blob.meta.size <= BLOB_SIZE);
let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu);
trace!(
"{}: done broadcast {} to {} {}",
me.id,
blob.meta.size,
v.id,
v.contact_info.tvu
);
e
}).collect();
trace!("broadcast results {}", errs.len());
let errs = Self::send_orders(s, orders, me);
for e in errs {
if let Err(e) = &e {
trace!("broadcast result {:?}", e);
@ -641,6 +580,126 @@ impl ClusterInfo {
self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
}
fn send_orders(
s: &UdpSocket,
orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
me: &NodeInfo,
) -> Vec<io::Result<usize>> {
orders
.into_iter()
.flat_map(|(b, vs)| {
// only leader should be broadcasting
assert!(vs.iter().find(|info| info.id == me.leader_id).is_none());
let bl = b.unwrap();
let blob = bl.read().unwrap();
//TODO profile this, may need multiple sockets for par_iter
let ids_and_tvus = if log_enabled!(Level::Trace) {
let v_ids = vs.iter().map(|v| v.id);
let tvus = vs.iter().map(|v| v.contact_info.tvu);
let ids_and_tvus = v_ids.zip(tvus).collect();
trace!(
"{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
me.id,
blob.index().unwrap(),
blob.meta.size,
ids_and_tvus,
blob.is_coding()
);
ids_and_tvus
} else {
vec![]
};
assert!(blob.meta.size <= BLOB_SIZE);
let send_errs_for_blob: Vec<_> = vs
.iter()
.map(move |v| {
let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu);
trace!(
"{}: done broadcast {} to {:?}",
me.id,
blob.meta.size,
ids_and_tvus
);
e
}).collect();
send_errs_for_blob
}).collect()
}
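send_orders takes each (blob, targets) pair and sends the same payload to every target, collecting per-send results instead of stopping at the first error. The pattern in isolation, with plain byte buffers and socket addresses standing in for SharedBlob and NodeInfo (the helper name below is illustrative, not from this diff):

use std::io;
use std::net::{SocketAddr, UdpSocket};

// Send one payload to many peers over a single socket, keeping every result
// so the caller can log or count failures.
fn send_to_all(
    socket: &UdpSocket,
    payload: &[u8],
    targets: &[SocketAddr],
) -> Vec<io::Result<usize>> {
    targets
        .iter()
        .map(|addr| socket.send_to(payload, addr))
        .collect()
}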
fn create_broadcast_orders<'a>(
contains_last_tick: bool,
window: &SharedWindow,
broadcast_table: &'a [NodeInfo],
transmit_index: &mut WindowIndex,
received_index: u64,
me: &NodeInfo,
) -> Vec<(Option<SharedBlob>, Vec<&'a NodeInfo>)> {
// Enumerate all the blobs in the window (those are the indices) and transmit
// them to nodes, starting from a different node.
let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
let window_l = window.read().unwrap();
let mut br_idx = transmit_index.data as usize % broadcast_table.len();
for idx in transmit_index.data..received_index {
let w_idx = idx as usize % window_l.len();
trace!(
"{} broadcast order data w_idx {} br_idx {}",
me.id,
w_idx,
br_idx
);
// Broadcast the last tick to everyone on the network so it doesn't get dropped
// (Need to maximize probability the next leader in line sees this handoff tick
// despite packet drops)
let target = if idx == received_index - 1 && contains_last_tick {
// If we see a tick at max_tick_height, then we know it must be the last
// blob in the window, at index == received_index - 1. There cannot be an
// entry sent after the last tick (this is guaranteed by the PohService).
assert!(window_l[w_idx].data.is_some());
(
window_l[w_idx].data.clone(),
broadcast_table.iter().collect(),
)
} else {
(window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]])
};
orders.push(target);
br_idx += 1;
br_idx %= broadcast_table.len();
}
for idx in transmit_index.coding..received_index {
let w_idx = idx as usize % window_l.len();
// skip over empty slots
if window_l[w_idx].coding.is_none() {
continue;
}
trace!(
"{} broadcast order coding w_idx: {} br_idx :{}",
me.id,
w_idx,
br_idx,
);
orders.push((
window_l[w_idx].coding.clone(),
vec![&broadcast_table[br_idx]],
));
br_idx += 1;
br_idx %= broadcast_table.len();
}
orders
}
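The targeting rule introduced here: every blob goes to exactly one peer, rotating through the broadcast table, except the final blob of a batch that contains the rotation tick, which goes to the whole table. A simplified sketch of that rule, using string node names in place of NodeInfo (hypothetical helper, illustration only):

// Choose the recipients for the blob at position `idx` out of `num_blobs`.
fn pick_targets<'a>(
    nodes: &'a [String],
    idx: usize,
    num_blobs: usize,
    contains_last_tick: bool,
) -> Vec<&'a String> {
    if contains_last_tick && idx == num_blobs - 1 {
        // The handoff tick is fanned out to every node so the next leader
        // is very likely to see it despite packet loss.
        nodes.iter().collect()
    } else {
        // All other blobs go to a single node, round-robin over the table.
        vec![&nodes[idx % nodes.len()]]
    }
}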
// TODO: fill in with a real implementation once staking is implemented
fn get_stake(_id: Pubkey) -> f64 {
1.0

View File

@ -298,6 +298,8 @@ impl Fullnode {
entry_height,
leader_slot,
entry_receiver,
max_tick_height,
bank.tick_height(),
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
@ -449,6 +451,8 @@ impl Fullnode {
entry_height,
0, // TODO: get real leader slot from leader_scheduler
blob_receiver,
max_tick_height,
tick_height,
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);

View File

@ -6,17 +6,21 @@ extern crate serde_json;
extern crate solana;
extern crate solana_sdk;
use solana::blob_fetch_stage::BlobFetchStage;
use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::hash::Hash;
use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use solana::ledger::{
create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger, LedgerWriter,
create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger,
reconstruct_entries_from_blobs, LedgerWindow, LedgerWriter,
};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
use solana::packet::SharedBlob;
use solana::poh_service::NUM_TICKS_PER_SECOND;
use solana::result;
use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil};
@ -58,6 +62,31 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
(ncp, spy_cluster_info_ref, me)
}
fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
let exit = Arc::new(AtomicBool::new(false));
let new_node = Node::new_localhost();
let new_node_info = new_node.info.clone();
let me = new_node.info.id.clone();
let mut new_node_cluster_info = ClusterInfo::new(new_node_info).expect("ClusterInfo::new");
new_node_cluster_info.insert(&leader);
new_node_cluster_info.set_leader(leader.id);
let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
let new_node_window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new(
&new_node_cluster_info_ref,
new_node_window,
None,
new_node
.sockets
.gossip
.try_clone()
.expect("Failed to clone gossip"),
exit.clone(),
);
(ncp, new_node_cluster_info_ref, new_node, me)
}
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
// let's spy on the network
let (ncp, spy_ref, _) = make_spy_node(leader);
@ -1470,6 +1499,125 @@ fn test_full_leader_validator_network() {
}
}
#[test]
fn test_broadcast_last_tick() {
logger::setup();
// The number of validators
const N: usize = 5;
// Create the bootstrap leader node information
let bootstrap_leader_keypair = Keypair::new();
let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
// Create leader ledger
let (_, bootstrap_leader_ledger_path, genesis_entries) = create_tmp_sample_ledger(
"test_broadcast_last_tick",
10_000,
0,
bootstrap_leader_info.id,
500,
);
let num_ending_ticks = genesis_entries
.iter()
.skip(2)
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
let genesis_ledger_len = genesis_entries.len() as u64 - num_ending_ticks;
let blob_receiver_exit = Arc::new(AtomicBool::new(false));
// Create the listeners
let mut listening_nodes: Vec<_> = (0..N)
.map(|_| make_listening_node(&bootstrap_leader_info))
.collect();
let blob_fetch_stages: Vec<_> = listening_nodes
.iter_mut()
.map(|(_, _, node, _)| {
BlobFetchStage::new(
Arc::new(node.sockets.replicate.pop().unwrap()),
blob_receiver_exit.clone(),
)
}).collect();
// Create the fullnode; it should take 20 seconds to reach the end of the bootstrap period
let bootstrap_height = (NUM_TICKS_PER_SECOND * 20) as u64;
let leader_rotation_interval = 100;
let seed_rotation_interval = 200;
let leader_scheduler_config = LeaderSchedulerConfig::new(
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(leader_rotation_interval),
);
// Start up the bootstrap leader fullnode
let mut bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
Arc::new(bootstrap_leader_keypair),
Arc::new(Keypair::new()),
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
// Wait for convergence
let servers = converge(&bootstrap_leader_info, N + 1);
assert_eq!(servers.len(), N + 1);
// Wait for leader rotation
match bootstrap_leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Shut down the leader
bootstrap_leader.close().unwrap();
let last_tick_entry_height = genesis_ledger_len as u64 + bootstrap_height;
let mut ledger_window = LedgerWindow::open(&bootstrap_leader_ledger_path)
.expect("Expected to be able to open ledger");
// get_entry() expects the index of the entry, so we have to subtract one from the actual entry height
let expected_last_tick = ledger_window
.get_entry(last_tick_entry_height - 1)
.expect("Expected last tick entry to exist");
// Check that the nodes got the last broadcasted blob
for (_, receiver) in blob_fetch_stages.iter() {
let mut last_tick_blob: SharedBlob = SharedBlob::default();
while let Ok(mut new_blobs) = receiver.try_recv() {
let last_blob = new_blobs.into_iter().find(|b| {
b.read().unwrap().index().expect("Expected index in blob")
== last_tick_entry_height - 1
});
if let Some(last_blob) = last_blob {
last_tick_blob = last_blob;
break;
}
}
let actual_last_tick = &reconstruct_entries_from_blobs(vec![last_tick_blob])
.expect("Expected to be able to reconstruct entries from blob")[0];
assert_eq!(actual_last_tick, &expected_last_tick);
}
// Shut down blob fetch stages
blob_receiver_exit.store(true, Ordering::Relaxed);
for (bf, _) in blob_fetch_stages {
bf.join().unwrap();
}
// Shut down the listeners
for node in listening_nodes {
node.0.close().unwrap();
}
remove_dir_all(bootstrap_leader_ledger_path).unwrap();
}
fn mk_client(leader: &NodeInfo) -> ThinClient {
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu));