Dynamic N layer 'avalanche' broadcast and retransmit (#2058)

* Dynamic N layer avalanche broadcast and retransmit
Sagar Dhawan 2019-01-02 14:16:15 +05:30 committed by GitHub
parent 5fbdc6450d
commit 0bea870b22
6 changed files with 440 additions and 17 deletions

View File

@ -1,6 +1,7 @@
//! The `broadcast_service` broadcasts data from a leader node to validators
//!
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo};
use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT};
use crate::counter::Counter;
use crate::db_ledger::DbLedger;
use crate::entry::Entry;
@ -239,8 +240,10 @@ pub struct BroadcastService {
}
impl BroadcastService {
#[allow(clippy::too_many_arguments)]
fn run(
db_ledger: &Arc<DbLedger>,
bank: &Arc<Bank>,
sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>,
window: &SharedWindow,
@ -260,7 +263,9 @@ impl BroadcastService {
if exit_signal.load(Ordering::Relaxed) {
return BroadcastServiceReturnType::ExitSignal;
}
let broadcast_table = cluster_info.read().unwrap().tvu_peers();
let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
// Layer 1 nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
let leader_id = cluster_info.read().unwrap().leader_id();
if let Err(e) = broadcast(
@ -309,6 +314,7 @@ impl BroadcastService {
#[allow(clippy::too_many_arguments, clippy::new_ret_no_self)]
pub fn new(
db_ledger: Arc<DbLedger>,
bank: Arc<Bank>,
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
@ -326,6 +332,7 @@ impl BroadcastService {
let _exit = Finalizer::new(exit_sender);
Self::run(
&db_ledger,
&bank,
&sock,
&cluster_info,
&window,
@ -401,10 +408,12 @@ mod test {
let shared_window = Arc::new(RwLock::new(window));
let (entry_sender, entry_receiver) = channel();
let exit_sender = Arc::new(AtomicBool::new(false));
let bank = Arc::new(Bank::default());
// Start up the broadcast stage
let (broadcast_service, exit_signal) = BroadcastService::new(
db_ledger.clone(),
bank.clone(),
leader_info.sockets.broadcast,
cluster_info,
shared_window,

View File

@ -12,6 +12,7 @@
//! * layer 2 - Everyone else; if layer 1 is `2^10` nodes, layer 2 should be able to fit `2^20` nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::bank::Bank;
use crate::bloom::Bloom;
use crate::contact_info::ContactInfo;
use crate::counter::Counter;
@ -36,6 +37,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
use std::cmp::min;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@ -47,6 +49,12 @@ pub type NodeInfo = ContactInfo;
pub const FULLNODE_PORT_RANGE: (u16, u16) = (8000, 10_000);
/// The fanout for Ledger Replication
pub const DATA_PLANE_FANOUT: usize = 200;
pub const NEIGHBORHOOD_SIZE: usize = DATA_PLANE_FANOUT;
/// Whether layer capacity should grow as layers are added
pub const GROW_LAYER_CAPACITY: bool = false;
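// A rough capacity sketch implied by these constants (derived from `describe_data_plane`
// below, not new behavior): with no growth, each layer past layer 1 holds
// NEIGHBORHOOD_SIZE * (DATA_PLANE_FANOUT / 2) nodes, i.e. 200 * 100 = 20_000 with the
// defaults; with GROW_LAYER_CAPACITY set, the neighborhood count squares at every added
// layer instead.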
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
@ -66,6 +74,20 @@ pub struct ClusterInfo {
pub(crate) keypair: Arc<Keypair>,
}
#[derive(Default, Clone)]
pub struct Locality {
/// The bounds of the neighborhood represented by this locality
pub neighbor_bounds: (usize, usize),
/// The `avalanche` layer this locality is in
pub layer_ix: usize,
/// The bounds of the current layer
pub layer_bounds: (usize, usize),
/// The bounds of the next layer
pub child_layer_bounds: Option<(usize, usize)>,
/// The indices of the nodes that should be contacted in next layer
pub child_layer_peers: Vec<usize>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct PruneData {
/// Pubkey of the node that sent this prune data
@ -297,6 +319,33 @@ impl ClusterInfo {
.collect()
}
fn sort_by_stake(peers: &[NodeInfo], bank: &Arc<Bank>) -> Vec<(u64, NodeInfo)> {
let mut peers_with_stakes: Vec<_> = peers
.iter()
.map(|c| (bank.get_stake(&c.id), c.clone()))
.collect();
peers_with_stakes.sort_unstable();
peers_with_stakes
}
pub fn sorted_retransmit_peers(&self, bank: &Arc<Bank>) -> Vec<NodeInfo> {
let peers = self.retransmit_peers();
let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank);
peers_with_stakes
.iter()
.map(|(_, peer)| (*peer).clone())
.collect()
}
pub fn sorted_tvu_peers(&self, bank: &Arc<Bank>) -> Vec<NodeInfo> {
let peers = self.tvu_peers();
let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank);
peers_with_stakes
.iter()
.map(|(_, peer)| (*peer).clone())
.collect()
}
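// A minimal sketch of the ordering this produces (hypothetical stakes, not part of this
// change): if the bank reports stakes {A: 5, B: 1, C: 3}, `sort_by_stake` yields
// [(1, B), (3, C), (5, A)], i.e. ascending by stake, with the `Ord` impl on `ContactInfo`
// (by pubkey) breaking ties. `sorted_tvu_peers` then drops the stake values and returns
// [B, C, A].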
/// compute broadcast table
pub fn tpu_peers(&self) -> Vec<NodeInfo> {
let me = self.my_data().id;
@ -311,6 +360,137 @@ impl ClusterInfo {
.collect()
}
/// Given a node count, neighborhood size, and an initial fanout (leader -> layer 1), it
/// calculates how many layers are needed and at what index each layer begins.
/// The `grow` parameter is used to determine if the network should 'fanout' or keep
/// layer capacities constant.
pub fn describe_data_plane(
nodes: usize,
fanout: usize,
hood_size: usize,
grow: bool,
) -> (usize, Vec<usize>) {
let mut layer_indices: Vec<usize> = vec![0];
if nodes == 0 {
(0, vec![])
} else if nodes <= fanout {
// single layer data plane
(1, layer_indices)
} else {
// layer 1 is the first `fanout` nodes, so exclude those
let mut remaining_nodes = nodes - fanout;
layer_indices.push(fanout);
let mut num_layers = 2;
let mut num_neighborhoods = fanout / 2;
let mut layer_capacity = hood_size * num_neighborhoods;
while remaining_nodes > 0 {
if remaining_nodes > layer_capacity {
// Needs more layers.
num_layers += 1;
remaining_nodes -= layer_capacity;
let end = *layer_indices.last().unwrap();
layer_indices.push(layer_capacity + end);
if grow {
// Next layer's capacity
num_neighborhoods *= num_neighborhoods;
layer_capacity = hood_size * num_neighborhoods;
}
} else {
//everything will now fit in the layers we have
let end = *layer_indices.last().unwrap();
layer_indices.push(layer_capacity + end);
break;
}
}
assert_eq!(num_layers, layer_indices.len() - 1);
(num_layers, layer_indices)
}
}
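// A worked example of the layout above (matches the `test_describe_data_plane` case
// below): 10 nodes with a fanout of 2 and a hood size of 2 give one neighborhood of
// 2 nodes per layer past layer 1, so
//   describe_data_plane(10, 2, 2, false) == (5, vec![0, 2, 4, 6, 8, 10])
// i.e. layer 1 is indices [0, 2), layer 2 is [2, 4), and so on up to layer 5 at [8, 10).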
fn localize_item(
layer_indices: &[usize],
hood_size: usize,
select_index: usize,
curr_index: usize,
) -> Option<Locality> {
let end = layer_indices.len() - 1;
let next = min(end, curr_index + 1);
let value = layer_indices[curr_index];
let localized = select_index >= value && select_index < layer_indices[next];
let mut locality = Locality::default();
if localized {
match curr_index {
_ if curr_index == 0 => {
locality.layer_ix = 0;
locality.layer_bounds = (0, hood_size);
locality.neighbor_bounds = locality.layer_bounds;
if next == end {
locality.child_layer_bounds = None;
locality.child_layer_peers = vec![];
} else {
locality.child_layer_bounds =
Some((layer_indices[next], layer_indices[next + 1]));
locality.child_layer_peers = ClusterInfo::lower_layer_peers(
select_index,
layer_indices[next],
layer_indices[next + 1],
hood_size,
);
}
}
_ if curr_index == end => {
locality.layer_ix = end;
locality.layer_bounds = (end - hood_size, end);
locality.neighbor_bounds = locality.layer_bounds;
locality.child_layer_bounds = None;
locality.child_layer_peers = vec![];
}
ix => {
let hood_ix = (select_index - value) / hood_size;
locality.layer_ix = ix;
locality.layer_bounds = (value, layer_indices[next]);
locality.neighbor_bounds = (
((hood_ix * hood_size) + value),
((hood_ix + 1) * hood_size + value),
);
if next == end {
locality.child_layer_bounds = None;
locality.child_layer_peers = vec![];
} else {
locality.child_layer_bounds =
Some((layer_indices[next], layer_indices[next + 1]));
locality.child_layer_peers = ClusterInfo::lower_layer_peers(
select_index,
layer_indices[next],
layer_indices[next + 1],
hood_size,
);
}
}
}
Some(locality)
} else {
None
}
}
/// Given an array of layer indices and another index, returns (as a `Locality`) the layer,
/// layer-bounds and neighborhood-bounds in which the index resides
pub fn localize(layer_indices: &[usize], hood_size: usize, select_index: usize) -> Locality {
(0..layer_indices.len())
.find_map(|i| ClusterInfo::localize_item(layer_indices, hood_size, select_index, i))
.or_else(|| Some(Locality::default()))
.unwrap()
}
fn lower_layer_peers(index: usize, start: usize, end: usize, hood_size: usize) -> Vec<usize> {
(start..end)
.step_by(hood_size)
.map(|x| x + index % hood_size)
.collect()
}
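// A minimal sketch of how `localize` and `lower_layer_peers` fit together, using the
// 10-node layout from `describe_data_plane(10, 2, 2, false)` (layer_indices
// [0, 2, 4, 6, 8, 10]); the values below follow from the code above:
//
//   let locality = ClusterInfo::localize(&layer_indices, 2, 3); // node at index 3
//   locality.layer_ix == 1
//   locality.neighbor_bounds == (2, 4)             // its 2-node neighborhood
//   locality.child_layer_bounds == Some((4, 6))
//   locality.child_layer_peers == vec![5]          // index 3 relays to index 5
//
// Its neighbor at index 2 gets child_layer_peers == vec![4], so the two relays cover the
// child neighborhood without overlapping (see `test_localize_child_peer_overlap`).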
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
@ -369,14 +549,19 @@ impl ClusterInfo {
Ok(())
}
/// retransmit messages from the leader to layer 1 nodes
/// retransmit messages to a list of nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let (me, orders): (NodeInfo, Vec<NodeInfo>) = {
pub fn retransmit_to(
obj: &Arc<RwLock<Self>>,
peers: &[NodeInfo],
blob: &SharedBlob,
s: &UdpSocket,
) -> Result<()> {
let (me, orders): (NodeInfo, &[NodeInfo]) = {
// copy to avoid locking during IO
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
(s.my_data().clone(), s.retransmit_peers())
let s = obj.read().unwrap();
(s.my_data().clone(), peers)
};
blob.write()
.unwrap()
@ -409,6 +594,14 @@ impl ClusterInfo {
Ok(())
}
/// retransmit messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let me = obj.read().unwrap();
ClusterInfo::retransmit_to(obj, &me.retransmit_peers(), blob, s)
}
fn send_orders(
s: &UdpSocket,
orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
@ -1119,10 +1312,10 @@ mod tests {
use crate::crds_value::CrdsValueLabel;
use crate::db_ledger::DbLedger;
use crate::ledger::get_tmp_ledger_path;
use crate::packet::BLOB_HEADER_SIZE;
use crate::result::Error;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, RwLock};
@ -1361,4 +1554,154 @@ mod tests {
.unwrap();
assert!(val.verify());
}
fn num_layers(nodes: usize, fanout: usize, hood_size: usize, grow: bool) -> usize {
ClusterInfo::describe_data_plane(nodes, fanout, hood_size, grow).0
}
#[test]
fn test_describe_data_plane() {
// no nodes
assert_eq!(num_layers(0, 200, 200, false), 0);
// 1 node
assert_eq!(num_layers(1, 200, 200, false), 1);
// 10 nodes with fanout of 2 and hood size of 2
assert_eq!(num_layers(10, 2, 2, false), 5);
// fanout + 1 nodes with fanout of 2 and hood size of 2
assert_eq!(num_layers(3, 2, 2, false), 2);
// 10 nodes with fanout of 4 and hood size of 2 while growing
assert_eq!(num_layers(10, 4, 2, true), 3);
// A little more realistic
assert_eq!(num_layers(100, 10, 10, false), 3);
// A little more realistic with odd numbers
assert_eq!(num_layers(103, 13, 13, false), 3);
// larger
let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(10_000, 10, 10, false);
assert_eq!(layer_cnt, 201);
// distances between index values should be the same since we aren't growing.
let capacity = 10 / 2 * 10;
assert_eq!(layer_indices[1], 10);
layer_indices[1..layer_indices.len()]
.chunks(2)
.for_each(|x| {
if x.len() == 2 {
assert_eq!(x[1] - x[0], capacity);
}
});
// massive
let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false);
let capacity = 200 / 2 * 200;
let cnt = 500_000 / capacity + 1;
assert_eq!(layer_cnt, cnt);
// distances between index values should be the same since we aren't growing.
assert_eq!(layer_indices[1], 200);
layer_indices[1..layer_indices.len()]
.chunks(2)
.for_each(|x| {
if x.len() == 2 {
assert_eq!(x[1] - x[0], capacity);
}
});
let total_capacity: usize = *layer_indices.last().unwrap();
assert!(total_capacity >= 500_000);
// massive with growth
assert_eq!(num_layers(500_000, 200, 200, true), 3);
}
#[test]
fn test_localize() {
// go for gold
let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false);
let mut me = 0;
let mut layer_ix = 0;
let locality = ClusterInfo::localize(&layer_indices, 200, me);
assert_eq!(locality.layer_ix, layer_ix);
assert_eq!(
locality.child_layer_bounds,
Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
);
me = 201;
layer_ix = 1;
let locality = ClusterInfo::localize(&layer_indices, 200, me);
assert_eq!(
locality.layer_ix, layer_ix,
"layer_indices[layer_ix] is actually {}",
layer_indices[layer_ix]
);
assert_eq!(
locality.child_layer_bounds,
Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
);
me = 20_201;
layer_ix = 2;
let locality = ClusterInfo::localize(&layer_indices, 200, me);
assert_eq!(
locality.layer_ix, layer_ix,
"layer_indices[layer_ix] is actually {}",
layer_indices[layer_ix]
);
assert_eq!(
locality.child_layer_bounds,
Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2]))
);
// test no child layer since last layer should have massive capacity
let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, true);
me = 20_201;
layer_ix = 2;
let locality = ClusterInfo::localize(&layer_indices, 200, me);
assert_eq!(
locality.layer_ix, layer_ix,
"layer_indices[layer_ix] is actually {}",
layer_indices[layer_ix]
);
assert_eq!(locality.child_layer_bounds, None);
}
#[test]
fn test_localize_child_peer_overlap() {
let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false);
let last_ix = layer_indices.len() - 1;
// sample every 33 pairs to reduce test time
for x in (0..*layer_indices.get(last_ix - 2).unwrap()).step_by(33) {
let me_locality = ClusterInfo::localize(&layer_indices, 200, x);
let buddy_locality = ClusterInfo::localize(&layer_indices, 200, x + 1);
assert!(!me_locality.child_layer_peers.is_empty());
assert!(!buddy_locality.child_layer_peers.is_empty());
me_locality
.child_layer_peers
.iter()
.zip(buddy_locality.child_layer_peers.iter())
.for_each(|(x, y)| assert_ne!(x, y));
}
}
#[test]
fn test_network_coverage() {
// pretend to be each node in a scaled down network and make sure the set of all the broadcast peers
// includes every node in the network.
let (_, layer_indices) = ClusterInfo::describe_data_plane(25_000, 10, 10, false);
let mut broadcast_set = HashSet::new();
for my_index in 0..25_000 {
let my_locality = ClusterInfo::localize(&layer_indices, 10, my_index);
broadcast_set.extend(my_locality.neighbor_bounds.0..my_locality.neighbor_bounds.1);
broadcast_set.extend(my_locality.child_layer_peers);
}
for i in 0..25_000 {
assert!(broadcast_set.contains(&(i as usize)));
}
assert!(broadcast_set.contains(&(layer_indices.last().unwrap() - 1)));
// sanity check that nothing past the total capacity was included.
assert!(!broadcast_set.contains(&(layer_indices.last().unwrap())));
}
}

View File

@ -3,10 +3,11 @@ use bincode::serialize;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::timestamp;
use std::cmp::{Ord, Ordering, PartialEq, PartialOrd};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
/// Structure representing a node on the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ContactInfo {
pub id: Pubkey,
/// signature of this ContactInfo
@ -27,6 +28,26 @@ pub struct ContactInfo {
pub wallclock: u64,
}
impl Ord for ContactInfo {
fn cmp(&self, other: &Self) -> Ordering {
self.id.cmp(&other.id)
}
}
impl PartialOrd for ContactInfo {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ContactInfo {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for ContactInfo {}
#[macro_export]
macro_rules! socketaddr {
($ip:expr, $port:expr) => {

View File

@ -303,6 +303,7 @@ impl Fullnode {
let (broadcast_service, _) = BroadcastService::new(
db_ledger.clone(),
bank.clone(),
node.sockets
.broadcast
.try_clone()
@ -476,6 +477,7 @@ impl Fullnode {
let (broadcast_service, _) = BroadcastService::new(
self.db_ledger.clone(),
self.bank.clone(),
self.broadcast_socket
.try_clone()
.expect("Failed to clone broadcast socket"),

View File

@ -1,10 +1,10 @@
//! The `retransmit_stage` retransmits blobs between validators
use crate::cluster_info::ClusterInfo;
use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE};
use crate::counter::Counter;
use crate::db_ledger::DbLedger;
use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::service::Service;
@ -21,6 +21,7 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
fn retransmit(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
r: &BlobReceiver,
sock: &UdpSocket,
@ -37,13 +38,53 @@ fn retransmit(
.to_owned(),
);
for b in &mut dq {
ClusterInfo::retransmit(&cluster_info, b, sock)?;
// TODO layer 2 logic here
// 1 - find out if I am in layer 1 first
// 1.1 - If yes, broadcast to all layer 1 nodes
//       1.1.1 - using my layer 1 index, broadcast to all layer 2 nodes, assuming the neighborhood size is known
// 1.2 - If no, figure out which layer I am in and who my neighbors are, and broadcast only to them
//       1.2.1 - also check if there are nodes in lower layers and repeat the layer-1-to-layer-2 logic
// (a concrete sketch of the resulting peer selection follows this function)
let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
let my_id = cluster_info.read().unwrap().id();
// calculate the number of layers and the layer boundaries from the total number of nodes
let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(
peers.len(),
DATA_PLANE_FANOUT,
NEIGHBORHOOD_SIZE,
GROW_LAYER_CAPACITY,
);
if num_layers <= 1 {
/* single layer data plane */
for b in &mut dq {
ClusterInfo::retransmit(&cluster_info, b, sock)?;
}
} else {
// find my index (the position of the first node whose stake is <= mine)
let my_index = peers
.iter()
.position(|ci| bank.get_stake(&ci.id) <= bank.get_stake(&my_id));
//find my layer
let locality = ClusterInfo::localize(
&layer_indices,
NEIGHBORHOOD_SIZE,
my_index.unwrap_or(peers.len() - 1),
);
let mut retransmit_peers =
peers[locality.neighbor_bounds.0..locality.neighbor_bounds.1].to_vec();
locality.child_layer_peers.iter().for_each(|&ix| {
if let Some(peer) = peers.get(ix) {
retransmit_peers.push(peer.clone());
}
});
for b in &mut dq {
ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
}
}
Ok(())
}
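// A concrete sketch of the selection above (hypothetical 10-node network, fanout 2,
// hood size 2, no growth): a node whose stake-sorted index is 3 gets
// neighbor_bounds (2, 4) and child_layer_peers [5] from `localize`, so its
// retransmit_peers list is peers[2..4] plus peers[5]; a node in the last layer gets no
// child peers and only retransmits within its own neighborhood.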
/// Service to retransmit messages from the leader to layer 1 nodes.
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
/// See `cluster_info` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
@ -53,6 +94,7 @@ fn retransmit(
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
fn retransmitter(
sock: Arc<UdpSocket>,
bank: Arc<Bank>,
cluster_info: Arc<RwLock<ClusterInfo>>,
r: BlobReceiver,
) -> JoinHandle<()> {
@ -61,7 +103,7 @@ fn retransmitter(
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&cluster_info, &r, &sock) {
if let Err(e) = retransmit(&bank, &cluster_info, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -83,6 +125,7 @@ pub struct RetransmitStage {
impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank: &Arc<Bank>,
db_ledger: Arc<DbLedger>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
tick_height: u64,
@ -94,8 +137,12 @@ impl RetransmitStage {
) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit =
retransmitter(retransmit_socket, cluster_info.clone(), retransmit_receiver);
let t_retransmit = retransmitter(
retransmit_socket,
bank.clone(),
cluster_info.clone(),
retransmit_receiver,
);
let (entry_sender, entry_receiver) = channel();
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(

View File

@ -96,6 +96,7 @@ impl Tvu {
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
bank,
db_ledger,
&cluster_info,
bank.tick_height(),