From 34c3a0cc1f668323bf77ae6d477bed9011836050 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Sat, 1 Dec 2018 12:00:30 -0800 Subject: [PATCH] Add signature verification to gossip (#1937) --- sdk/src/signature.rs | 16 ++++ src/cluster_info.rs | 151 ++++++++++++++++++++++++++++++++------ src/contact_info.rs | 48 +++++++++++- src/crds.rs | 56 +++----------- src/crds_gossip.rs | 65 ++++++++++++++++- src/crds_gossip_error.rs | 2 + src/crds_gossip_pull.rs | 12 +-- src/crds_gossip_push.rs | 7 +- src/crds_value.rs | 153 ++++++++++++++++++++++++++++++++++++--- src/fullnode.rs | 5 +- tests/data_replicator.rs | 6 +- tests/multinode.rs | 10 ++- 12 files changed, 432 insertions(+), 99 deletions(-) diff --git a/sdk/src/signature.rs b/sdk/src/signature.rs index 0cfcca4260..d00cc05f62 100644 --- a/sdk/src/signature.rs +++ b/sdk/src/signature.rs @@ -30,6 +30,22 @@ impl Signature { } } +pub trait Signable { + fn sign(&mut self, keypair: &Keypair) { + let data = self.signable_data(); + self.set_signature(Signature::new(&keypair.sign(&data).as_ref())); + } + fn verify(&self) -> bool { + self.get_signature() + .verify(&self.pubkey().as_ref(), &self.signable_data()) + } + + fn pubkey(&self) -> Pubkey; + fn signable_data(&self) -> Vec; + fn get_signature(&self) -> Signature; + fn set_signature(&mut self, signature: Signature); +} + impl AsRef<[u8]> for Signature { fn as_ref(&self) -> &[u8] { &self.0[..] diff --git a/src/cluster_info.rs b/src/cluster_info.rs index cac02c9222..f13e402fa2 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -27,7 +27,7 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::Result; use rpc::RPC_PORT; -use signature::{Keypair, KeypairUtil}; +use signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{duration_as_ms, timestamp}; @@ -60,17 +60,64 @@ pub enum ClusterInfoError { pub struct ClusterInfo { /// The network pub gossip: CrdsGossip, + /// set the keypair that will be used to sign crds values generated. It is unset only in tests. + keypair: Arc, } -// TODO These messages should be signed, and go through the gpu pipeline for spam filtering +#[derive(Debug, Deserialize, Serialize)] +pub struct PruneData { + /// Pubkey of the node that sent this prune data + pub pubkey: Pubkey, + /// Pubkeys of nodes that should be pruned + pub prunes: Vec, + /// Signature of this Prune Message + pub signature: Signature, + /// The Pubkey of the intended node/destination for this message + pub destination: Pubkey, + /// Wallclock of the node that generated this message + pub wallclock: u64, +} + +impl Signable for PruneData { + fn pubkey(&self) -> Pubkey { + self.pubkey + } + + fn signable_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + pubkey: Pubkey, + prunes: Vec, + destination: Pubkey, + wallclock: u64, + } + let data = SignData { + pubkey: self.pubkey, + prunes: self.prunes.clone(), + destination: self.destination, + wallclock: self.wallclock, + }; + serialize(&data).expect("serialize PruneData") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature + } +} + +// TODO These messages should go through the gpu pipeline for spam filtering #[derive(Serialize, Deserialize, Debug)] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] enum Protocol { - /// Gosisp protocol messages + /// Gossip protocol messages PullRequest(Bloom, CrdsValue), PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), - PruneMessage(Pubkey, Vec), + PruneMessage(Pubkey, PruneData), /// Window protocol messages /// TODO: move this message to a different module @@ -79,8 +126,13 @@ enum Protocol { impl ClusterInfo { pub fn new(node_info: NodeInfo) -> Self { + //Without a keypair, gossip will not function. Only useful for tests. + ClusterInfo::new_with_keypair(node_info, Arc::new(Keypair::new())) + } + pub fn new_with_keypair(node_info: NodeInfo, keypair: Arc) -> Self { let mut me = ClusterInfo { gossip: CrdsGossip::default(), + keypair, }; let id = node_info.id; me.gossip.set_self(id); @@ -92,12 +144,14 @@ impl ClusterInfo { let mut my_data = self.my_data(); let now = timestamp(); my_data.wallclock = now; - let entry = CrdsValue::ContactInfo(my_data); + let mut entry = CrdsValue::ContactInfo(my_data); + entry.sign(&self.keypair); self.gossip.refresh_push_active_set(); self.gossip.process_push_message(&[entry], now); } pub fn insert_info(&mut self, node_info: NodeInfo) { - let value = CrdsValue::ContactInfo(node_info); + let mut value = CrdsValue::ContactInfo(node_info); + value.sign(&self.keypair); let _ = self.gossip.crds.insert(value, timestamp()); } pub fn id(&self) -> Pubkey { @@ -165,13 +219,10 @@ impl ClusterInfo { let prev = self.leader_id(); let self_id = self.gossip.id; let now = timestamp(); - let leader = LeaderId { - id: self_id, - leader_id: key, - wallclock: now, - }; - let entry = CrdsValue::LeaderId(leader); + let leader = LeaderId::new(self_id, key, now); + let mut entry = CrdsValue::LeaderId(leader); warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev); + entry.sign(&self.keypair); self.gossip.process_push_message(&[entry], now); } @@ -743,15 +794,23 @@ impl ClusterInfo { .gossip .process_push_message(&data, timestamp()); if !prunes.is_empty() { - let mut wme = me.write().unwrap(); inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len()); - let rsp = Protocol::PruneMessage(self_id, prunes); - let ci = wme.lookup(from).cloned(); - let pushes: Vec<_> = wme.new_push_requests(); + let ci = me.read().unwrap().lookup(from).cloned(); + let pushes: Vec<_> = me.write().unwrap().new_push_requests(); inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len()); let mut rsp: Vec<_> = ci - .and_then(|ci| to_blob(rsp, ci.ncp).ok()) - .into_iter() + .and_then(|ci| { + let mut prune_msg = PruneData { + pubkey: self_id, + prunes, + signature: Signature::default(), + destination: from, + wallclock: timestamp(), + }; + prune_msg.sign(&me.read().unwrap().keypair); + let rsp = Protocol::PruneMessage(self_id, prune_msg); + to_blob(rsp, ci.ncp).ok() + }).into_iter() .collect(); let mut blobs: Vec<_> = pushes .into_iter() @@ -821,19 +880,35 @@ impl ClusterInfo { ledger_window: &mut Option<&mut LedgerWindow>, ) -> Vec { match request { - // TODO sigverify these + // TODO verify messages faster Protocol::PullRequest(filter, caller) => { + //Pulls don't need to be verified Self::handle_pull_request(me, filter, caller, from_addr) } - Protocol::PullResponse(from, data) => { + Protocol::PullResponse(from, mut data) => { + data.retain(|v| v.verify()); Self::handle_pull_response(me, from, data); vec![] } - Protocol::PushMessage(from, data) => Self::handle_push_message(me, from, &data), + Protocol::PushMessage(from, mut data) => { + data.retain(|v| v.verify()); + Self::handle_push_message(me, from, &data) + } Protocol::PruneMessage(from, data) => { - inc_new_counter_info!("cluster_info-prune_message", 1); - inc_new_counter_info!("cluster_info-prune_message-size", data.len()); - me.write().unwrap().gossip.process_prune_msg(from, &data); + if data.verify() { + inc_new_counter_info!("cluster_info-prune_message", 1); + inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len()); + me.write() + .unwrap() + .gossip + .process_prune_msg( + from, + data.destination, + &data.prunes, + data.wallclock, + timestamp(), + ).ok(); + } vec![] } Protocol::RequestWindowIndex(from, ix) => { @@ -1343,4 +1418,32 @@ mod tests { assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); } + + //test that all cluster_info objects only generate signed messages + //when constructed with keypairs + #[test] + fn test_gossip_signature_verification() { + //create new cluster info, leader, and peer + let keypair = Keypair::new(); + let peer_keypair = Keypair::new(); + let leader_keypair = Keypair::new(); + let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0); + let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0); + let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0); + let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair)); + cluster_info.set_leader(leader.id); + cluster_info.insert_info(peer.clone()); + //check that all types of gossip messages are signed correctly + let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp()); + // there should be some pushes ready + assert!(vals.len() > 0); + vals.par_iter().for_each(|v| assert!(v.verify())); + + let (_, _, val) = cluster_info + .gossip + .new_pull_request(timestamp()) + .ok() + .unwrap(); + assert!(val.verify()); + } } diff --git a/src/contact_info.rs b/src/contact_info.rs index 56512031c9..40aa333f18 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -1,5 +1,6 @@ +use bincode::serialize; use rpc::RPC_PORT; -use signature::{Keypair, KeypairUtil}; +use signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -8,6 +9,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct ContactInfo { pub id: Pubkey, + /// signature of this ContactInfo + pub signature: Signature, /// gossip address pub ncp: SocketAddr, /// address to connect to for replication @@ -52,6 +55,7 @@ impl Default for ContactInfo { rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), wallclock: 0, + signature: Signature::default(), } } } @@ -69,6 +73,7 @@ impl ContactInfo { ) -> Self { ContactInfo { id, + signature: Signature::default(), ncp, tvu, tpu, @@ -161,6 +166,47 @@ impl ContactInfo { } } +impl Signable for ContactInfo { + fn pubkey(&self) -> Pubkey { + self.id + } + + fn signable_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + id: Pubkey, + ncp: SocketAddr, + tvu: SocketAddr, + tpu: SocketAddr, + storage_addr: SocketAddr, + rpc: SocketAddr, + rpc_pubsub: SocketAddr, + wallclock: u64, + } + + let me = self; + let data = SignData { + id: me.id, + ncp: me.ncp, + tvu: me.tvu, + tpu: me.tpu, + storage_addr: me.storage_addr, + rpc: me.rpc, + rpc_pubsub: me.rpc_pubsub, + wallclock: me.wallclock, + }; + serialize(&data).expect("failed to serialize ContactInfo") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/crds.rs b/src/crds.rs index 82e40320e0..4257f7b5df 100644 --- a/src/crds.rs +++ b/src/crds.rs @@ -41,7 +41,7 @@ pub enum CrdsError { InsertFailed, } -/// This structure stores some local metadata assosciated with the CrdsValue +/// This structure stores some local metadata associated with the CrdsValue /// The implementation of PartialOrd ensures that the "highest" version is always picked to be /// stored in the Crds #[derive(PartialEq, Debug)] @@ -188,11 +188,7 @@ mod test { let mut crds = Crds::default(); let original = CrdsValue::LeaderId(LeaderId::default()); assert_matches!(crds.insert(original.clone(), 0), Ok(_)); - let val = CrdsValue::LeaderId(LeaderId { - id: Pubkey::default(), - leader_id: Pubkey::default(), - wallclock: 1, - }); + let val = CrdsValue::LeaderId(LeaderId::new(Pubkey::default(), Pubkey::default(), 1)); assert_eq!( crds.insert(val.clone(), 1).unwrap().unwrap().value, original @@ -255,19 +251,11 @@ mod test { let key = Keypair::new(); let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); assert!(!(v1 != v2)); assert!(v1 == v2); @@ -277,19 +265,11 @@ mod test { let key = Keypair::new(); let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: key.pubkey(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), key.pubkey(), 0)), ); assert!(v1 != v2); assert!(!(v1 == v2)); @@ -304,19 +284,11 @@ mod test { let key = Keypair::new(); let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 1, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 1)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: key.pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)), ); assert!(v1 > v2); assert!(!(v1 < v2)); @@ -327,19 +299,11 @@ mod test { fn test_label_order() { let v1 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: Keypair::new().pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(), 0)), ); let v2 = VersionedCrdsValue::new( 1, - CrdsValue::LeaderId(LeaderId { - id: Keypair::new().pubkey(), - leader_id: Pubkey::default(), - wallclock: 0, - }), + CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(), 0)), ); assert!(v1 != v2); assert!(!(v1 == v2)); diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs index 57fb21bbe1..400cf3c61d 100644 --- a/src/crds_gossip.rs +++ b/src/crds_gossip.rs @@ -12,6 +12,9 @@ use crds_value::CrdsValue; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +///The min size for bloom filters +pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; + pub struct CrdsGossip { pub crds: Crds, pub id: Pubkey, @@ -64,8 +67,24 @@ impl CrdsGossip { } /// add the `from` to the peer's filter of nodes - pub fn process_prune_msg(&mut self, peer: Pubkey, origin: &[Pubkey]) { - self.push.process_prune_msg(peer, origin) + pub fn process_prune_msg( + &mut self, + peer: Pubkey, + destination: Pubkey, + origin: &[Pubkey], + wallclock: u64, + now: u64, + ) -> Result<(), CrdsGossipError> { + let expired = now > wallclock + self.push.prune_timeout; + if expired { + return Err(CrdsGossipError::PruneMessageTimeout); + } + if self.id == destination { + self.push.process_prune_msg(peer, origin); + Ok(()) + } else { + Err(CrdsGossipError::BadPruneDestination) + } } /// refresh the push active set @@ -138,11 +157,14 @@ impl CrdsGossip { mod test { use super::*; use bincode::serialized_size; + use cluster_info::NodeInfo; use contact_info::ContactInfo; use crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; use crds_value::CrdsValueLabel; use rayon::prelude::*; use signature::{Keypair, KeypairUtil}; + use solana_sdk::hash::hash; + use solana_sdk::timing::timestamp; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -317,8 +339,13 @@ mod test { prunes += rsps.len(); network .get(&from) - .map(|node| node.lock().unwrap().process_prune_msg(*to, &rsps)) - .unwrap(); + .map(|node| { + let mut node = node.lock().unwrap(); + let destination = node.id; + let now = timestamp(); + node.process_prune_msg(*to, destination, &rsps, now, now) + .unwrap() + }).unwrap(); delivered += rsps.is_empty() as usize; } (bytes, delivered, num_msgs, prunes) @@ -483,4 +510,34 @@ mod test { let mut network = star_network_create(4002); network_simulator(&mut network); } + #[test] + fn test_prune_errors() { + let mut crds_gossip = CrdsGossip::default(); + crds_gossip.id = Pubkey::new(&[0; 32]); + let id = crds_gossip.id; + let ci = NodeInfo::new_localhost(Pubkey::new(&[1; 32]), 0); + let prune_pubkey = Pubkey::new(&[2; 32]); + crds_gossip + .crds + .insert(CrdsValue::ContactInfo(ci.clone()), 0) + .unwrap(); + crds_gossip.refresh_push_active_set(); + let now = timestamp(); + //incorrect dest + let mut res = crds_gossip.process_prune_msg( + ci.id, + Pubkey::new(hash(&[1; 32]).as_ref()), + &[prune_pubkey], + now, + now, + ); + assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); + //correct dest + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now, now); + assert!(res.is_ok()); + //test timeout + let timeout = now + crds_gossip.push.prune_timeout * 2; + res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now, timeout); + assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); + } } diff --git a/src/crds_gossip_error.rs b/src/crds_gossip_error.rs index d9d00ce77c..2c3cb51762 100644 --- a/src/crds_gossip_error.rs +++ b/src/crds_gossip_error.rs @@ -4,4 +4,6 @@ pub enum CrdsGossipError { PushMessageTimeout, PushMessagePrune, PushMessageOldVersion, + BadPruneDestination, + PruneMessageTimeout, } diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index 2b77fbfa1b..5f1f1a1e49 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -12,6 +12,7 @@ use bincode::serialized_size; use bloom::Bloom; use crds::Crds; +use crds_gossip::CRDS_GOSSIP_BLOOM_SIZE; use crds_gossip_error::CrdsGossipError; use crds_value::{CrdsValue, CrdsValueLabel}; use packet::BLOB_DATA_SIZE; @@ -135,7 +136,10 @@ impl CrdsGossipPull { } /// build a filter of the current crds table fn build_crds_filter(&self, crds: &Crds) -> Bloom { - let num = crds.table.values().count() + self.purged_values.len(); + let num = cmp::max( + CRDS_GOSSIP_BLOOM_SIZE, + crds.table.values().count() + self.purged_values.len(), + ); let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1); for v in crds.table.values() { bloom.add(&v.value_hash); @@ -292,11 +296,7 @@ mod test { // node contains a key from the dest node, but at an older local timestamp let dest_id = new.label().pubkey(); - let same_key = CrdsValue::LeaderId(LeaderId { - id: dest_id, - leader_id: dest_id, - wallclock: 1, - }); + let same_key = CrdsValue::LeaderId(LeaderId::new(dest_id, dest_id, 1)); node_crds.insert(same_key.clone(), 0).unwrap(); assert_eq!( node_crds diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 00f50ba630..078001dfd4 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -12,6 +12,7 @@ use bincode::serialized_size; use bloom::Bloom; use contact_info::ContactInfo; use crds::{Crds, VersionedCrdsValue}; +use crds_gossip::CRDS_GOSSIP_BLOOM_SIZE; use crds_gossip_error::CrdsGossipError; use crds_value::{CrdsValue, CrdsValueLabel}; use indexmap::map::IndexMap; @@ -25,6 +26,7 @@ use std::collections::HashMap; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000; +pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; pub struct CrdsGossipPush { /// max bytes per message @@ -37,6 +39,7 @@ pub struct CrdsGossipPush { pub num_active: usize, pub push_fanout: usize, pub msg_timeout: u64, + pub prune_timeout: u64, } impl Default for CrdsGossipPush { @@ -49,6 +52,7 @@ impl Default for CrdsGossipPush { num_active: CRDS_GOSSIP_NUM_ACTIVE, push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, + prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS, } } } @@ -183,7 +187,8 @@ impl CrdsGossipPush { continue; } } - let bloom = Bloom::random(network_size, 0.1, 1024 * 8 * 4); + let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); + let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); new_items.insert(val.0.pubkey(), bloom); if new_items.len() == need { break; diff --git a/src/crds_value.rs b/src/crds_value.rs index 9ad502cb0e..600933a33c 100644 --- a/src/crds_value.rs +++ b/src/crds_value.rs @@ -1,4 +1,6 @@ +use bincode::serialize; use contact_info::ContactInfo; +use signature::{Keypair, Signable, Signature}; use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::Transaction; use std::fmt; @@ -18,6 +20,7 @@ pub enum CrdsValue { #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] pub struct LeaderId { pub id: Pubkey, + pub signature: Signature, pub leader_id: Pubkey, pub wallclock: u64, } @@ -25,12 +28,71 @@ pub struct LeaderId { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Vote { pub transaction: Transaction, + pub signature: Signature, pub height: u64, pub wallclock: u64, } +impl Signable for LeaderId { + fn pubkey(&self) -> Pubkey { + self.id + } + + fn signable_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + id: Pubkey, + leader_id: Pubkey, + wallclock: u64, + } + let data = SignData { + id: self.id, + leader_id: self.leader_id, + wallclock: self.wallclock, + }; + serialize(&data).expect("unable to serialize LeaderId") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature + } +} + +impl Signable for Vote { + fn pubkey(&self) -> Pubkey { + self.transaction.account_keys[0] + } + + fn signable_data(&self) -> Vec { + #[derive(Serialize)] + struct SignData { + transaction: Transaction, + height: u64, + wallclock: u64, + } + let data = SignData { + transaction: self.transaction.clone(), + height: self.height, + wallclock: self.wallclock, + }; + serialize(&data).expect("unable to serialize Vote") + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature + } +} + /// Type of the replicated value -/// These are labels for values in a record that is assosciated with `Pubkey` +/// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] pub enum CrdsValueLabel { ContactInfo(Pubkey), @@ -58,8 +120,30 @@ impl CrdsValueLabel { } } +impl LeaderId { + pub fn new(id: Pubkey, leader_id: Pubkey, wallclock: u64) -> Self { + LeaderId { + id, + signature: Signature::default(), + leader_id, + wallclock, + } + } +} + +impl Vote { + pub fn new(transaction: Transaction, height: u64, wallclock: u64) -> Self { + Vote { + transaction, + signature: Signature::default(), + height, + wallclock, + } + } +} + impl CrdsValue { - /// Totally unsecure unverfiable wallclock of the node that generatd this message + /// Totally unsecure unverfiable wallclock of the node that generated this message /// Latest wallclock is always picked. /// This is used to time out push messages. pub fn wallclock(&self) -> u64 { @@ -71,9 +155,11 @@ impl CrdsValue { } pub fn label(&self) -> CrdsValueLabel { match self { - CrdsValue::ContactInfo(contact_info) => CrdsValueLabel::ContactInfo(contact_info.id), - CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.transaction.account_keys[0]), - CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.id), + CrdsValue::ContactInfo(contact_info) => { + CrdsValueLabel::ContactInfo(contact_info.pubkey()) + } + CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.pubkey()), + CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -103,10 +189,50 @@ impl CrdsValue { ] } } + +impl Signable for CrdsValue { + fn sign(&mut self, keypair: &Keypair) { + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.sign(keypair), + CrdsValue::Vote(vote) => vote.sign(keypair), + CrdsValue::LeaderId(leader_id) => leader_id.sign(keypair), + }; + } + fn verify(&self) -> bool { + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.verify(), + CrdsValue::Vote(vote) => vote.verify(), + CrdsValue::LeaderId(leader_id) => leader_id.verify(), + } + } + + fn pubkey(&self) -> Pubkey { + match self { + CrdsValue::ContactInfo(contact_info) => contact_info.pubkey(), + CrdsValue::Vote(vote) => vote.pubkey(), + CrdsValue::LeaderId(leader_id) => leader_id.pubkey(), + } + } + + fn signable_data(&self) -> Vec { + unimplemented!() + } + + fn get_signature(&self) -> Signature { + unimplemented!() + } + + fn set_signature(&mut self, _: Signature) { + unimplemented!() + } +} + #[cfg(test)] mod test { use super::*; use contact_info::ContactInfo; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::timing::timestamp; use system_transaction::test_tx; #[test] @@ -134,14 +260,21 @@ mod test { let key = v.clone().contact_info().unwrap().id; assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key)); - let v = CrdsValue::Vote(Vote { - transaction: test_tx(), - height: 1, - wallclock: 0, - }); + let v = CrdsValue::Vote(Vote::new(test_tx(), 1, 0)); assert_eq!(v.wallclock(), 0); let key = v.clone().vote().unwrap().transaction.account_keys[0]; assert_eq!(v.label(), CrdsValueLabel::Vote(key)); } + #[test] + fn test_signature() { + let keypair = Keypair::new(); + let fake_keypair = Keypair::new(); + let leader = LeaderId::new(keypair.pubkey(), Pubkey::default(), timestamp()); + let mut v = CrdsValue::LeaderId(leader); + v.sign(&keypair); + assert!(v.verify()); + v.sign(&fake_keypair); + assert!(!v.verify()); + } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 9f61681d3c..328552da2c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -232,7 +232,10 @@ impl Fullnode { let window = new_window(32 * 1024); let shared_window = Arc::new(RwLock::new(window)); node.info.wallclock = timestamp(); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info))); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair( + node.info, + keypair.clone(), + ))); let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info); diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 4a6dd0d809..4bd7977ec0 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -11,6 +11,7 @@ use solana::ncp::Ncp; use solana::packet::{Blob, SharedBlob}; use solana::result; use solana::service::Service; +use solana::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -19,8 +20,9 @@ use std::thread::sleep; use std::time::Duration; fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { - let mut tn = Node::new_localhost(); - let cluster_info = ClusterInfo::new(tn.info.clone()); + let keypair = Keypair::new(); + let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); + let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); let c = Arc::new(RwLock::new(cluster_info)); let w = Arc::new(RwLock::new(vec![])); let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); diff --git a/tests/multinode.rs b/tests/multinode.rs index 0b3a51e2dd..e8217c6361 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -42,12 +42,13 @@ use std::thread::{sleep, Builder, JoinHandle}; use std::time::{Duration, Instant}; fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { + let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); - let mut spy = Node::new_localhost(); + let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey()); let me = spy.info.id.clone(); let daddr = "0.0.0.0:0".parse().unwrap(); spy.info.tvu = daddr; - let mut spy_cluster_info = ClusterInfo::new(spy.info); + let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair)); spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.set_leader(leader.id); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); @@ -64,11 +65,12 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { } fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc>, Node, Pubkey) { + let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); - let new_node = Node::new_localhost(); + let new_node = Node::new_localhost_with_pubkey(keypair.pubkey()); let new_node_info = new_node.info.clone(); let me = new_node.info.id.clone(); - let mut new_node_cluster_info = ClusterInfo::new(new_node_info); + let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair)); new_node_cluster_info.insert_info(leader.clone()); new_node_cluster_info.set_leader(leader.id); let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));