adds api to obtain the parent node in the turbine retransmit tree (#115)
Following commits will use this api to check retransmitter's signature on incoming shreds.
This commit is contained in:
parent
85cfe23b46
commit
42e8309c34
|
@ -7501,6 +7501,7 @@ dependencies = [
|
|||
"solana-runtime",
|
||||
"solana-sdk",
|
||||
"solana-streamer",
|
||||
"test-case",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
|
|
@ -43,6 +43,7 @@ tokio = { workspace = true }
|
|||
assert_matches = { workspace = true }
|
||||
solana-logger = { workspace = true }
|
||||
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
|
||||
test-case = { workspace = true }
|
||||
|
||||
[[bench]]
|
||||
name = "cluster_info"
|
||||
|
|
|
@ -152,8 +152,7 @@ impl ClusterNodes<BroadcastStage> {
|
|||
}
|
||||
|
||||
pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
|
||||
let shred_seed = shred.seed(&self.pubkey);
|
||||
let mut rng = ChaChaRng::from_seed(shred_seed);
|
||||
let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred);
|
||||
let index = self.weighted_shuffle.first(&mut rng)?;
|
||||
self.nodes[index].contact_info()
|
||||
}
|
||||
|
@ -187,7 +186,6 @@ impl ClusterNodes<RetransmitStage> {
|
|||
shred: &ShredId,
|
||||
fanout: usize,
|
||||
) -> Result<RetransmitPeers, Error> {
|
||||
let shred_seed = shred.seed(slot_leader);
|
||||
let mut weighted_shuffle = self.weighted_shuffle.clone();
|
||||
// Exclude slot leader from list of nodes.
|
||||
if slot_leader == &self.pubkey {
|
||||
|
@ -200,7 +198,7 @@ impl ClusterNodes<RetransmitStage> {
|
|||
weighted_shuffle.remove_index(*index);
|
||||
}
|
||||
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
|
||||
let mut rng = ChaChaRng::from_seed(shred_seed);
|
||||
let mut rng = get_seeded_rng(slot_leader, shred);
|
||||
let protocol = get_broadcast_protocol(shred);
|
||||
let nodes: Vec<_> = weighted_shuffle
|
||||
.shuffle(&mut rng)
|
||||
|
@ -233,6 +231,43 @@ impl ClusterNodes<RetransmitStage> {
|
|||
addrs,
|
||||
})
|
||||
}
|
||||
|
||||
// Returns the parent node in the turbine broadcast tree.
|
||||
// Returns None if the node is the root of the tree or if it is not staked.
|
||||
#[allow(unused)]
|
||||
fn get_retransmit_parent(
|
||||
&self,
|
||||
leader: &Pubkey,
|
||||
shred: &ShredId,
|
||||
fanout: usize,
|
||||
) -> Result<Option<Pubkey>, Error> {
|
||||
// Exclude slot leader from list of nodes.
|
||||
if leader == &self.pubkey {
|
||||
return Err(Error::Loopback {
|
||||
leader: *leader,
|
||||
shred: *shred,
|
||||
});
|
||||
}
|
||||
// Unstaked nodes' position in the turbine tree is not deterministic
|
||||
// and depends on gossip propagation of contact-infos. Therefore, if
|
||||
// this node is not staked return None.
|
||||
if self.nodes[self.index[&self.pubkey]].stake == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut weighted_shuffle = self.weighted_shuffle.clone();
|
||||
if let Some(index) = self.index.get(leader).copied() {
|
||||
weighted_shuffle.remove_index(index);
|
||||
}
|
||||
let mut rng = get_seeded_rng(leader, shred);
|
||||
// Only need shuffled nodes until this node itself.
|
||||
let nodes: Vec<_> = weighted_shuffle
|
||||
.shuffle(&mut rng)
|
||||
.map(|index| &self.nodes[index])
|
||||
.take_while(|node| node.pubkey() != self.pubkey)
|
||||
.collect();
|
||||
let parent = get_retransmit_parent(fanout, nodes.len(), &nodes);
|
||||
Ok(parent.map(Node::pubkey))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_cluster_nodes<T: 'static>(
|
||||
|
@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
|
|||
.collect()
|
||||
}
|
||||
|
||||
fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
|
||||
let seed = shred.seed(leader);
|
||||
ChaChaRng::from_seed(seed)
|
||||
}
|
||||
|
||||
// root : [0]
|
||||
// 1st layer: [1, 2, ..., fanout]
|
||||
// 2nd layer: [[fanout + 1, ..., fanout * 2],
|
||||
|
@ -327,6 +367,21 @@ fn get_retransmit_peers<T: Copy>(
|
|||
.copied()
|
||||
}
|
||||
|
||||
// Returns the parent node in the turbine broadcast tree.
|
||||
// Returns None if the node is the root of the tree.
|
||||
fn get_retransmit_parent<T: Copy>(
|
||||
fanout: usize,
|
||||
index: usize, // Local node's index within the nodes slice.
|
||||
nodes: &[T],
|
||||
) -> Option<T> {
|
||||
// Node's index within its neighborhood.
|
||||
let offset = index.saturating_sub(1) % fanout;
|
||||
let index = index.checked_sub(1)? / fanout;
|
||||
let index = index - index.saturating_sub(1) % fanout;
|
||||
let index = if index == 0 { index } else { index + offset };
|
||||
nodes.get(index).copied()
|
||||
}
|
||||
|
||||
impl<T> ClusterNodesCache<T> {
|
||||
pub fn new(
|
||||
// Capacity of underlying LRU-cache in terms of number of epochs.
|
||||
|
@ -516,7 +571,11 @@ pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use {
|
||||
super::*,
|
||||
std::{fmt::Debug, hash::Hash},
|
||||
test_case::test_case,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_cluster_nodes_retransmit() {
|
||||
|
@ -589,10 +648,42 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
// Checks (1) computed retransmit children against expected children and
|
||||
// (2) computed parent of each child against the expected parent.
|
||||
fn check_retransmit_nodes<T>(fanout: usize, nodes: &[T], peers: Vec<Vec<T>>)
|
||||
where
|
||||
T: Copy + Eq + PartialEq + Debug + Hash,
|
||||
{
|
||||
// Map node identities to their index within the shuffled tree.
|
||||
let index: HashMap<_, _> = nodes
|
||||
.iter()
|
||||
.copied()
|
||||
.enumerate()
|
||||
.map(|(k, node)| (node, k))
|
||||
.collect();
|
||||
let offset = peers.len();
|
||||
// Root node's parent is None.
|
||||
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None);
|
||||
for (k, peers) in peers.into_iter().enumerate() {
|
||||
assert_eq!(
|
||||
get_retransmit_peers(fanout, k, nodes).collect::<Vec<_>>(),
|
||||
peers
|
||||
);
|
||||
let parent = Some(nodes[k]);
|
||||
for peer in peers {
|
||||
assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
|
||||
}
|
||||
}
|
||||
// Remaining nodes have no children.
|
||||
for k in offset..=nodes.len() {
|
||||
assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_retransmit_peers() {
|
||||
fn test_get_retransmit_nodes() {
|
||||
// fanout 2
|
||||
let index = vec![
|
||||
let nodes = [
|
||||
7, // root
|
||||
6, 10, // 1st layer
|
||||
// 2nd layer
|
||||
|
@ -620,16 +711,9 @@ mod tests {
|
|||
vec![16, 9],
|
||||
vec![8],
|
||||
];
|
||||
for (k, peers) in peers.into_iter().enumerate() {
|
||||
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
|
||||
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
|
||||
}
|
||||
for k in 10..=index.len() {
|
||||
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
|
||||
assert_eq!(retransmit_peers.next(), None);
|
||||
}
|
||||
check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
|
||||
// fanout 3
|
||||
let index = vec![
|
||||
let nodes = [
|
||||
19, // root
|
||||
14, 15, 28, // 1st layer
|
||||
// 2nd layer
|
||||
|
@ -661,13 +745,84 @@ mod tests {
|
|||
vec![24, 32],
|
||||
vec![34],
|
||||
];
|
||||
for (k, peers) in peers.into_iter().enumerate() {
|
||||
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
|
||||
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
|
||||
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
|
||||
let nodes = [
|
||||
5, // root
|
||||
34, 52, 8, // 1st layer
|
||||
// 2nd layar
|
||||
44, 18, 2, // 1st neigborhood
|
||||
42, 47, 46, // 2nd
|
||||
11, 26, 28, // 3rd
|
||||
// 3rd layer
|
||||
53, 23, 37, // 1st neighborhood
|
||||
40, 13, 7, // 2nd
|
||||
50, 35, 22, // 3rd
|
||||
3, 27, 31, // 4th
|
||||
10, 48, 15, // 5th
|
||||
19, 6, 30, // 6th
|
||||
36, 45, 1, // 7th
|
||||
38, 12, 17, // 8th
|
||||
4, 32, 16, // 9th
|
||||
// 4th layer
|
||||
41, 49, 24, // 1st neighborhood
|
||||
14, 9, 0, // 2nd
|
||||
29, 21, 39, // 3rd
|
||||
43, 51, 33, // 4th
|
||||
25, 20, // 5th
|
||||
];
|
||||
let peers = vec![
|
||||
vec![34, 52, 8],
|
||||
vec![44, 42, 11],
|
||||
vec![18, 47, 26],
|
||||
vec![2, 46, 28],
|
||||
vec![53, 40, 50],
|
||||
vec![23, 13, 35],
|
||||
vec![37, 7, 22],
|
||||
vec![3, 10, 19],
|
||||
vec![27, 48, 6],
|
||||
vec![31, 15, 30],
|
||||
vec![36, 38, 4],
|
||||
vec![45, 12, 32],
|
||||
vec![1, 17, 16],
|
||||
vec![41, 14, 29],
|
||||
vec![49, 9, 21],
|
||||
vec![24, 0, 39],
|
||||
vec![43, 25],
|
||||
vec![51, 20],
|
||||
vec![33],
|
||||
];
|
||||
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
|
||||
}
|
||||
|
||||
#[test_case(2, 1_347)]
|
||||
#[test_case(3, 1_359)]
|
||||
#[test_case(4, 4_296)]
|
||||
#[test_case(5, 3_925)]
|
||||
#[test_case(6, 8_778)]
|
||||
#[test_case(7, 9_879)]
|
||||
fn test_get_retransmit_nodes_round_trip(fanout: usize, size: usize) {
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut nodes: Vec<_> = (0..size).collect();
|
||||
nodes.shuffle(&mut rng);
|
||||
// Map node identities to their index within the shuffled tree.
|
||||
let index: HashMap<_, _> = nodes
|
||||
.iter()
|
||||
.copied()
|
||||
.enumerate()
|
||||
.map(|(k, node)| (node, k))
|
||||
.collect();
|
||||
// Root node's parent is None.
|
||||
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None);
|
||||
for k in 1..size {
|
||||
let parent = get_retransmit_parent(fanout, k, &nodes).unwrap();
|
||||
let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes);
|
||||
assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k]));
|
||||
}
|
||||
for k in 13..=index.len() {
|
||||
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
|
||||
assert_eq!(retransmit_peers.next(), None);
|
||||
for k in 0..size {
|
||||
let parent = Some(nodes[k]);
|
||||
for peer in get_retransmit_peers(fanout, k, &nodes) {
|
||||
assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue