Include shred version in gossip

This commit is contained in:
Michael Vines 2020-01-13 15:59:31 -07:00
parent 965ad778dd
commit e6af4511a8
4 changed files with 29 additions and 24 deletions

View File

@ -67,11 +67,11 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests /// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
/// The maximum size of a bloom filter /// The maximum size of a bloom filter
pub const MAX_BLOOM_SIZE: usize = 1030; pub const MAX_BLOOM_SIZE: usize = 1028;
/// The maximum size of a protocol payload /// The maximum size of a protocol payload
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size /// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 202; const MAX_PROTOCOL_HEADER_SIZE: u64 = 204;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError { pub enum ClusterInfoError {
@ -272,7 +272,7 @@ impl ClusterInfo {
let ip_addr = node.gossip.ip(); let ip_addr = node.gossip.ip();
format!( format!(
"{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}\n", "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| v{}\n",
if ContactInfo::is_valid_address(&node.gossip) { if ContactInfo::is_valid_address(&node.gossip) {
ip_addr.to_string() ip_addr.to_string()
} else { } else {
@ -290,15 +290,16 @@ impl ClusterInfo {
addr_to_string(&ip_addr, &node.storage_addr), addr_to_string(&ip_addr, &node.storage_addr),
addr_to_string(&ip_addr, &node.rpc), addr_to_string(&ip_addr, &node.rpc),
addr_to_string(&ip_addr, &node.rpc_pubsub), addr_to_string(&ip_addr, &node.rpc_pubsub),
node.shred_version,
) )
}) })
.collect(); .collect();
format!( format!(
"IP Address |Age(ms)| Node identifier \ "IP Address |Age(ms)| Node identifier \
|Gossip| TPU |TPU fwd| TVU |TVU fwd|Repair|Storage| RPC |PubSub\n\ |Gossip| TPU |TPU fwd| TVU |TVU fwd|Repair|Storage| RPC |PubSub|ShredVer\n\
------------------+-------+----------------------------------------------+\ ------------------+-------+----------------------------------------------+\
------+------+-------+------+-------+------+-------+------+------\n\ ------+------+-------+------+-------+------+-------+------+------+--------\n\
{}\ {}\
Nodes: {}{}{}", Nodes: {}{}{}",
nodes.join(""), nodes.join(""),
@ -405,13 +406,13 @@ impl ClusterInfo {
} }
pub fn rpc_peers(&self) -> Vec<ContactInfo> { pub fn rpc_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id; let me = self.my_data();
self.gossip self.gossip
.crds .crds
.table .table
.values() .values()
.filter_map(|x| x.value.contact_info()) .filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me) .filter(|x| x.id != me.id)
.filter(|x| ContactInfo::is_valid_address(&x.rpc)) .filter(|x| ContactInfo::is_valid_address(&x.rpc))
.cloned() .cloned()
.collect() .collect()
@ -446,7 +447,7 @@ impl ClusterInfo {
/// all validators that have a valid tvu port. /// all validators that have a valid tvu port.
pub fn tvu_peers(&self) -> Vec<ContactInfo> { pub fn tvu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id; let me = self.my_data();
self.gossip self.gossip
.crds .crds
.table .table
@ -454,34 +455,34 @@ impl ClusterInfo {
.filter_map(|x| x.value.contact_info()) .filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.tvu)) .filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| !ClusterInfo::is_archiver(x)) .filter(|x| !ClusterInfo::is_archiver(x))
.filter(|x| x.id != me) .filter(|x| x.id != me.id)
.cloned() .cloned()
.collect() .collect()
} }
/// all peers that have a valid storage addr /// all peers that have a valid storage addr
pub fn storage_peers(&self) -> Vec<ContactInfo> { pub fn storage_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id; let me = self.my_data();
self.gossip self.gossip
.crds .crds
.table .table
.values() .values()
.filter_map(|x| x.value.contact_info()) .filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.storage_addr)) .filter(|x| ContactInfo::is_valid_address(&x.storage_addr))
.filter(|x| x.id != me) .filter(|x| x.id != me.id)
.cloned() .cloned()
.collect() .collect()
} }
/// all peers that have a valid tvu /// all peers that have a valid tvu
pub fn retransmit_peers(&self) -> Vec<ContactInfo> { pub fn retransmit_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data().id; let me = self.my_data();
self.gossip self.gossip
.crds .crds
.table .table
.values() .values()
.filter_map(|x| x.value.contact_info()) .filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me) .filter(|x| x.id != me.id)
.filter(|x| ContactInfo::is_valid_address(&x.tvu)) .filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards)) .filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards))
.cloned() .cloned()
@ -490,10 +491,10 @@ impl ClusterInfo {
/// all tvu peers with valid gossip addrs that likely have the slot being requested /// all tvu peers with valid gossip addrs that likely have the slot being requested
fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> { fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
let me = self.my_data().id; let me = self.my_data();
ClusterInfo::tvu_peers(self) ClusterInfo::tvu_peers(self)
.into_iter() .into_iter()
.filter(|x| x.id != me) .filter(|x| x.id != me.id)
.filter(|x| ContactInfo::is_valid_address(&x.gossip)) .filter(|x| ContactInfo::is_valid_address(&x.gossip))
.filter(|x| { .filter(|x| {
self.get_epoch_state_for_node(&x.id, None) self.get_epoch_state_for_node(&x.id, None)
@ -2575,7 +2576,7 @@ mod tests {
#[test] #[test]
fn test_split_messages_packet_size() { fn test_split_messages_packet_size() {
// Test that if a value is smaller than payload size but too large to be wrappe in a vec // Test that if a value is smaller than payload size but too large to be wrapped in a vec
// that it is still dropped // that it is still dropped
let payload: Vec<CrdsValue> = vec![]; let payload: Vec<CrdsValue> = vec![];
let vec_size = serialized_size(&payload).unwrap(); let vec_size = serialized_size(&payload).unwrap();
@ -2589,7 +2590,7 @@ mod tests {
})); }));
let mut i = 0; let mut i = 0;
while value.size() < desired_size { while value.size() <= desired_size {
let slots = (0..i).collect::<BTreeSet<_>>(); let slots = (0..i).collect::<BTreeSet<_>>();
if slots.len() > 200 { if slots.len() > 200 {
panic!( panic!(

View File

@ -31,6 +31,8 @@ pub struct ContactInfo {
pub rpc_pubsub: SocketAddr, pub rpc_pubsub: SocketAddr,
/// latest wallclock picked /// latest wallclock picked
pub wallclock: u64, pub wallclock: u64,
/// node shred version
pub shred_version: u16,
} }
impl Ord for ContactInfo { impl Ord for ContactInfo {
@ -84,6 +86,7 @@ impl Default for ContactInfo {
rpc: socketaddr_any!(), rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(), rpc_pubsub: socketaddr_any!(),
wallclock: 0, wallclock: 0,
shred_version: 0,
} }
} }
} }
@ -115,6 +118,7 @@ impl ContactInfo {
rpc, rpc,
rpc_pubsub, rpc_pubsub,
wallclock: now, wallclock: now,
shred_version: 0,
} }
} }

View File

@ -199,7 +199,6 @@ fn spy(
.unwrap() .unwrap()
.tvu_peers() .tvu_peers()
.into_iter() .into_iter()
.filter(|node| !ClusterInfo::is_archiver(&node))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
archivers = spy_ref.read().unwrap().storage_peers(); archivers = spy_ref.read().unwrap().storage_peers();
if let Some(num) = num_nodes { if let Some(num) = num_nodes {

View File

@ -145,8 +145,6 @@ impl Validator {
info!("entrypoint: {:?}", entrypoint_info_option); info!("entrypoint: {:?}", entrypoint_info_option);
Self::print_node_info(&node);
info!("Initializing sigverify, this could take a while..."); info!("Initializing sigverify, this could take a while...");
sigverify::init(); sigverify::init();
info!("Done."); info!("Done.");
@ -177,8 +175,6 @@ impl Validator {
let bank = bank_forks[bank_info.bank_slot].clone(); let bank = bank_forks[bank_info.bank_slot].clone();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
// The version used by shreds, derived from genesis
let shred_version = Shred::version_from_hash(&genesis_hash);
let mut validator_exit = ValidatorExit::default(); let mut validator_exit = ValidatorExit::default();
let exit_ = exit.clone(); let exit_ = exit.clone();
@ -186,6 +182,9 @@ impl Validator {
let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));
node.info.wallclock = timestamp(); node.info.wallclock = timestamp();
node.info.shred_version = Shred::version_from_hash(&genesis_hash);
Self::print_node_info(&node);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new( let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
node.info.clone(), node.info.clone(),
keypair.clone(), keypair.clone(),
@ -372,7 +371,7 @@ impl Validator {
block_commitment_cache, block_commitment_cache,
config.dev_sigverify_disabled, config.dev_sigverify_disabled,
config.partition_cfg.clone(), config.partition_cfg.clone(),
shred_version, node.info.shred_version,
transaction_status_sender.clone(), transaction_status_sender.clone(),
); );
@ -392,7 +391,7 @@ impl Validator {
&blockstore, &blockstore,
&config.broadcast_stage_type, &config.broadcast_stage_type,
&exit, &exit,
shred_version, node.info.shred_version,
); );
datapoint_info!("validator-new", ("id", id.to_string(), String)); datapoint_info!("validator-new", ("id", id.to_string(), String));
@ -615,6 +614,7 @@ fn get_stake_percent_in_gossip(
let mut gossip_stake = 0; let mut gossip_stake = 0;
let mut total_activated_stake = 0; let mut total_activated_stake = 0;
let tvu_peers = cluster_info.read().unwrap().tvu_peers(); let tvu_peers = cluster_info.read().unwrap().tvu_peers();
let me = cluster_info.read().unwrap().my_data();
for (activated_stake, vote_account) in bank.vote_accounts().values() { for (activated_stake, vote_account) in bank.vote_accounts().values() {
let vote_state = let vote_state =
@ -622,6 +622,7 @@ fn get_stake_percent_in_gossip(
total_activated_stake += activated_stake; total_activated_stake += activated_stake;
if tvu_peers if tvu_peers
.iter() .iter()
.filter(|peer| peer.shred_version == me.shred_version)
.any(|peer| peer.id == vote_state.node_pubkey) .any(|peer| peer.id == vote_state.node_pubkey)
{ {
trace!( trace!(