From b8fd51e97d32c51a62158b461496b01465a85b0b Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 8 May 2019 13:50:32 -0700 Subject: [PATCH] Add new gossip structure for supporting repairs (#4205) * Add Epoch Slots to gossip * Add new gossip structure to support Repair * remove unnecessary clones * Setup dummy fast repair in repair_service * PR comments --- core/src/cluster_info.rs | 20 +++++-- core/src/crds_gossip.rs | 16 +++--- core/src/crds_value.rs | 114 +++++++++++++++++++++++++++++++------ core/src/repair_service.rs | 60 +++++++++++-------- core/tests/crds_gossip.rs | 10 ++-- 5 files changed, 161 insertions(+), 59 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index d6df2832f..4956d239f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -18,7 +18,7 @@ use crate::contact_info::ContactInfo; use crate::crds_gossip::CrdsGossip; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; -use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote}; +use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::repair_service::RepairType; use crate::result::Result; @@ -41,6 +41,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::transaction::Transaction; use std::cmp::min; +use std::collections::HashSet; use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; @@ -200,7 +201,7 @@ impl ClusterInfo { let mut entry = CrdsValue::ContactInfo(my_data); entry.sign(&self.keypair); self.gossip.refresh_push_active_set(stakes); - self.gossip.process_push_message(&[entry], now); + self.gossip.process_push_message(vec![entry], now); } // TODO kill insert_info, only used by tests @@ -296,12 +297,19 @@ impl ClusterInfo { self.gossip_leader_id = *leader_id; } + pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: HashSet) { + let now = timestamp(); + let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now)); + entry.sign(&self.keypair); + self.gossip.process_push_message(vec![entry], now); + } + pub fn push_vote(&mut self, vote: Transaction) { let now = timestamp(); let vote = Vote::new(&self.id(), vote, now); let mut entry = CrdsValue::Vote(vote); entry.sign(&self.keypair); - self.gossip.process_push_message(&[entry], now); + self.gossip.process_push_message(vec![entry], now); } /// Get votes in the crds @@ -1133,7 +1141,7 @@ impl ClusterInfo { fn handle_push_message( me: &Arc>, from: &Pubkey, - data: &[CrdsValue], + data: Vec, ) -> Vec { let self_id = me.read().unwrap().gossip.id; inc_new_counter_info!("cluster_info-push_message", 1, 0, 1000); @@ -1141,7 +1149,7 @@ impl ClusterInfo { .write() .unwrap() .gossip - .process_push_message(&data, timestamp()); + .process_push_message(data, timestamp()); if !prunes.is_empty() { inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len()); let ci = me.read().unwrap().lookup(from).cloned(); @@ -1294,7 +1302,7 @@ impl ClusterInfo { } ret }); - Self::handle_push_message(me, &from, &data) + Self::handle_push_message(me, &from, data) } Protocol::PruneMessage(from, data) => { if data.verify() { diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 966900fde..665bbd565 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -40,20 +40,20 @@ impl CrdsGossip { self.id = *id; } /// process a push message to the network - pub fn process_push_message(&mut self, values: &[CrdsValue], now: u64) -> Vec { + pub fn process_push_message(&mut self, values: Vec, now: u64) -> Vec { + let labels: Vec<_> = values.iter().map(CrdsValue::label).collect(); + let results: Vec<_> = values - .iter() - .map(|val| { - self.push - .process_push_message(&mut self.crds, val.clone(), now) - }) + .into_iter() + .map(|val| self.push.process_push_message(&mut self.crds, val, now)) .collect(); + results .into_iter() - .zip(values) + .zip(labels) .filter_map(|(r, d)| { if r == Err(CrdsGossipError::PushMessagePrune) { - Some(d.label().pubkey()) + Some(d.pubkey()) } else if let Ok(Some(val)) = r { self.pull .record_old_hash(val.value_hash, val.local_timestamp); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index c885722a1..6eeaa1f2e 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -3,6 +3,7 @@ use bincode::serialize; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signable, Signature}; use solana_sdk::transaction::Transaction; +use std::collections::HashSet; use std::fmt; /// CrdsValue that is replicated across the cluster @@ -12,6 +13,58 @@ pub enum CrdsValue { ContactInfo(ContactInfo), /// * Merge Strategy - Latest wallclock is picked Vote(Vote), + /// * Merge Strategy - Latest wallclock is picked + EpochSlots(EpochSlots), +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct EpochSlots { + pub from: Pubkey, + pub root: u64, + pub slots: HashSet, + pub signature: Signature, + pub wallclock: u64, +} + +impl EpochSlots { + pub fn new(from: Pubkey, root: u64, slots: HashSet, wallclock: u64) -> Self { + Self { + from, + root, + slots, + signature: Signature::default(), + wallclock, + } + } +} + +impl Signable for EpochSlots { + fn pubkey(&self) -> Pubkey { + self.from + } + + fn signable_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData<'a> { + root: u64, + slots: &'a HashSet, + wallclock: u64, + } + let data = SignData { + root: self.root, + slots: &self.slots, + wallclock: self.wallclock, + }; + serialize(&data).expect("unable to serialize EpochSlots") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature; + } } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -22,6 +75,17 @@ pub struct Vote { pub wallclock: u64, } +impl Vote { + pub fn new(from: &Pubkey, transaction: Transaction, wallclock: u64) -> Self { + Self { + from: *from, + transaction, + signature: Signature::default(), + wallclock, + } + } +} + impl Signable for Vote { fn pubkey(&self) -> Pubkey { self.from @@ -29,12 +93,12 @@ impl Signable for Vote { fn signable_data(&self) -> Vec { #[derive(Serialize)] - struct SignData { - transaction: Transaction, + struct SignData<'a> { + transaction: &'a Transaction, wallclock: u64, } let data = SignData { - transaction: self.transaction.clone(), + transaction: &self.transaction, wallclock: self.wallclock, }; serialize(&data).expect("unable to serialize Vote") @@ -55,6 +119,7 @@ impl Signable for Vote { pub enum CrdsValueLabel { ContactInfo(Pubkey), Vote(Pubkey), + EpochSlots(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -62,6 +127,7 @@ impl fmt::Display for CrdsValueLabel { match self { CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), CrdsValueLabel::Vote(_) => write!(f, "Vote({})", self.pubkey()), + CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()), } } } @@ -71,17 +137,7 @@ impl CrdsValueLabel { match self { CrdsValueLabel::ContactInfo(p) => *p, CrdsValueLabel::Vote(p) => *p, - } - } -} - -impl Vote { - pub fn new(from: &Pubkey, transaction: Transaction, wallclock: u64) -> Self { - Vote { - from: *from, - transaction, - signature: Signature::default(), - wallclock, + CrdsValueLabel::EpochSlots(p) => *p, } } } @@ -94,6 +150,7 @@ impl CrdsValue { match self { CrdsValue::ContactInfo(contact_info) => contact_info.wallclock, CrdsValue::Vote(vote) => vote.wallclock, + CrdsValue::EpochSlots(vote) => vote.wallclock, } } pub fn label(&self) -> CrdsValueLabel { @@ -102,6 +159,7 @@ impl CrdsValue { CrdsValueLabel::ContactInfo(contact_info.pubkey()) } CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.pubkey()), + CrdsValue::EpochSlots(slots) => CrdsValueLabel::EpochSlots(slots.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -116,11 +174,18 @@ impl CrdsValue { _ => None, } } + pub fn epoch_slots(&self) -> Option<&EpochSlots> { + match self { + CrdsValue::EpochSlots(slots) => Some(slots), + _ => None, + } + } /// Return all the possible labels for a record identified by Pubkey. - pub fn record_labels(key: &Pubkey) -> [CrdsValueLabel; 2] { + pub fn record_labels(key: &Pubkey) -> [CrdsValueLabel; 3] { [ CrdsValueLabel::ContactInfo(*key), CrdsValueLabel::Vote(*key), + CrdsValueLabel::EpochSlots(*key), ] } } @@ -130,12 +195,15 @@ impl Signable for CrdsValue { match self { CrdsValue::ContactInfo(contact_info) => contact_info.sign(keypair), CrdsValue::Vote(vote) => vote.sign(keypair), + CrdsValue::EpochSlots(epoch_slots) => epoch_slots.sign(keypair), }; } + fn verify(&self) -> bool { match self { CrdsValue::ContactInfo(contact_info) => contact_info.verify(), CrdsValue::Vote(vote) => vote.verify(), + CrdsValue::EpochSlots(epoch_slots) => epoch_slots.verify(), } } @@ -143,6 +211,7 @@ impl Signable for CrdsValue { match self { CrdsValue::ContactInfo(contact_info) => contact_info.pubkey(), CrdsValue::Vote(vote) => vote.pubkey(), + CrdsValue::EpochSlots(epoch_slots) => epoch_slots.pubkey(), } } @@ -169,12 +238,13 @@ mod test { #[test] fn test_labels() { - let mut hits = [false; 2]; + let mut hits = [false; 3]; // this method should cover all the possible labels for v in &CrdsValue::record_labels(&Pubkey::default()) { match v { CrdsValueLabel::ContactInfo(_) => hits[0] = true, CrdsValueLabel::Vote(_) => hits[1] = true, + CrdsValueLabel::EpochSlots(_) => hits[2] = true, } } assert!(hits.iter().all(|x| *x)); @@ -190,6 +260,11 @@ mod test { assert_eq!(v.wallclock(), 0); let key = v.clone().vote().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::Vote(key)); + + let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, HashSet::new(), 0)); + assert_eq!(v.wallclock(), 0); + let key = v.clone().epoch_slots().unwrap().from; + assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key)); } #[test] fn test_signature() { @@ -200,6 +275,13 @@ mod test { verify_signatures(&mut v, &keypair, &wrong_keypair); v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp())); verify_signatures(&mut v, &keypair, &wrong_keypair); + v = CrdsValue::EpochSlots(EpochSlots::new( + keypair.pubkey(), + 0, + HashSet::new(), + timestamp(), + )); + verify_signatures(&mut v, &keypair, &wrong_keypair); } fn verify_signatures( diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 5057e74cc..1549f5dc6 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -6,6 +6,8 @@ use crate::cluster_info::ClusterInfo; use crate::result::Result; use crate::service::Service; use solana_metrics::{influxdb, submit}; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashSet; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -60,6 +62,30 @@ pub struct RepairService { } impl RepairService { + pub fn new( + blocktree: Arc, + exit: &Arc, + repair_socket: Arc, + cluster_info: Arc>, + repair_slot_range: Option, + ) -> Self { + let exit = exit.clone(); + let t_repair = Builder::new() + .name("solana-repair-service".to_string()) + .spawn(move || { + Self::run( + &blocktree, + exit, + &repair_socket, + &cluster_info, + repair_slot_range, + ) + }) + .unwrap(); + + RepairService { t_repair } + } + fn run( blocktree: &Arc, exit: Arc, @@ -68,6 +94,7 @@ impl RepairService { repair_slot_range: Option, ) { let mut repair_info = RepairInfo::new(); + let epoch_slots: HashSet = HashSet::new(); let id = cluster_info.read().unwrap().id(); loop { if exit.load(Ordering::Relaxed) { @@ -84,6 +111,7 @@ impl RepairService { repair_slot_range, ) } else { + Self::update_fast_repair(id, &epoch_slots, &cluster_info); Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH) } }; @@ -129,30 +157,6 @@ impl RepairService { } } - pub fn new( - blocktree: Arc, - exit: &Arc, - repair_socket: Arc, - cluster_info: Arc>, - repair_slot_range: Option, - ) -> Self { - let exit = exit.clone(); - let t_repair = Builder::new() - .name("solana-repair-service".to_string()) - .spawn(move || { - Self::run( - &blocktree, - exit, - &repair_socket, - &cluster_info, - repair_slot_range, - ) - }) - .unwrap(); - - RepairService { t_repair } - } - fn generate_repairs_in_range( blocktree: &Blocktree, max_repairs: usize, @@ -266,6 +270,14 @@ impl RepairService { } } } + + fn update_fast_repair(id: Pubkey, slots: &HashSet, cluster_info: &RwLock) { + let root = 0; + cluster_info + .write() + .unwrap() + .push_epoch_slots(id, root, slots.clone()); + } } impl Service for RepairService { diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 0ee42b0f5..e056e02d3 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -125,7 +125,7 @@ fn network_simulator(network: &mut Network) { .and_then(|v| v.contact_info().cloned()) .unwrap(); m.wallclock = now; - node.process_push_message(&[CrdsValue::ContactInfo(m.clone())], now); + node.process_push_message(vec![CrdsValue::ContactInfo(m)], now); }); // push for a bit let (queue_size, bytes_tx) = network_run_push(network, start, end); @@ -170,18 +170,18 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, }) .collect(); let transfered: Vec<_> = requests - .par_iter() + .into_par_iter() .map(|(from, peers, msgs)| { let mut bytes: usize = 0; let mut delivered: usize = 0; let mut num_msgs: usize = 0; let mut prunes: usize = 0; for to in peers { - bytes += serialized_size(msgs).unwrap() as usize; + bytes += serialized_size(&msgs).unwrap() as usize; num_msgs += 1; let rsps = network .get(&to) - .map(|node| node.lock().unwrap().process_push_message(&msgs, now)) + .map(|node| node.lock().unwrap().process_push_message(msgs.clone(), now)) .unwrap(); bytes += serialized_size(&rsps).unwrap() as usize; prunes += rsps.len(); @@ -191,7 +191,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, let mut node = node.lock().unwrap(); let destination = node.id; let now = timestamp(); - node.process_prune_msg(&*to, &destination, &rsps, now, now) + node.process_prune_msg(&to, &destination, &rsps, now, now) .unwrap() }) .unwrap();