From c8c85ff93b9508f67fdfe891ee8c26990baef833 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 8 Mar 2019 18:08:24 -0800 Subject: [PATCH] Fix propagation of incorrectly signed messages in Gossip (#3201) --- core/src/cluster_info.rs | 107 +++++++++++++++++++++++++++++++++-- core/src/crds_gossip.rs | 2 +- core/src/crds_gossip_pull.rs | 2 +- core/src/crds_value.rs | 25 +++----- core/src/fullnode.rs | 3 +- core/src/gossip_service.rs | 9 ++- core/src/replicator.rs | 4 +- 7 files changed, 121 insertions(+), 31 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 31fb3f8b9..08ba186e0 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -77,6 +77,8 @@ pub struct ClusterInfo { // TODO: remove gossip_leader_id once all usage of `set_leader()` and `leader_data()` is // purged gossip_leader_id: Pubkey, + /// The network entrypoint + entrypoint: Option, } #[derive(Default, Clone)] @@ -174,13 +176,21 @@ impl ClusterInfo { gossip: CrdsGossip::default(), keypair, gossip_leader_id: Pubkey::default(), + entrypoint: None, }; let id = node_info.id; me.gossip.set_self(id); - me.insert_info(node_info); + me.insert_self(node_info); me.push_self(&HashMap::new()); me } + pub fn insert_self(&mut self, node_info: NodeInfo) { + if self.id() == node_info.id { + let mut value = CrdsValue::ContactInfo(node_info.clone()); + value.sign(&self.keypair); + let _ = self.gossip.crds.insert(value, timestamp()); + } + } pub fn push_self(&mut self, stakes: &HashMap) { let mut my_data = self.my_data(); let now = timestamp(); @@ -190,11 +200,15 @@ impl ClusterInfo { self.gossip.refresh_push_active_set(stakes); self.gossip.process_push_message(&[entry], now); } + // TODO kill insert_info, only used by tests pub fn insert_info(&mut self, node_info: NodeInfo) { let mut value = CrdsValue::ContactInfo(node_info); value.sign(&self.keypair); let _ = self.gossip.crds.insert(value, timestamp()); } + pub fn set_entrypoint(&mut self, entrypoint: NodeInfo) { + self.entrypoint = Some(entrypoint) + } pub fn id(&self) -> Pubkey { self.gossip.id } @@ -763,6 +777,26 @@ impl ClusterInfo { Ok((addr, out)) } + // If the network entrypoint hasn't been discovered yet, add it to the crds table + fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom, SocketAddr, CrdsValue)>) { + match &self.entrypoint { + Some(entrypoint) => { + let self_info = self + .gossip + .crds + .lookup(&CrdsValueLabel::ContactInfo(self.id())) + .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); + + pulls.push(( + entrypoint.id, + self.gossip.pull.build_crds_filter(&self.gossip.crds), + entrypoint.gossip, + self_info.clone(), + )) + } + None => (), + } + } fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); @@ -773,7 +807,7 @@ impl ClusterInfo { .into_iter() .collect(); - let pr: Vec<_> = pulls + let mut pr: Vec<_> = pulls .into_iter() .filter_map(|(peer, filter, self_info)| { let peer_label = CrdsValueLabel::ContactInfo(peer); @@ -784,6 +818,11 @@ impl ClusterInfo { .map(|peer_info| (peer, filter, peer_info.gossip, self_info)) }) .collect(); + if pr.is_empty() { + self.add_entrypoint(&mut pr); + } else { + self.entrypoint = None; + } pr.into_iter() .map(|(peer, filter, gossip, self_info)| { self.gossip.mark_pull_request_creation_time(peer, now); @@ -1069,7 +1108,11 @@ impl ClusterInfo { return vec![]; } - me.write().unwrap().insert_info(from.clone()); + me.write() + .unwrap() + .gossip + .crds + .update_record_timestamp(from.id, timestamp()); let my_info = me.read().unwrap().my_data().clone(); inc_new_counter_info!("cluster_info-window-request-recv", 1); trace!( @@ -1103,8 +1146,12 @@ impl ClusterInfo { match request { // TODO verify messages faster Protocol::PullRequest(filter, caller) => { - //Pulls don't need to be verified - Self::handle_pull_request(me, filter, caller, from_addr) + if !caller.verify() { + inc_new_counter_info!("cluster_info-gossip_pull_request_verify_fail", 1); + vec![] + } else { + Self::handle_pull_request(me, filter, caller, from_addr) + } } Protocol::PullResponse(from, mut data) => { data.retain(|v| { @@ -1456,6 +1503,19 @@ mod tests { assert!(cluster_info.gossip.crds.lookup(&label).is_some()); } #[test] + fn test_insert_self() { + let d = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp()); + let mut cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); + let entry_label = CrdsValueLabel::ContactInfo(cluster_info.id()); + assert!(cluster_info.gossip.crds.lookup(&entry_label).is_some()); + + // inserting something else shouldn't work + let d = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp()); + cluster_info.insert_self(d.clone()); + let label = CrdsValueLabel::ContactInfo(d.id); + assert!(cluster_info.gossip.crds.lookup(&label).is_none()); + } + #[test] fn window_index_request() { let me = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp()); let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me); @@ -1891,3 +1951,40 @@ mod tests { assert_eq!(max_ts, new_max_ts); } } +#[test] +fn test_add_entrypoint() { + let node_keypair = Arc::new(Keypair::new()); + let mut cluster_info = ClusterInfo::new( + NodeInfo::new_localhost(node_keypair.pubkey(), timestamp()), + node_keypair, + ); + let entrypoint_id = Keypair::new().pubkey(); + let entrypoint = NodeInfo::new_localhost(entrypoint_id, timestamp()); + cluster_info.set_entrypoint(entrypoint.clone()); + let pulls = cluster_info.new_pull_requests(&HashMap::new()); + assert_eq!(1, pulls.len()); + match pulls.get(0) { + Some((addr, msg)) => { + assert_eq!(*addr, entrypoint.gossip); + match msg { + Protocol::PullRequest(_, value) => { + assert!(value.verify()); + assert_eq!(value.pubkey(), cluster_info.id()) + } + _ => panic!("wrong protocol"), + } + } + None => panic!("entrypoint should be a pull destination"), + } + + // now add this message back to the table and make sure after the next pull, the entrypoint is unset + let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone()); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + ClusterInfo::handle_pull_response(&cluster_info, entrypoint_id, vec![entrypoint_crdsvalue]); + let pulls = cluster_info + .write() + .unwrap() + .new_pull_requests(&HashMap::new()); + assert_eq!(1, pulls.len()); + assert_eq!(cluster_info.read().unwrap().entrypoint, None); +} diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 68d856363..2916ae492 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -21,7 +21,7 @@ pub struct CrdsGossip { pub crds: Crds, pub id: Pubkey, pub push: CrdsGossipPush, - pull: CrdsGossipPull, + pub pull: CrdsGossipPull, } impl Default for CrdsGossip { diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 2e0416af6..4bb8f5cb6 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -148,7 +148,7 @@ impl CrdsGossipPull { failed } /// build a filter of the current crds table - fn build_crds_filter(&self, crds: &Crds) -> Bloom { + pub fn build_crds_filter(&self, crds: &Crds) -> Bloom { let num = cmp::max( CRDS_GOSSIP_BLOOM_SIZE, crds.table.values().count() + self.purged_values.len(), diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 21f230dbf..759d622ef 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -27,7 +27,6 @@ pub struct LeaderId { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Vote { pub transaction: Transaction, - pub signature: Signature, pub wallclock: u64, } @@ -61,30 +60,25 @@ impl Signable for LeaderId { } impl Signable for Vote { + fn sign(&mut self, _keypair: &Keypair) {} + + fn verify(&self) -> bool { + self.transaction.verify_signature() + } + fn pubkey(&self) -> Pubkey { self.transaction.account_keys[0] } fn signable_data(&self) -> Vec { - #[derive(Serialize)] - struct SignData { - transaction: Transaction, - wallclock: u64, - } - let data = SignData { - transaction: self.transaction.clone(), - wallclock: self.wallclock, - }; - serialize(&data).expect("unable to serialize Vote") + vec![] } fn get_signature(&self) -> Signature { - self.signature + Signature::default() } - fn set_signature(&mut self, signature: Signature) { - self.signature = signature - } + fn set_signature(&mut self, _signature: Signature) {} } /// Type of the replicated value @@ -132,7 +126,6 @@ impl Vote { pub fn new(transaction: Transaction, wallclock: u64) -> Self { Vote { transaction, - signature: Signature::default(), wallclock, } } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 770f7a693..c6fdf0292 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -155,11 +155,12 @@ impl Fullnode { // Insert the entrypoint info, should only be None if this node // is the bootstrap leader + if let Some(entrypoint_info) = entrypoint_info_option { cluster_info .write() .unwrap() - .insert_info(entrypoint_info.clone()); + .set_entrypoint(entrypoint_info.clone()); } let sockets = Sockets { diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 12bcea878..24e6bae97 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -2,7 +2,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; -use crate::cluster_info::{ClusterInfo, NodeInfo}; +use crate::cluster_info::ClusterInfo; use crate::contact_info::ContactInfo; use crate::service::Service; use crate::streamer; @@ -51,7 +51,7 @@ impl GossipService { } } -pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result> { +pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result> { let exit = Arc::new(AtomicBool::new(false)); let (gossip_service, spy_ref) = make_spy_node(gossip_addr, &exit); let id = spy_ref.read().unwrap().keypair.pubkey(); @@ -67,7 +67,7 @@ pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result= num_nodes { - info!( + trace!( "discover success in {}s...\n{}", now.elapsed().as_secs(), spy_ref.read().unwrap().node_info_trace() @@ -108,8 +108,7 @@ fn make_spy_node( let keypair = Arc::new(Keypair::new()); let (node, gossip_socket) = ClusterInfo::spy_node(&keypair.pubkey()); let mut cluster_info = ClusterInfo::new(node, keypair); - cluster_info.insert_info(ContactInfo::new_gossip_entry_point(gossip_addr)); - + cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(gossip_addr)); let cluster_info = Arc::new(RwLock::new(cluster_info)); let gossip_service = GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index dabae998d..af160c2c0 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -118,7 +118,7 @@ impl Replicator { info!("Replicator: id: {}", keypair.pubkey()); info!("Creating cluster info...."); let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); - cluster_info.insert_info(leader_info.clone()); + cluster_info.set_entrypoint(leader_info.clone()); cluster_info.set_leader(leader_info.id); let cluster_info = Arc::new(RwLock::new(cluster_info)); @@ -202,7 +202,7 @@ impl Replicator { node_info.tvu = "0.0.0.0:0".parse().unwrap(); { let mut cluster_info_w = cluster_info.write().unwrap(); - cluster_info_w.insert_info(node_info); + cluster_info_w.insert_self(node_info); } let mut client = mk_client(leader_info);